feat: WebSockets #738

Merged
enisdenjo merged 97 commits into not-kamil-subs from not-kamil-subs-ws on Feb 16, 2026

Conversation

@enisdenjo
Member

@enisdenjo enisdenjo commented Feb 3, 2026

Ref ROUTER-244

This is a stacked PR based on #620.

Adding WebSocket support for GraphQL subscriptions, enabling clients to connect to the router, and the router to communicate with subgraphs using the graphql-transport-ws protocol in addition to the HTTP-based streaming protocols (SSE, Multipart, Incremental Delivery) from #620.

WebSockets are implemented as a full transport and as such can also execute queries, mutations, and subscriptions.

Configuration

Subscriptions

Added subscription protocol configuration to allow per-subgraph protocol selection:

subscriptions:
  enabled: true # HTTP subscriptions are always enabled when subscriptions are enabled
  websocket:
    all:
      path: /graphql # Defaults to {http.path}
    subgraphs:
      reviews:
        path: /ws # Override specific subgraph path

To use the defaults (same path, but with the ws scheme) for a subgraph (or all subgraphs), you do:

subscriptions:
  enabled: true # HTTP subscriptions are always enabled when subscriptions are enabled
  websocket:
    all: {}
    subgraphs:
      reviews: {}

WebSocket

Added configuration for the router's WebSocket server (client → router connections):

websocket:
  enabled: true
  path: /ws # Optional, defaults to {http.graphql_endpoint}
  headers:
    source: none | connection | operation | both # default: connection
    persist: true # default: true

  • source connection or both accepts headers in the ConnectionInit message payload. When enabled, all fields in the payload are treated as headers. For example, a connection init message:

    {
      "type": "connection_init",
      "payload": {
        "Authorization": "Bearer abc123"
      }
    }

    is the same as if an HTTP request were made with Authorization: Bearer abc123.

  • source operation or both accepts headers in the headers field of GraphQL operation extensions. For example, a GraphQL operation:

    {
      "type": "subscribe",
      "id": "1",
      "payload": {
        "query": "subscription { greetings }",
        "extensions": {
          "headers": {
            "Authorization": "Bearer abc123"
          }
        }
      }
    }

    is the same as if an HTTP request were made with Authorization: Bearer abc123.

  • source both and persist true merges headers from both sources and stores them as the headers of the active connection. This is useful for authentication with expiring tokens - the initial connection uses one token, a subsequent operation can provide an updated token, and that token is then used for all following operations even if they don't include extensions.headers.

*Operation extensions take precedence in case of conflicts.

Essentially, this is how the headers logic works:

WebSocketServer.onOpen((socket) => {
  const state = { headers: {} };

  socket.onInitMessage((message) => {
    if (source === "connection" || source === "both") {
      state.headers = message.payload;
    }
  });

  socket.onOperation((operation) => {
    // this operation's headers, start with the state's headers
    let headers = state.headers;

    if (source === "operation") {
      // if the source is operation, only use the operation's headers
      headers = operation.extensions.headers;
    }

    if (source === "both") {
      // if the source is both, merge the headers

      headers = {
        ...state.headers, // state headers are from the connection init payload
        ...operation.extensions.headers,
      };

      if (persist) {
        // save the resulting headers as the state's headers
        state.headers = headers;
      }
    }

    // ...
  });
});
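
To make the merge behaviour concrete, here is a minimal, self-contained sketch of the rules above (helper names are illustrative, not from the codebase), assuming source: both and persist: true:

```javascript
// Minimal re-implementation of the merge rules, assuming
// source: "both" and persist: true. Names are illustrative only.
function createConnectionState() {
  return { headers: {} };
}

function onInitMessage(state, payload) {
  // ConnectionInit payload fields become the connection's headers
  state.headers = { ...payload };
}

function headersForOperation(state, extensionsHeaders, persist) {
  // Operation extensions take precedence over connection headers
  const headers = { ...state.headers, ...extensionsHeaders };
  if (persist) {
    // The merged result becomes the connection's new headers
    state.headers = headers;
  }
  return headers;
}

const state = createConnectionState();
onInitMessage(state, { Authorization: "Bearer old-token" });

// First operation refreshes the token via extensions.headers
headersForOperation(state, { Authorization: "Bearer new-token" }, true);

// A later operation without extensions.headers still sees the new token
console.log(headersForOperation(state, {}, true).Authorization);
// → "Bearer new-token"
```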

Header Propagation

Header propagation continues to work with WebSockets, so ConnectionInit.payload and extensions.headers are seen as "headers":

Router → Subgraph

  • All configured header and JWT propagation rules are respected
  • Headers are sent inside the ConnectionInit message payload when establishing the WebSocket connection to subgraphs

Client → Router

  • Clients can send headers in two ways (controlled by configuration):
    1. Connection-level: Via the ConnectionInit message payload (enabled by default)
    2. Operation-level: Via the extensions.headers field of each Subscribe message (disabled by default)
  • Those values are seen as headers and behave as if you had sent an HTTP request with them.

Executor Refactoring

The SubgraphExecutorMap was refactored to support different executor types based on operation type:

HTTP executors and subscription executors are now stored separately (http_executors_by_subgraph and subscription_executors_by_subgraph).

Executor factories are split in two:

  • get_or_create_http_executor() - for queries/mutations, and also for subscriptions when using HTTP streaming protocols (SSE, Multipart, etc.)
  • get_or_create_subscription_executor() - for subscriptions, routes to either HTTP or WebSocket executor based on config

Why http_executors and not single_result_executors?

The HTTP executor supports both single-result operations AND streaming subscriptions via HTTP protocols (SSE, Multipart, Incremental Delivery). The naming reflects the transport protocol, not the operation type. The same HTTPSubgraphExecutor can stream subscription results over HTTP just like it handles queries/mutations.

Why separate maps?

The same subgraph might need both an HTTP executor (for queries/mutations) and a WebSocket executor (for subscriptions) at the same endpoint. This refactoring ensures they're managed independently while still sharing endpoint resolution logic.

Note that a single subgraph should never have multiple subscription executors.
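
A hedged sketch of the two-map lookup (the real implementation is Rust; the method names mirror the PR text, everything else is hypothetical):

```javascript
// Illustrative sketch of the split executor maps described above.
class SubgraphExecutorMap {
  constructor(subscriptionConfig) {
    this.httpExecutorsBySubgraph = new Map();
    this.subscriptionExecutorsBySubgraph = new Map();
    this.subscriptionConfig = subscriptionConfig; // per-subgraph protocol choice
  }

  getOrCreateHttpExecutor(subgraph) {
    // Queries/mutations, and subscriptions over HTTP streaming protocols
    if (!this.httpExecutorsBySubgraph.has(subgraph)) {
      this.httpExecutorsBySubgraph.set(subgraph, { kind: "http", subgraph });
    }
    return this.httpExecutorsBySubgraph.get(subgraph);
  }

  getOrCreateSubscriptionExecutor(subgraph) {
    // Routes to either a WebSocket or an HTTP executor based on config;
    // at most one subscription executor per subgraph.
    if (!this.subscriptionExecutorsBySubgraph.has(subgraph)) {
      const useWs = this.subscriptionConfig[subgraph]?.websocket ?? false;
      const executor = useWs
        ? { kind: "ws", subgraph }
        : this.getOrCreateHttpExecutor(subgraph);
      this.subscriptionExecutorsBySubgraph.set(subgraph, executor);
    }
    return this.subscriptionExecutorsBySubgraph.get(subgraph);
  }
}

const map = new SubgraphExecutorMap({ reviews: { websocket: true } });
console.log(map.getOrCreateSubscriptionExecutor("reviews").kind); // → "ws"
console.log(map.getOrCreateSubscriptionExecutor("products").kind); // → "http"
```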

WebSocket Implementation

lib/executor/src/executors/graphql_transport_ws.rs

Protocol types implementing the graphql-transport-ws spec:

  • Client/server message types
  • Connection initialization
  • Custom close codes for the protocol

lib/executor/src/executors/websocket_common.rs

Shared WebSocket infrastructure for both client and server ensuring a stable connection:

  • WsState - tracks connection state (handshake, heartbeat, subscriptions)
  • Heartbeat mechanism
  • Handshake timeout enforcement
  • Frame parsing utilities

lib/executor/src/executors/websocket_client.rs

WebSocket client for connecting to subgraph subscriptions:

  • Implements graphql-transport-ws protocol
  • Multiplexes multiple subscriptions over a single connection
  • Background task for message dispatching
  • Automatic cleanup with subscription guards
  • Not Send/Sync - uses ntex's Rc-based types, designed for single-threaded use
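
The multiplexing idea can be sketched like this (illustrative only; the actual client is Rust on ntex, and these names are hypothetical):

```javascript
// Illustrative multiplexer: many subscriptions share one connection,
// and incoming frames are dispatched by their "id" field.
class SubscriptionMultiplexer {
  constructor() {
    this.subscriptions = new Map(); // id -> callback
    this.nextId = 1;
  }

  subscribe(onNext) {
    const id = String(this.nextId++);
    this.subscriptions.set(id, onNext);
    // A real client would send {"type":"subscribe","id":id,...} here
    return () => this.subscriptions.delete(id); // cleanup guard
  }

  // Called for every frame received on the shared connection
  dispatch(message) {
    if (message.type === "next") {
      this.subscriptions.get(message.id)?.(message.payload);
    } else if (message.type === "complete") {
      this.subscriptions.delete(message.id);
    }
  }
}

const mux = new SubscriptionMultiplexer();
const results = [];
const unsubscribe = mux.subscribe((payload) => results.push(payload));
mux.dispatch({ type: "next", id: "1", payload: { data: { greetings: "Hi" } } });
mux.dispatch({ type: "complete", id: "1" });
console.log(results.length); // → 1
unsubscribe();
```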

lib/executor/src/executors/ws.rs

WsSubgraphExecutor implementation:

  • Each executor spawns its own Arbiter (dedicated OS thread) for WebSocket operations
    • Note that this is per executor, not per connection! A single dedicated OS thread will handle all connections for a given subgraph executor.
  • Handles both single-query execution and streaming subscriptions
    • But is ultimately used only for subscriptions.
  • The WebSocket executor automatically converts HTTP URIs to WebSocket URIs (http→ws, https→wss) and applies the configured path if provided.
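
The URI conversion can be sketched as follows (a minimal illustration, not the actual Rust code; the function name is hypothetical):

```javascript
// Illustrative sketch of the URI conversion: http→ws, https→wss,
// with an optional configured path override.
function toWebSocketUri(httpUri, configuredPath) {
  const url = new URL(httpUri);
  url.protocol = url.protocol === "https:" ? "wss:" : "ws:";
  if (configuredPath) {
    url.pathname = configuredPath;
  }
  return url.toString();
}

console.log(toWebSocketUri("https://reviews.example.com/graphql", "/ws"));
// → "wss://reviews.example.com/ws"
console.log(toWebSocketUri("http://reviews.example.com/graphql"));
// → "ws://reviews.example.com/graphql"
```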

bin/router/src/pipeline/websocket_server.rs

Handles incoming WebSocket connections from clients:

  • Implements graphql-transport-ws protocol for client-to-router connections
  • Connection lifecycle management (init, ack, heartbeat, close)
    • All come from websocket_common.rs and graphql_transport_ws.rs
  • Subscription multiplexing over a single WebSocket connection
  • Integration with existing pipeline (parsing, validation, execution)
  • Proper error handling and close codes per spec

Why the sink.io().stop_timer()?

ntex-rs/ntex#756

Technical Considerations

Why is ntex updated to v3 even though it's already v3 on the main branch?

Not rebased. I am basically testing for the upcoming rebase and will rebase once all of the executor pipeline refactoring calms down.

Why ntex and Arbiters?

The WebSocket client uses ntex, which is Rc-based rather than Arc-based. This design choice is intentional in ntex - it optimizes for single-threaded async performance by avoiding atomic reference counting overhead. The tradeoff is that ntex types are not Send/Sync.

To bridge ntex's single-threaded model with Tokio's multi-threaded runtime, each WsSubgraphExecutor spawns its own Arbiter.

Why not use a Send/Sync-compatible WebSocket library like tokio-tungstenite?

The entire router is built on ntex. Mixing async runtimes (ntex + tokio-tungstenite) adds complexity and potential for subtle bugs at runtime boundaries. Keeping everything in ntex means consistent behavior and easier debugging.

Furthermore, the websocket_common.rs module is shared between the WebSocket client (for subgraph connections) and the WebSocket server (for client connections). Using ntex for both means this shared code works seamlessly in both contexts and can be tested together.

ntex's Rc-based approach eliminates atomic operations on every clone/drop. For high-throughput WebSocket scenarios with many message dispatches, this can matter.

What about the Arbiter overhead?

An OS thread is not a CPU core. Modern systems can handle thousands of OS threads with minimal overhead when they're mostly idle (waiting on I/O). The Arbiter threads spend most of their time suspended, waiting for WebSocket frames. The actual CPU cost is minimal because:

  • Thread creation is a one-time cost per subgraph executor (not per connection!)
  • Idle threads consume negligible CPU - they're parked by the OS
  • Context switching only happens when there's actual work (incoming frames)

The alternative - making everything Arc-based or using locks - would add overhead on every operation, not just at thread boundaries. For a WebSocket connection that might process thousands of messages, avoiding atomic operations on each message dispatch is worthwhile.

Why not pool Arbiters across executors?

We could implement a thread pool for outgoing WebSocket executors to share Arbiters, but there's no real need for it currently. A single OS thread can handle a very large number of concurrent WebSocket connections because:

  • WebSocket connections are mostly idle, waiting for frames
  • The async runtime (ntex) multiplexes all connections on the same thread using epoll/kqueue
  • Actual CPU work per message is minimal (parsing JSON, dispatching to channels)
  • The bottleneck is network I/O, not thread count

In practice, you'd hit the OS file descriptor limits long before a single Arbiter becomes a bottleneck. Thread pooling would add complexity without measurable benefit for typical workloads.

Why custom deserializer for server and client messages?

sonic_rs has issues with serde's internally-tagged enum deserialization (#[serde(tag = "type")]). The graphql-transport-ws protocol uses a type field to discriminate message types:

{"type": "connection_init", "payload": {...}}
{"type": "subscribe", "id": "1", "payload": {...}}
{"type": "next", "id": "1", "payload": {...}}

This maps naturally to Rust enums with #[serde(tag = "type")]:

#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ClientMessage {
    ConnectionInit { payload: Option<ConnectionInitPayload> },
    Subscribe { id: String, payload: SubscribePayload },
    // ...
}

However, sonic_rs fails to deserialize these internally tagged enums; deserialization errors outright.

Alternatively, we could use serde_json and drop the custom deserializer, but then everywhere else we'd have to convert between sonic_rs and serde_json.
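
A custom deserializer essentially does the tag dispatch by hand: read the type field first, then decode the rest of the message according to that variant. Sketched in JavaScript for illustration (the real code is Rust over sonic_rs; names are hypothetical):

```javascript
// Hand-rolled tag dispatch, mirroring what a custom deserializer does
// for an internally tagged enum.
function parseClientMessage(json) {
  const raw = JSON.parse(json);
  switch (raw.type) {
    case "connection_init":
      return { kind: "ConnectionInit", payload: raw.payload ?? null };
    case "subscribe":
      if (typeof raw.id !== "string" || raw.payload == null) {
        throw new Error("subscribe requires id and payload");
      }
      return { kind: "Subscribe", id: raw.id, payload: raw.payload };
    default:
      throw new Error(`unknown message type: ${raw.type}`);
  }
}

const msg = parseClientMessage(
  '{"type":"subscribe","id":"1","payload":{"query":"subscription { greetings }"}}'
);
console.log(msg.kind); // → "Subscribe"
```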

Connection Limits

Client → Router (incoming WebSocket connections)

  • Currently no explicit limit on the number of concurrent WebSocket connections from clients to the router
  • Each WebSocket connection is handled by ntex's async runtime without spawning dedicated threads
  • Practical limits are bounded by OS file descriptor limits and memory

Actual numbers

I ran a stress test that continuously establishes WebSocket connections (including the subprotocol handshake) to the router, adding connections until the next one gets rejected.

First I hit the file descriptor limit; after increasing that, I hit OS ephemeral port exhaustion; after widening the ephemeral port range I managed to get ~32k active WS connections before capping out. I think the cap is ntex, Rust, or OS threads - but one server being able to accept ~32k connections is already very good.

Memory behaviour

It seems like there is a memory leak, but after instrumenting with Xcode's Instruments, no leak was detected whatsoever - and no unexpected new allocations were made either.

The pattern is as follows: at the start of the stress test, memory jumps from ~40MB to about ~130MB and then steadily grows by ~1MB over time as the stress test continues - very slowly, but it grows. Note that it does not shrink even after closing the WS connections.

If Instruments is to be trusted, there is no leak. It could be something like ntex growing its buffers to handle the influx of traffic and not shrinking them in anticipation of more - but I am not really sure. This needs more testing.

Router → Subgraph (outgoing WebSocket connections)

No limit.

What's up with testkit_v2.rs?

Contains the improved testing kit for the router (see usage in e2e/src/websocket.rs). It allows the router to run in parallel, so e2e tests can run in parallel too.

At the moment it is missing random port allocation for subgraphs!

TODOs

  • Currently the websocket client is tested against the server (and therefore vice versa). Add tests specific to the server/client.
  • Benchmarks for client<->router over websockets
  • Benchmarks for router<->subgraph over websockets
  • Usage reporting, should we treat operations within a websocket connection the same as http requests?
  • Configurable connection limits
  • Configurable operations per connection limits
  • Get actual numbers for connection limits
  • Whatever is happening with zigbuild in the CI
  • Subgraph execute/subscribe timeouts for ws executor
  • Are tracing logs ok to log the whole client message? Maybe just the query/operation
  • When websockets are not enabled on the router, trying to connect to it will just hang
  • Open an issue on sonic_rs to handle the internally tagged enum
  • Stale h1 timer causes WebSocket connection termination ntex-rs/ntex#756
  • Propagate shutdown to user service from ws::start ntex-rs/ntex#765

*Also inherits those from #620.

@devin-ai-integration bot commented

Devin Review found 2 potential issues. View 5 additional findings in Devin Review.

@enisdenjo enisdenjo merged commit b916fd9 into not-kamil-subs Feb 16, 2026
23 checks passed
@enisdenjo enisdenjo deleted the not-kamil-subs-ws branch February 16, 2026 17:14