Skip to content

Commit f4e24b0

Browse files
authored
Merge pull request #5721 from cloudflare/dlapid/fix_jsrpc_return_event
Fix JSRPC Return event timing to fire when handler returns
2 parents 3d282f4 + f840511 commit f4e24b0

6 files changed

Lines changed: 268 additions & 4 deletions

File tree

src/workerd/api/tests/BUILD.bazel

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,18 @@ wd_test(
3333
],
3434
)
3535

36+
# Test to validate timing semantics for JSRPC streaming responses.
37+
# This test verifies that Return events occur when the handler returns,
38+
# NOT when the stream is fully consumed.
39+
wd_test(
40+
src = "jsrpc-timing-test.wd-test",
41+
args = ["--experimental"],
42+
data = [
43+
"jsrpc-timing-test.js",
44+
"jsrpc-timing-test-tail.js",
45+
],
46+
)
47+
3648
# Test for a worker that can log to itself using the isTracer parameter
3749
wd_test(
3850
src = "self-logger-test.wd-test",
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Copyright (c) 2017-2024 Cloudflare, Inc.
2+
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
// https://opensource.org/licenses/Apache-2.0
4+
5+
// Streaming tail worker that captures timing information for JSRPC invocations.
6+
// This validates that Return events occur at the expected time.
7+
//
8+
// Expected timeline:
9+
// T=0: Onset (invocation starts)
10+
// T=~500ms: Return (handler returns the stream)
11+
// T=~950ms: Outcome (stream fully consumed)
12+
13+
import * as assert from 'node:assert';
14+
15+
// Store events by invocation ID so we can analyze timing per-invocation
16+
let invocationEvents = new Map();
17+
18+
export default {
19+
tailStream(onsetEvent, env, ctx) {
20+
const invocationId = onsetEvent.invocationId;
21+
const baseTimestamp = onsetEvent.timestamp.getTime();
22+
23+
// Initialize event list for this invocation
24+
const events = [
25+
{
26+
type: onsetEvent.event.type,
27+
relativeTimeMs: 0,
28+
info: onsetEvent.event.info?.type,
29+
entrypoint: onsetEvent.event.entrypoint,
30+
},
31+
];
32+
invocationEvents.set(invocationId, { baseTimestamp, events });
33+
34+
return (event) => {
35+
const relativeTimeMs = event.timestamp.getTime() - baseTimestamp;
36+
37+
const eventRecord = {
38+
type: event.event.type,
39+
relativeTimeMs,
40+
};
41+
42+
// Capture additional info for specific event types
43+
if (event.event.type === 'return') {
44+
eventRecord.returnInfo = event.event.info;
45+
} else if (event.event.type === 'outcome') {
46+
eventRecord.outcome = event.event.outcome;
47+
eventRecord.cpuTime = event.event.cpuTime;
48+
eventRecord.wallTime = event.event.wallTime;
49+
} else if (event.event.type === 'attributes') {
50+
eventRecord.attributes = event.event.info;
51+
}
52+
53+
events.push(eventRecord);
54+
};
55+
},
56+
};
57+
58+
export const test = {
59+
async test() {
60+
// Wait briefly for all tail events to be processed
61+
await scheduler.wait(200);
62+
63+
// Find the StreamingService invocation (the JSRPC call)
64+
let streamingServiceEvents = null;
65+
for (const [invocationId, data] of invocationEvents.entries()) {
66+
const onset = data.events[0];
67+
if (onset.entrypoint === 'StreamingService' && onset.info === 'jsrpc') {
68+
streamingServiceEvents = data.events;
69+
break;
70+
}
71+
}
72+
73+
if (!streamingServiceEvents) {
74+
console.log('Captured invocations:');
75+
for (const [invocationId, data] of invocationEvents.entries()) {
76+
console.log(` ${invocationId}: ${JSON.stringify(data.events[0])}`);
77+
}
78+
throw new Error(
79+
'Could not find StreamingService JSRPC invocation events'
80+
);
81+
}
82+
83+
console.log(
84+
'StreamingService events:',
85+
JSON.stringify(streamingServiceEvents, null, 2)
86+
);
87+
88+
// Find the key events
89+
const onset = streamingServiceEvents.find((e) => e.type === 'onset');
90+
const returnEvent = streamingServiceEvents.find((e) => e.type === 'return');
91+
const outcome = streamingServiceEvents.find((e) => e.type === 'outcome');
92+
93+
// Validate all events are present
94+
assert.ok(onset, 'Missing onset event');
95+
assert.ok(outcome, 'Missing outcome event');
96+
97+
if (!returnEvent) {
98+
console.log('WARNING: No return event found for JSRPC invocation!');
99+
console.log(
100+
'This may indicate setReturn() is not being called for JSRPC.'
101+
);
102+
throw new Error('Return event not captured for JSRPC invocation');
103+
}
104+
105+
const timeToReturn = returnEvent.relativeTimeMs;
106+
const timeToOutcome = outcome.relativeTimeMs;
107+
const gap = timeToOutcome - timeToReturn;
108+
109+
console.log(`timeToReturn: ${timeToReturn}ms`);
110+
console.log(`timeToOutcome: ${timeToOutcome}ms`);
111+
console.log(`Gap between Return and Outcome: ${gap}ms`);
112+
113+
// Key validation: Return must happen BEFORE Outcome
114+
if (timeToReturn >= timeToOutcome) {
115+
throw new Error(
116+
`Return event (${timeToReturn}ms) should happen before Outcome (${timeToOutcome}ms). ` +
117+
`This indicates Return is fired when stream ends, not when handler returns.`
118+
);
119+
}
120+
121+
// Return should happen at least 200ms after onset (handler sleeps for 500ms)
122+
if (timeToReturn < 200) {
123+
throw new Error(
124+
`Return event (${timeToReturn}ms) happened too quickly. ` +
125+
`Expected at least 200ms from onset (handler sleeps for 500ms before returning).`
126+
);
127+
}
128+
129+
// Return should be significantly before Outcome (at least 200ms gap for stream drain)
130+
if (gap < 200) {
131+
throw new Error(
132+
`Gap between Return (${timeToReturn}ms) and Outcome (${timeToOutcome}ms) is only ${gap}ms. ` +
133+
`Expected at least 200ms for stream consumption.`
134+
);
135+
}
136+
137+
console.log(`PASS: Return event occurs ${gap}ms before Outcome`);
138+
},
139+
};
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) 2017-2024 Cloudflare, Inc.
2+
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
// https://opensource.org/licenses/Apache-2.0
4+
5+
// Test to validate timing semantics for JSRPC invocations with streaming responses.
6+
// This test verifies that the Return event is emitted at the correct time relative
7+
// to Onset and Outcome events.
8+
//
9+
// Expected timeline:
10+
// T=0: Onset (invocation starts)
11+
// T=~500ms: Return (handler returns the stream)
12+
// T=~950ms: Outcome (stream fully consumed)
13+
14+
import { WorkerEntrypoint } from 'cloudflare:workers';
15+
16+
export class StreamingService extends WorkerEntrypoint {
17+
async getStreamWithDelays() {
18+
// Sleep 500ms before returning the stream
19+
await scheduler.wait(500);
20+
21+
// Return a ReadableStream that takes ~450ms to drain
22+
const encoder = new TextEncoder();
23+
let chunksSent = 0;
24+
25+
const stream = new ReadableStream({
26+
async pull(controller) {
27+
if (chunksSent >= 3) {
28+
controller.close();
29+
return;
30+
}
31+
// Sleep 150ms between chunks (3 chunks = ~450ms to drain)
32+
await scheduler.wait(150);
33+
controller.enqueue(encoder.encode(`chunk${chunksSent++}\n`));
34+
},
35+
});
36+
37+
return stream;
38+
}
39+
}
40+
41+
export default {
42+
async test(controller, env, ctx) {
43+
// Call the streaming RPC method
44+
const stream = await env.StreamingService.getStreamWithDelays();
45+
46+
// Consume the stream fully
47+
const reader = stream.getReader();
48+
let chunks = [];
49+
while (true) {
50+
const { done, value } = await reader.read();
51+
if (done) break;
52+
chunks.push(new TextDecoder().decode(value));
53+
}
54+
55+
// Verify we got all chunks
56+
if (chunks.length !== 3) {
57+
throw new Error(`Expected 3 chunks, got ${chunks.length}`);
58+
}
59+
60+
// The actual timing validation is done in the tail worker's test() handler
61+
},
62+
};
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using Workerd = import "/workerd/workerd.capnp";
2+
3+
# Test to validate timing semantics for JSRPC invocations with streaming responses.
4+
# This test verifies that the Return event is emitted when the handler returns,
5+
# NOT when the stream is fully consumed.
6+
#
7+
# Expected timeline:
8+
# T=0: Onset event (invocation starts)
9+
# T=~500ms: Return event (handler returns ReadableStream after 500ms sleep)
10+
# T=~950ms: Outcome event (stream fully consumed after ~450ms more)
11+
#
12+
# If Return event occurs at T=~950ms instead of T=~500ms, this indicates a bug
13+
# where setReturn() is called when capabilities are released rather than
14+
# when the handler actually returns.
15+
16+
const unitTests :Workerd.Config = (
17+
services = [
18+
(name = "jsrpc-timing-test", worker = .mainWorker),
19+
(name = "tail", worker = .tailWorker),
20+
],
21+
);
22+
23+
const mainWorker :Workerd.Worker = (
24+
modules = [
25+
(name = "worker", esModule = embed "jsrpc-timing-test.js")
26+
],
27+
compatibilityDate = "2024-01-01",
28+
compatibilityFlags = ["nodejs_compat", "experimental", "precise_timers"],
29+
bindings = [
30+
(name = "StreamingService", service = (name = "jsrpc-timing-test", entrypoint = "StreamingService")),
31+
],
32+
streamingTails = ["tail"],
33+
);
34+
35+
const tailWorker :Workerd.Worker = (
36+
modules = [
37+
(name = "worker", esModule = embed "jsrpc-timing-test-tail.js")
38+
],
39+
compatibilityDate = "2024-01-01",
40+
compatibilityFlags = ["experimental", "nodejs_compat", "precise_timers"],
41+
);

