TCP ingress worker support #1429

Closed
jasnell wants to merge 3 commits into main from jsnell/experimental-tcp-ingress

Conversation

@jasnell
Collaborator

@jasnell jasnell commented Nov 22, 2023

Implements the connect handler and tcp-ingress for a worker

See the samples/tcp-ingress for an example

Note that this only enables the feature for local development in workerd. Considerably more work will be needed internally to support TCP ingress in general, but this at least lays the groundwork. It also gives us something basic we can use to test the Socket API and the node:net implementation without relying on the internal production tests.

The proposed experimental API here is simple:

  • The worker exports a connect(event, env, ctx) handler
  • The event argument has two properties:
    • inbound -- A ReadableStream that provides access to the inbound data stream.
    • cf -- The JSON parsed cf structure
  • The return value is expected to be a ReadableStream or a Promise<ReadableStream>. Internally, it is piped back to the client in the same way as the Response returned by the fetch() handler.
interface ConnectEvent {
  inbound: ReadableStream;
  cf: object;
}

connect(event: ConnectEvent, env: WorkersEnv, ctx: WorkersContext): Promise<ReadableStream> | ReadableStream

This is essentially the same model as the fetch() handler with the Request and Response objects stripped away.
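
As a rough illustration of the shape described above, here is a minimal sketch that can be run locally in Node without workerd. The `worker` object, the `streamFrom`, `collect` and `simulate` helpers, and the stubbed `env`/`ctx` arguments are all assumptions made for this example. In an actual worker module the handler object would be the default export and the runtime would invoke `connect()` itself.

```javascript
// Hypothetical local harness. In a real worker module, `worker` would be the
// default export and the runtime (not this script) would call connect().
const worker = {
  connect({ inbound, cf }, env, ctx) {
    const decoder = new TextDecoder();
    const encoder = new TextEncoder();
    // Upper-case the inbound bytes and return the readable side, much like
    // a fetch() handler returning a Response body.
    return inbound.pipeThrough(new TransformStream({
      transform(chunk, controller) {
        const text = decoder.decode(chunk, { stream: true });
        controller.enqueue(encoder.encode(text.toUpperCase()));
      },
    }));
  },
};

// Builds a ReadableStream of bytes from an array of strings (helper).
function streamFrom(parts) {
  const encoder = new TextEncoder();
  return new ReadableStream({
    start(controller) {
      for (const part of parts) controller.enqueue(encoder.encode(part));
      controller.close();
    },
  });
}

// Drains a byte stream into a single string (helper).
async function collect(readable) {
  const decoder = new TextDecoder();
  const reader = readable.getReader();
  let out = "";
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    out += decoder.decode(value, { stream: true });
  }
  return out + decoder.decode();
}

// Simulates one inbound connection with stubbed env and ctx objects.
async function simulate(parts) {
  const event = { inbound: streamFrom(parts), cf: {} };
  const outbound = await worker.connect(event, {}, { waitUntil() {} });
  return collect(outbound);
}
```

Because this sketch uses a JavaScript-backed TransformStream, it would presumably not qualify for the deferred proxy optimization discussed later in the thread; it is only meant to show the handler signature and return value.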

See this comment for more discussion of the API: #1429 (comment)

@jasnell jasnell closed this Apr 30, 2024
@jasnell jasnell reopened this Jun 11, 2024
@jasnell jasnell force-pushed the jsnell/experimental-tcp-ingress branch 2 times, most recently from 6c5ea04 to 82de22a Compare June 11, 2024 23:49
@jasnell jasnell changed the title from "Experimental tcp-ingress worker support" to "TCP ingress worker support" Jun 11, 2024
@jasnell jasnell force-pushed the jsnell/experimental-tcp-ingress branch from 82de22a to c240b34 Compare June 12, 2024 00:04
@jasnell jasnell marked this pull request as ready for review June 12, 2024 00:05
@jasnell jasnell requested review from a team as code owners June 12, 2024 00:05
@jasnell jasnell requested review from garrettgu10 and tewaro June 12, 2024 00:05
@jasnell
Collaborator Author

jasnell commented Jun 13, 2024

Labeling this with the nodejs compat label only because this is a prereq for having proper tests for the node:net implementation

Contributor

@dom96 dom96 left a comment

Nice! I left a couple questions/comments/suggestions but overall looks solid to me. Since this is a big change I think others should have a look as well (so won't be approving yet).

kj::AsyncIoStream& connection, ConnectResponse& response,
kj::HttpConnectSettings settings) {
JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported");
auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest,
Contributor

Quite a bit of code in WorkerEntrypoint::connect seems similar to WorkerEntrypoint::request. Would refactoring to reduce duplication be possible?

Collaborator Author

@jasnell jasnell Jun 13, 2024

Likely yes. For now I'd rather keep things separate, largely just to avoid the remote possibility of breaking the code path that is used in production. We can revisit and eliminate duplication in a future PR

Member

I really don't think we should introduce a whole bunch of duplicated code which will inevitably get out-of-sync long before anyone cleans it up.

Collaborator Author

Before doing any deduplication, I'd like the code review to focus on correctness: is the code doing the right thing in the way it needs to be done? I'll reconcile the duplicated bits of code as a follow-up.

Member

I can't review the correctness of this code as-is. I would have to manually diff it against the code it was copied from. I need you to factor out the shared stuff so that I can adequately review it.

@jasnell jasnell force-pushed the jsnell/experimental-tcp-ingress branch from 074238e to 1dc2032 Compare June 13, 2024 19:44
@jasnell jasnell requested a review from dom96 June 13, 2024 19:53
@jasnell jasnell force-pushed the jsnell/experimental-tcp-ingress branch from 1dc2032 to b55a511 Compare June 13, 2024 22:36
kj::AuthenticatedStream stream = co_await listener->acceptAuthenticated();

kj::Maybe<kj::String> cfBlobJson;
if (!rewriter->hasCfBlobHeader()) {
Member

This whole if/else is copied from HttpListener. Can we please factor out the common code?

Collaborator Author

Yep, as mentioned, deduplicating the code is a next step. I want to make sure the initial implementation details are correct first.

Collaborator Author

Went ahead and eliminated some of the duplication in server.c++. I'd prefer to leave the duplication in global-scope.c++ as is for now, but I added a TODO comment.

auto req = service.startRequest(kj::mv(metadata));
auto response = kj::heap<ResponseWrapper>();
kj::HttpHeaders headers(headerTable);
// The empty string here is the host parameter that is required by the API
Member

Seems like the "correct" thing here would be to pass the listen address?

Collaborator Author

It's not used for any purpose so I'm not sure there is a "correct" thing.

@kentonv
Member

kentonv commented Jun 14, 2024

We should verify what the Cloudflare stack does with HTTP CONNECT requests today. Hopefully they don't make it through to the edge runtime. If they do, we need to think carefully about this change since such requests will now run the Worker's connect handler. (Not necessarily a bad thing long-term but maybe something we don't intend to enable right now.)

@jasnell
Collaborator Author

jasnell commented Jun 14, 2024

> We should verify what the Cloudflare stack does with HTTP CONNECT requests today. Hopefully they don't make it through to the edge runtime. If they do, we need to think carefully about this change since such requests will now run the Worker's connect handler. (Not necessarily a bad thing long-term but maybe something we don't intend to enable right now.)

For the time being I can have the globalScope->connect(...) handler throw if the experimental compat flag is not enabled.

@jasnell jasnell mentioned this pull request Jun 17, 2024
92 tasks
@kentonv
Member

kentonv commented Jun 28, 2024

This really needs a design doc to discuss the API. It's hard for people to give API feedback if they first must read the code to figure out what the API looks like.

It looks like the connect() handler is like:

export default {
  async connect(event, env, ctx) {
    // event.inbound is the socket
    // event.cf is the CF blob
  }
}

I'm concerned that, unlike the fetch() API, this is very asymmetrical. The asymmetry makes it awkward to implement proxying: You can't just do return connect("upstream-host"). You end up having to set up a pair of pumps, like:

export default {
  async connect(event, env, ctx) {
    let up = connect("upstream-host");
    await Promise.all([
        event.inbound.readable.pipeTo(up.writable),
        up.readable.pipeTo(event.inbound.writable)]);
  }
}

This is kind of ugly, but there's a bigger problem than the aesthetics: The above code cannot apply deferred proxying optimization, since JavaScript has to await the two pipes. How do we avoid that? Seems tricky.

I would argue instead that the connect() handler should have the same signature as the global connect():

export default {
  async connect(addr, env, ctx) {
    return connect("upstream-host");
  }
}

Now proxying is simple and the pump can be deferred in an obvious way.

To allow people to directly respond, we would probably want to offer a notion of a SocketPair, which works a lot like WebSocketPair. They end up with similar code to handling a WebSocket request in fetch().
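
To make the SocketPair idea a little more concrete, here is one way such a pair could be approximated in plain JavaScript. This is only a sketch: no SocketPair exists in this PR, and `makeSocketPair` and `demo` are hypothetical names. Each end is modeled as a readable/writable pair, cross-wired through two identity TransformStreams, and the chunks are strings rather than bytes to keep the example short.

```javascript
// Hypothetical: nothing named SocketPair exists in the PR. Two identity
// TransformStreams are cross-wired so that writes on one end show up as
// reads on the other, loosely analogous to WebSocketPair.
function makeSocketPair() {
  const aToB = new TransformStream();
  const bToA = new TransformStream();
  const a = { readable: bToA.readable, writable: aToB.writable };
  const b = { readable: aToB.readable, writable: bToA.writable };
  return [a, b];
}

// One end is handed back to the caller (the "client"); the handler keeps
// the other end (the "server") and, in this demo, simply echoes.
async function demo() {
  const [client, server] = makeSocketPair();
  const echo = server.readable.pipeTo(server.writable);

  const writer = client.writable.getWriter();
  const reader = client.readable.getReader();

  // Start the write, then read the echoed chunk back from the same end.
  const pending = writer.write("ping");
  const { value } = await reader.read();
  await pending;

  // Closing the client's writable lets the echo pipe finish.
  await writer.close();
  await echo;
  return value;
}
```

A handler written this way would return one end and keep the other, which is the pattern the WebSocketPair comparison suggests. Whether a runtime could defer the pump for such a pair is a separate question that this sketch does not address.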

Sorry that this feedback comes after so much delay, but this is a basic problem with the format of "feature proposal as PR": reviewers will tend to look at the code, and will have a hard time seeing the high-level design, because that's what you're presenting them. We really need feature requests like this to start out with a design doc explaining the high-level design, especially API details. It doesn't have to be long, it's just to prompt discussion of the API before spending time on code.

@jasnell
Collaborator Author

jasnell commented Jun 29, 2024

@kentonv :

> It looks like the connect() handler is like ...

Not quite... The event.inbound is a ReadableStream not a Socket. The return value is a Promise<ReadableStream> ... So it would be...

export default {
  async connect(event, env, ctx) {
    const up = connect("upstream-host");
    // will use an optimized pipe since both are internal streams
    ctx.waitUntil(event.inbound.pipeTo(up.writable));
    return up.readable;
  }
}

// Would be a bit nicer here if the Socket subrequest `connect(...)` API had a way of receiving the
// inbound readable such that it automatically wired up the pipe... e.g. `connect('upstream-host', { outbound: event.inbound })`
// But that is a limitation of the Socket API not the TCP ingress API.

We could also support a return value of Promise<ReadableWritablePair> and automatically wire up the necessary pipes, allowing something like...

export default {
  async connect(event, env, ctx) {
    // The internal handler logic would detect this case and wire event.inbound to socket.writable,
    // and respond with the socket.readable... This would just be a syntactic sugar alternative to
    // the above example, though.
    return connect('upstream-host');
  }
}
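
For illustration, the detection logic mentioned in the comment above might look roughly like the following. This is a sketch under assumptions, not the implementation in this PR: `resolveOutbound` and `collectText` are hypothetical names, and the real logic would live in the runtime rather than in user JavaScript.

```javascript
// Hypothetical runtime-side helper, not part of the PR. It accepts either
// return shape and always produces the stream to send back to the client.
async function resolveOutbound(inbound, result, ctx) {
  const value = await result;
  if (value instanceof ReadableStream) {
    // Shape 1: the handler already wired up inbound on its own.
    return value;
  }
  if (value &&
      value.readable instanceof ReadableStream &&
      value.writable instanceof WritableStream) {
    // Shape 2: a readable/writable pair. Pump inbound into the writable
    // side and respond with the readable side.
    ctx.waitUntil(inbound.pipeTo(value.writable));
    return value.readable;
  }
  throw new TypeError(
      "connect() must return a ReadableStream or a readable/writable pair");
}

// Drains a stream of string chunks into one string (helper for the demo).
async function collectText(readable) {
  const reader = readable.getReader();
  let out = "";
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    out += value;
  }
  return out;
}

// Demo: an identity TransformStream stands in for the pair that
// connect('upstream-host') would return, acting as an echoing upstream.
async function demo() {
  const pending = [];
  const ctx = { waitUntil: (promise) => pending.push(promise) };
  const inbound = new ReadableStream({
    start(controller) {
      controller.enqueue("a");
      controller.enqueue("b");
      controller.close();
    },
  });
  const fakeUpstream = new TransformStream();
  const outbound = await resolveOutbound(inbound, fakeUpstream, ctx);
  const text = await collectText(outbound);
  await Promise.all(pending);
  return text;
}
```

With this kind of normalization, a handler could return the upstream socket directly, while handlers that return a bare ReadableStream would keep working unchanged.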

I've expanded the PR description with the brief description of the simple experimental API implemented here.

> To allow people to directly respond, we would probably want to offer a notion of a SocketPair, which works a lot like WebSocketPair. They end up with similar code to handling a WebSocket request in fetch().

I disagree here. I originally started down that path on the implementation here and there's really no reason to pass the writable side of this around. Let's take the simplest possible example, an echo service:

export default {
  // Uses deferred proxy...
  connect({ inbound }) { return inbound; }
}

Or a slightly more complicated case that responds to individual inputs...

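async function processInbound(readable, writable) {
  // A WritableStream is written to through a writer, not directly
  const writer = writable.getWriter();
  for await (const chunk of readable) {
    // Process the chunk...
    const result = processChunk(chunk);
    // Write the result
    await writer.write(result);
  }
  // Close the writable side so the returned readable ends
  await writer.close();
}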

export default {
  connect({ inbound }, env, ctx) {
    const { writable, readable } = new IdentityTransformStream();
    ctx.waitUntil(processInbound(inbound, writable));
    return readable;  // Can use deferred proxy but limited by waitUntil...
  }
}

This could also be implemented with a TransformStream fairly easily with good symmetry...
(however, because this uses a JS-backed transform deferred proxy isn't used, but that's not specific to connect)

export default {
  connect({ inbound }, env, ctx) {
    return inbound.pipeThrough(new TransformStream({
      transform(chunk, controller) {
        controller.enqueue(processChunk(chunk));
      }
    }));
  }
}

Or, proxy a fetch subrequest

export default {
  async connect({ inbound }, env, ctx) {
    const { writable, readable } = new IdentityTransformStream();
    const request = new Request('http://example.org', { method: 'POST', body: readable });
    ctx.waitUntil(inbound.pipeTo(writable));
    const resp = await fetch(request);
    return resp.body;  // body is a ReadableStream... uses deferred proxy
  }
}

I would also argue that we do not need to solve any API issues right now. This is an experimental mechanism that is not currently intended for production use beyond supporting a handful of tests for CI.

return new Response("ok");
},

connect({inbound, cf}) {
Collaborator Author

@jasnell jasnell Jun 29, 2024

Reviewer note:

inbound is a ReadableStream

The expected return value is ReadableStream (or more correctly, Promise<ReadableStream>)

If the returned ReadableStream is not a JavaScript-backed ReadableStream, then it should support the deferred proxy with no problem unless there's something else preventing it.
