Merged
kamilkisiela approved these changes (Feb 12, 2026)
dotansimha approved these changes (Feb 16, 2026)
Ref ROUTER-244
This is a stacked PR based on #620.
Adds WebSocket support for GraphQL subscriptions, enabling clients to connect to the router, and the router to communicate with subgraphs, using the `graphql-transport-ws` protocol in addition to the HTTP-based streaming protocols (SSE, Multipart, Incremental Delivery) from #620. WebSocket is implemented as a full transport, so it can also execute queries and mutations, not just subscriptions.
Configuration
Subscriptions
Added subscription protocol configuration to allow per-subgraph protocol selection:
To use the defaults (same path as the HTTP endpoint, but with the `ws` scheme) for a subgraph (or all subgraphs), you do:

WebSocket
Added configuration for the router's WebSocket server (client → router connections):
- `source: connection` (or `both`) accepts headers in the `ConnectionInit` message payload. When enabled, all fields in the payload are treated as headers. For example, the connection init message

  ```json
  { "type": "connection_init", "payload": { "Authorization": "Bearer abc123" } }
  ```

  is equivalent to an HTTP request made with `Authorization: Bearer abc123`.

- `source: operation` (or `both`) accepts headers in the `headers` field of the GraphQL operation's extensions. For example, the operation

  ```json
  {
    "type": "subscribe",
    "id": "1",
    "payload": {
      "query": "subscription { greetings }",
      "extensions": { "headers": { "Authorization": "Bearer abc123" } }
    }
  }
  ```

  is equivalent to an HTTP request made with `Authorization: Bearer abc123`.

- `source: both` with `persist: true` merges headers from both sources and stores them as the headers of the active connection. This is useful for authentication with expiring tokens: the initial connection uses one token, a subsequent operation can provide an updated token, and that token is then used for all operations even if they don't carry `extensions.headers`. *Operation extensions take precedence in case of conflicts.*
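A hypothetical configuration shape for the options above (the key names are illustrative only, not the router's actual schema):

```yaml
# Hypothetical sketch - key names are illustrative, not the real schema.
websocket:
  headers:
    source: both    # connection | operation | both
    persist: true   # store merged headers on the active connection
```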
Essentially, this is how the headers logic works:
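The merge rule described above can be sketched with plain `HashMap`s (a simplified illustration, not the router's actual code):

```rust
use std::collections::HashMap;

// Simplified sketch of the header-merging rule: connection-level
// headers (from ConnectionInit.payload) are merged with per-operation
// headers (from extensions.headers), and the operation's headers win
// on conflict. Illustrative only.
fn effective_headers(
    connection: &HashMap<String, String>,
    operation: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut merged = connection.clone();
    // Inserting operation entries second overwrites conflicting keys,
    // so operation extensions take precedence.
    merged.extend(operation.iter().map(|(k, v)| (k.clone(), v.clone())));
    merged
}

fn main() {
    let mut conn = HashMap::new();
    conn.insert("Authorization".to_string(), "Bearer old".to_string());
    conn.insert("X-Tenant".to_string(), "acme".to_string());

    let mut op = HashMap::new();
    op.insert("Authorization".to_string(), "Bearer new".to_string());

    let headers = effective_headers(&conn, &op);
    assert_eq!(headers["Authorization"], "Bearer new"); // operation wins
    assert_eq!(headers["X-Tenant"], "acme");            // connection preserved
    println!("ok");
}
```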
Header Propagation
Header propagation continues to work with WebSockets, treating `ConnectionInit.payload` and `extensions.headers` as "headers":

Router → Subgraph

- Headers are sent in the `ConnectionInit` message payload when establishing the WebSocket connection to subgraphs.

Client → Router

- Headers are accepted in the `ConnectionInit` message payload (enabled by default).
- Headers are accepted in the `extensions.headers` field of each `Subscribe` message (disabled by default).

Executor Refactoring
The `SubgraphExecutorMap` was refactored to support different executor types based on the operation:

- HTTP executors and subscription executors are now stored separately (`http_executors_by_subgraph` and `subscription_executors_by_subgraph`).
- Executor factories are split in two:
  - `get_or_create_http_executor()` - for queries/mutations, and also for subscriptions when using HTTP streaming protocols (SSE, Multipart, etc.)
  - `get_or_create_subscription_executor()` - for subscriptions; routes to either the HTTP or the WebSocket executor based on config

Why `http_executors` and not `single_result_executors`?

The HTTP executor supports both single-result operations AND streaming subscriptions via HTTP protocols (SSE, Multipart, Incremental Delivery). The naming reflects the transport protocol, not the operation type. The same `HTTPSubgraphExecutor` can stream subscription results over HTTP just like it handles queries/mutations.

Why separate maps?
The same subgraph might need both an HTTP executor (for queries/mutations) and a WebSocket executor (for subscriptions) at the same endpoint. This refactoring ensures they're managed independently while still sharing endpoint resolution logic.
Note that a single subgraph should never have multiple subscription executors.
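The two-map layout and routing decision can be sketched as follows. The map and factory names mirror the PR; the bodies and the `use_websocket` flag are illustrative assumptions, not the actual implementation:

```rust
use std::collections::HashMap;

// Illustrative sketch of the refactored executor storage: one map per
// transport, with subscription routing decided by per-subgraph config.
#[derive(Debug, Clone, PartialEq)]
enum Executor {
    Http(String),      // endpoint URL; also streams subscriptions over SSE/Multipart
    WebSocket(String), // ws endpoint, used only for subscriptions
}

#[derive(Default)]
struct SubgraphExecutorMap {
    http_executors_by_subgraph: HashMap<String, Executor>,
    subscription_executors_by_subgraph: HashMap<String, Executor>,
}

impl SubgraphExecutorMap {
    // Queries/mutations always go through the HTTP executor.
    fn get_or_create_http_executor(&mut self, subgraph: &str, url: &str) -> &Executor {
        self.http_executors_by_subgraph
            .entry(subgraph.to_string())
            .or_insert_with(|| Executor::Http(url.to_string()))
    }

    // Subscriptions route to HTTP or WebSocket based on config;
    // a subgraph only ever gets one subscription executor.
    fn get_or_create_subscription_executor(
        &mut self,
        subgraph: &str,
        url: &str,
        use_websocket: bool,
    ) -> &Executor {
        self.subscription_executors_by_subgraph
            .entry(subgraph.to_string())
            .or_insert_with(|| {
                if use_websocket {
                    Executor::WebSocket(url.replace("http", "ws"))
                } else {
                    Executor::Http(url.to_string())
                }
            })
    }
}

fn main() {
    let mut map = SubgraphExecutorMap::default();
    let q = map
        .get_or_create_http_executor("products", "http://localhost:4001/graphql")
        .clone();
    let s = map
        .get_or_create_subscription_executor("products", "http://localhost:4001/graphql", true)
        .clone();
    // Same subgraph, same endpoint, two independently managed executors.
    assert_eq!(q, Executor::Http("http://localhost:4001/graphql".into()));
    assert_eq!(s, Executor::WebSocket("ws://localhost:4001/graphql".into()));
    println!("ok");
}
```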
WebSocket Implementation
`lib/executor/src/executors/graphql_transport_ws.rs`

Protocol types implementing the `graphql-transport-ws` spec.

`lib/executor/src/executors/websocket_common.rs`

Shared WebSocket infrastructure for both client and server, ensuring a stable connection:

- `WsState` - tracks connection state (handshake, heartbeat, subscriptions)

`lib/executor/src/executors/websocket_client.rs`

WebSocket client for connecting to subgraph subscriptions.

`lib/executor/src/executors/ws.rs`

`WsSubgraphExecutor` implementation:

- Uses an `Arbiter` (dedicated OS thread) for WebSocket operations

`bin/router/src/pipeline/websocket_server.rs`

Handles incoming WebSocket connections from clients:

- Implements the `graphql-transport-ws` protocol for client-to-router connections
- Reuses `websocket_common.rs` and `graphql_transport_ws.rs`

Why the `sink.io().stop_timer()`?

See ntex-rs/ntex#756.
Technical Considerations
Why is ntex updated to v3 even though it's v3 on main branch?
This branch is not rebased yet; I am essentially testing for the upcoming rebase. I will rebase after all of the executor pipeline refactoring calms down.
Why ntex and Arbiters?
The WebSocket client uses ntex, which is `Rc`-based rather than `Arc`-based. This design choice is intentional in ntex - it optimizes for single-threaded async performance by avoiding atomic reference-counting overhead. The tradeoff is that ntex types are not `Send`/`Sync`. To bridge ntex's single-threaded model with Tokio's multi-threaded runtime, each `WsSubgraphExecutor` spawns its own `Arbiter`.

Why not use a `Send`/`Sync`-compatible WebSocket library like `tokio-tungstenite`?

The entire router is built on ntex. Mixing async runtimes (ntex + tokio-tungstenite) adds complexity and potential for subtle bugs at runtime boundaries. Keeping everything in ntex means consistent behavior and easier debugging.
Furthermore, the `websocket_common.rs` module is shared between the WebSocket client (for subgraph connections) and the WebSocket server (for client connections). Using ntex for both means this shared code works seamlessly in both contexts and can be tested together.

ntex's `Rc`-based approach eliminates atomic operations on every clone/drop. For high-throughput WebSocket scenarios with many message dispatches, this can matter.

What about the Arbiter overhead?
An OS thread is not a CPU core. Modern systems can handle thousands of OS threads with minimal overhead when they're mostly idle (waiting on I/O). The Arbiter threads spend most of their time suspended, waiting for WebSocket frames, so the actual CPU cost is minimal.
The alternative - making everything `Arc`-based or using locks - would add overhead on every operation, not just at thread boundaries. For a WebSocket connection that might process thousands of messages, avoiding atomic operations on each message dispatch is worthwhile.

Why not pool Arbiters across executors?
We could implement a thread pool so outgoing WebSocket executors share Arbiters, but there's currently no real need for it: a single OS thread can handle a very large number of concurrent WebSocket connections.
In practice, you'd hit the OS file descriptor limits long before a single Arbiter becomes a bottleneck. Thread pooling would add complexity without measurable benefit for typical workloads.
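The bridging pattern - a dedicated OS thread owning non-`Send` state, fed by messages from the multi-threaded side - can be sketched with std primitives alone (this assumes nothing about ntex's actual `Arbiter` API):

```rust
use std::sync::mpsc;
use std::thread;

// Commands crossing from the multi-threaded runtime into the dedicated
// worker thread. Only the Cmd values must be Send; the worker's own
// state never leaves its thread.
enum Cmd {
    Send(String),
    Shutdown,
}

fn spawn_worker() -> (mpsc::Sender<Cmd>, thread::JoinHandle<usize>) {
    let (tx, rx) = mpsc::channel::<Cmd>();
    let handle = thread::spawn(move || {
        // Non-Send state (like ntex's Rc-based types) lives only on
        // this thread, so no Arc or atomic refcounting is needed.
        let mut sent: Vec<String> = Vec::new();
        for cmd in rx {
            match cmd {
                Cmd::Send(frame) => sent.push(frame),
                Cmd::Shutdown => break,
            }
        }
        sent.len() // report how many frames were "dispatched"
    });
    (tx, handle)
}

fn main() {
    let (tx, handle) = spawn_worker();
    tx.send(Cmd::Send("subscribe".into())).unwrap();
    tx.send(Cmd::Send("next".into())).unwrap();
    tx.send(Cmd::Shutdown).unwrap();
    assert_eq!(handle.join().unwrap(), 2);
    println!("ok");
}
```

The thread sits parked in the channel receive loop while idle, which is why mostly-idle Arbiter threads cost so little CPU.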
Why custom deserializer for server and client messages?
sonic_rs has issues with serde's internally-tagged enum deserialization (`#[serde(tag = "type")]`). The `graphql-transport-ws` protocol uses a `type` field to discriminate message types:

```json
{"type": "connection_init", "payload": {...}}
{"type": "subscribe", "id": "1", "payload": {...}}
{"type": "next", "id": "1", "payload": {...}}
```

This maps naturally to Rust enums with `#[serde(tag = "type")]`. However, sonic_rs fails to deserialize these internally-tagged enums correctly - the deserialization fails outright.
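What a hand-rolled discriminator dispatch boils down to can be illustrated in miniature. This is a std-only toy (the `field` scanner is deliberately naive), not the actual custom deserializer:

```rust
// Toy illustration of dispatching on the `type` discriminator field,
// mirroring what a custom deserializer must do when the derive for
// internally-tagged enums cannot be used. Hypothetical, std-only.
#[derive(Debug, PartialEq)]
enum ClientMessage {
    ConnectionInit,
    Subscribe { id: String },
    Complete { id: String },
}

/// Extract the string value of the first `"key": "value"` pair.
/// (A real implementation would walk a proper JSON parser's events.)
fn field<'a>(json: &'a str, key: &str) -> Option<&'a str> {
    let pat = format!("\"{}\":", key);
    let start = json.find(&pat)? + pat.len();
    let rest = json[start..].trim_start().strip_prefix('"')?;
    let end = rest.find('"')?;
    Some(&rest[..end])
}

fn parse(json: &str) -> Option<ClientMessage> {
    // Dispatch on the discriminator, then pull out the variant's fields.
    match field(json, "type")? {
        "connection_init" => Some(ClientMessage::ConnectionInit),
        "subscribe" => Some(ClientMessage::Subscribe { id: field(json, "id")?.to_string() }),
        "complete" => Some(ClientMessage::Complete { id: field(json, "id")?.to_string() }),
        _ => None,
    }
}

fn main() {
    assert_eq!(
        parse(r#"{"type":"connection_init","payload":{}}"#),
        Some(ClientMessage::ConnectionInit)
    );
    assert_eq!(
        parse(r#"{"type":"subscribe","id":"1","payload":{}}"#),
        Some(ClientMessage::Subscribe { id: "1".into() })
    );
    println!("ok");
}
```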
Alternatively, we could use `serde_json` and drop the custom deserializer, but then everywhere else we'd have to convert between sonic_rs and serde_json values.

Connection Limits
Client → Router (incoming WebSocket connections)
Actual numbers
I ran stress tests that continuously establish WebSocket connections (including the subprotocol handshake) to the router, adding connections until the next one gets rejected.

First I hit the file descriptor limit; after increasing that, I hit OS ephemeral port exhaustion. After widening the ephemeral port range, I managed to get ~32k active WS connections before capping out. I think the cap comes from ntex, Rust, or OS threads - but a single server accepting ~32k connections is already very good.
Memory behaviour
It seems like there is a memory leak, but after instrumenting with Xcode's Instruments, no leak was detected whatsoever - no new allocations were made either.

The pattern is as follows: at the start of the stress test, memory jumps from ~40MB to about ~130MB and then steadily grows by ~1MB over time as the stress test continues - very slowly, but it grows. Note that it does not shrink even after closing the WS connections.

If Instruments is to be trusted, there is no leak. It could be something like ntex growing its buffers to handle the influx of traffic and not shrinking them in anticipation of more traffic - but I am not really sure. This needs more testing.
Router → Subgraph (outgoing WebSocket connections)
No limit.
What's up with `testkit_v2.rs`?

It contains the improved testing kit for the router; see its usage in `e2e/src/websocket.rs`. It allows the router to run in parallel, and hence lets e2e tests run in parallel.

At the moment it is missing allocating random ports to subgraphs!
TODOs
- `ws::start` - ntex-rs/ntex#765

*Also inherits the TODOs from #620.*