src/workerd/api/tests/tail-worker-test.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/workerd/api/worker-rpc.c++

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1768,13 +1768,25 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase {
17681768
kj::Maybe<kj::String> wrapperModule,
17691769
kj::Maybe<kj::Own<BaseTracer>> tracer)
17701770
: JsRpcTargetBase(ioCtx, CantOutliveIncomingRequest()),
1771+
ioCtx(ioCtx),
17711772
// Most of the time we don't really have to clone this but it's hard to fully prove, so
17721773
// let's be safe.
17731774
entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })),
17741775
props(kj::mv(props)),
17751776
wrapperModule(kj::mv(wrapperModule)),
17761777
tracer(kj::mv(tracer)) {}
17771778

1779+
// Override call() to emit the Return event when the top-level RPC call completes.
1780+
// This marks when the handler returned a value, NOT when all data has been streamed or all
1781+
// capabilities released.
1782+
kj::Promise<void> call(CallContext callContext) override {
1783+
return JsRpcTargetBase::call(kj::mv(callContext)).then([this]() {
1784+
KJ_IF_SOME(t, ioCtx.getWorkerTracer()) {
1785+
t.setReturn(ioCtx.now());
1786+
}
1787+
});
1788+
}
1789+
17781790
TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) override {
17791791
jsg::Lock& js = lock;
17801792

@@ -1842,6 +1854,7 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase {
18421854
}
18431855

18441856
private:
1857+
IoContext& ioCtx;
18451858
kj::Maybe<kj::String> entrypointName;
18461859
Frankenvalue props;
18471860
kj::Maybe<kj::String> wrapperModule;
@@ -1942,9 +1955,6 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEvent::run(
19421955
// and server as part of this session.
19431956
co_await donePromise.exclusiveJoin(ioctx.onAbort());
19441957

1945-
KJ_IF_SOME(t, ioctx.getWorkerTracer()) {
1946-
t.setReturn(ioctx.now());
1947-
}
19481958
co_return WorkerInterface::CustomEvent::Result{.outcome = EventOutcome::OK};
19491959
} catch (...) {
19501960
// Make sure the top-level capability is revoked with the same exception that `run()` is

0 commit comments

Comments
 (0)