Conversation
6c5ea04 to
82de22a
Compare
82de22a to
c240b34
Compare
cc89799 to
074238e
Compare
|
Labeling this with the |
| kj::AsyncIoStream& connection, ConnectResponse& response, | ||
| kj::HttpConnectSettings settings) { | ||
| JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); | ||
| auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, |
There was a problem hiding this comment.
Quite a bit of code in WorkerEntrypoint::connect seems similar to WorkerEntrypoint::request. Would refactoring to reduce duplication be possible?
There was a problem hiding this comment.
Likely yes. For now I'd rather keep things separate, largely just to avoid the remote possibility of breaking the code path that is used in production. We can revisit and eliminate duplication in a future PR
There was a problem hiding this comment.
I really don't think we should introduce a whole bunch of duplicated code which will inevitably get out-of-sync long before anyone cleans it up.
There was a problem hiding this comment.
Before doing any deduplication I'd like the code review to focus on correctness. Is the code doing the right thing in the way it needs to be done. I'll reconcile the duplicated bits of code as a follow up.
There was a problem hiding this comment.
I can't review the correctness of this code as-is. I would have to manually diff it against the code it was copied from. I need you to factor out the shared stuff so that I can adequately review it.
074238e to
1dc2032
Compare
Implements the connect handler and tcp-ingress for a worker See the samples/tcp-ingress for an example
1dc2032 to
b55a511
Compare
| kj::AuthenticatedStream stream = co_await listener->acceptAuthenticated(); | ||
|
|
||
| kj::Maybe<kj::String> cfBlobJson; | ||
| if (!rewriter->hasCfBlobHeader()) { |
There was a problem hiding this comment.
This whole if/else is copied from HttpListener. Can we please factor out the common code?
There was a problem hiding this comment.
yep, as mentioned, deduplication of the code is a next step. I want to make sure the initial implementation details are correct first.
There was a problem hiding this comment.
Went ahead and eliminated some of the duplication in server.c++. I'd prefer to leave the duplication in global-scope.c++ as is for now but added a todo comment.
| auto req = service.startRequest(kj::mv(metadata)); | ||
| auto response = kj::heap<ResponseWrapper>(); | ||
| kj::HttpHeaders headers(headerTable); | ||
| // The empty string here is the host parameter that is required by the API |
There was a problem hiding this comment.
Seems like the "correct" thing here would be to pass the listen address?
There was a problem hiding this comment.
It's not used for any purpose so I'm not sure there is a "correct" thing.
| kj::AsyncIoStream& connection, ConnectResponse& response, | ||
| kj::HttpConnectSettings settings) { | ||
| JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); | ||
| auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, |
There was a problem hiding this comment.
I really don't think we should introduce a whole bunch of duplicated code which will inevitably get out-of-sync long before anyone cleans it up.
|
We should verify what the Cloudflare stack does with HTTP CONNECT requests today. Hopefully they don't make it through to the edge runtime. If they do, we need to think carefully about this change since such requests will now run the Worker's connect handler. (Not necessarily a bad thing long-term but maybe something we don't intend to enable right now.) |
For the time being I can have the |
|
This really needs a design doc to discuss the API. It's hard for people to give API feedback if they first must read the code to figure out what the API looks like. It looks like the I'm concerned that, unlike the This is kind of ugly, but there's a bigger problem than the aesthetics: The above code cannot apply deferred proxying optimization, since JavaScript has to await the two pipes. How do we avoid that? Seems tricky. I would argue instead that the Now proxying is simple and the pump can be deferred in an obvious way. To allow people to directly respond, we would probably want to offer a notion of a Sorry that this feedback comes after so much delay, but this is a basic problem with the format of "feature proposal as PR": reviewers will tend to look at the code, and will have a hard time seeing the high-level design, because that's what you're presenting them. We really need feature requests like this to start out with a design doc explaining the high-level design, especially API details. It doesn't have to be long, it's just to prompt discussion of the API before spending time on code. |
|
@kentonv :
Not quite... The export default {
async connect(event, env, ctx) {
const up = connect("upstream-host");
// will use an optimized pipe since both are internal streams
ctx.waitUntil(event.inbound.pipeTo(up.writable));
return up.readable;
}
}
// Would be a bit nicer here if the Socket subrequest `connect(...)` API had a way of receiving the
// inbound readable such that it automatically wired up the pipe... e.g. `connect('upstream-host', { outbound: event.inbound })`
// But that is a limitation of the Socket API not the TCP ingress API.We could also support a return value of export default {
async connect(event, env, ctx) {
// The internal handler logic would detect this case and wire event.inbound to socket.writable,
// and respond with the socket.readable... This would just be a syntactic sugar alternative to
// the above example tho,
return connect('upstream-host');
}
}I've expanded the PR description with the brief description of the simple experimental API implemented here.
I disagree here. I originally started down that path on the implementation here and there's really no reason to pass the export default {
// Uses deferred proxy...
connect({ inbound }) { return inbound; }
}Or a slightly more complicated case that responds to individual inputs... async function processInbound(readable, writable) {
for await (const chunk of readable) {
// Process the chunk...
const result = processChunk();
// Write the result
await writable.write(result);
}
}
export default {
connect({ inbound }, env, ctx) {
const { writable, readable } = new IdentityTransformStream();
ctx.waitUntil(processInbound(inbound, writable);
return readable; // Can use deferred proxy but limited by waitUntil...
}
}This could also be implemented with a TransformStream fairly easily with good symmetry... export default {
connect({ inbound }, env, ctx) {
return inbound.pipeThrough(new TransformStream({
transform(chunk, controller) {
controller.enqueue(processChunk());
}
}));
}
}Or, proxy a fetch subrequest export default {
async connect({ inbound }, env, ctx) {
const { writable, readable } = new IdentityTransformStream();
const request = new Request('http://example.org', { body: readable });
ctx.waitUntil(inbound.pipeTo(writable));
const resp = await fetch(request);
return resp.body; // body is a ReadableStream... uses deferred proxy
}
}I would also argue that we do not need to solve any API issues right now. This is an experimental mechanism that is not currently intended for production use beyond supporting a handful of tests for CI |
| return new Response("ok"); | ||
| }, | ||
|
|
||
| connect({inbound, cf}) { |
There was a problem hiding this comment.
Reviewer note:
inbound is a ReadableStream
The expected return value is ReadableStream (or more correctly, Promise<ReadableStream>)
If the returned ReadableStream is not a JavaScript-backed ReadableStream, then it should support the deferred proxy with no problem unless there's something else preventing it.
Implements the connect handler and tcp-ingress for a worker
See the samples/tcp-ingress for an example
Note that this is only enabling this for local dev in workerd. A lot more additional work will need to be done internally to support TCP ingress in general but this at least lays the ground work. It also gives us some basic stuff we can use to test the Socket API and
node:netimplementation without relying on the internal production tests.The proposed experimental API here is simple:
connect(event, env, ctx)handlereventargument has two properties:inbound-- AReadableStreamthat provides access to the inbound data stream.cf-- The JSON parsedcfstructureReadableStreamor aPromise<ReadableStream>, internally this will be piped back to the client in the same way thefetch()handler'sResponse.This is essentially the same model as the
fetch()handler with theRequestandResponseobjects stripped away.See this comment for more discussion of the API: #1429 (comment)