|
| 1 | +// Copyright (c) 2017-2024 Cloudflare, Inc. |
| 2 | +// Licensed under the Apache 2.0 license found in the LICENSE file or at: |
| 3 | +// https://opensource.org/licenses/Apache-2.0 |
| 4 | + |
| 5 | +// Streaming tail worker that captures timing information for JSRPC invocations. |
| 6 | +// This validates that Return events occur at the expected time. |
| 7 | +// |
| 8 | +// Expected timeline: |
| 9 | +// T=0: Onset (invocation starts) |
| 10 | +// T=~500ms: Return (handler returns the stream) |
| 11 | +// T=~950ms: Outcome (stream fully consumed) |
| 12 | + |
| 13 | +import * as assert from 'node:assert'; |
| 14 | + |
| 15 | +// Store events by invocation ID so we can analyze timing per-invocation |
| 16 | +let invocationEvents = new Map(); |
| 17 | + |
| 18 | +export default { |
| 19 | + tailStream(onsetEvent, env, ctx) { |
| 20 | + const invocationId = onsetEvent.invocationId; |
| 21 | + const baseTimestamp = onsetEvent.timestamp.getTime(); |
| 22 | + |
| 23 | + // Initialize event list for this invocation |
| 24 | + const events = [ |
| 25 | + { |
| 26 | + type: onsetEvent.event.type, |
| 27 | + relativeTimeMs: 0, |
| 28 | + info: onsetEvent.event.info?.type, |
| 29 | + entrypoint: onsetEvent.event.entrypoint, |
| 30 | + }, |
| 31 | + ]; |
| 32 | + invocationEvents.set(invocationId, { baseTimestamp, events }); |
| 33 | + |
| 34 | + return (event) => { |
| 35 | + const relativeTimeMs = event.timestamp.getTime() - baseTimestamp; |
| 36 | + |
| 37 | + const eventRecord = { |
| 38 | + type: event.event.type, |
| 39 | + relativeTimeMs, |
| 40 | + }; |
| 41 | + |
| 42 | + // Capture additional info for specific event types |
| 43 | + if (event.event.type === 'return') { |
| 44 | + eventRecord.returnInfo = event.event.info; |
| 45 | + } else if (event.event.type === 'outcome') { |
| 46 | + eventRecord.outcome = event.event.outcome; |
| 47 | + eventRecord.cpuTime = event.event.cpuTime; |
| 48 | + eventRecord.wallTime = event.event.wallTime; |
| 49 | + } else if (event.event.type === 'attributes') { |
| 50 | + eventRecord.attributes = event.event.info; |
| 51 | + } |
| 52 | + |
| 53 | + events.push(eventRecord); |
| 54 | + }; |
| 55 | + }, |
| 56 | +}; |
| 57 | + |
| 58 | +export const test = { |
| 59 | + async test() { |
| 60 | + // Wait briefly for all tail events to be processed |
| 61 | + await scheduler.wait(200); |
| 62 | + |
| 63 | + // Find the StreamingService invocation (the JSRPC call) |
| 64 | + let streamingServiceEvents = null; |
| 65 | + for (const [invocationId, data] of invocationEvents.entries()) { |
| 66 | + const onset = data.events[0]; |
| 67 | + if (onset.entrypoint === 'StreamingService' && onset.info === 'jsrpc') { |
| 68 | + streamingServiceEvents = data.events; |
| 69 | + break; |
| 70 | + } |
| 71 | + } |
| 72 | + |
| 73 | + if (!streamingServiceEvents) { |
| 74 | + console.log('Captured invocations:'); |
| 75 | + for (const [invocationId, data] of invocationEvents.entries()) { |
| 76 | + console.log(` ${invocationId}: ${JSON.stringify(data.events[0])}`); |
| 77 | + } |
| 78 | + throw new Error( |
| 79 | + 'Could not find StreamingService JSRPC invocation events' |
| 80 | + ); |
| 81 | + } |
| 82 | + |
| 83 | + console.log( |
| 84 | + 'StreamingService events:', |
| 85 | + JSON.stringify(streamingServiceEvents, null, 2) |
| 86 | + ); |
| 87 | + |
| 88 | + // Find the key events |
| 89 | + const onset = streamingServiceEvents.find((e) => e.type === 'onset'); |
| 90 | + const returnEvent = streamingServiceEvents.find((e) => e.type === 'return'); |
| 91 | + const outcome = streamingServiceEvents.find((e) => e.type === 'outcome'); |
| 92 | + |
| 93 | + // Validate all events are present |
| 94 | + assert.ok(onset, 'Missing onset event'); |
| 95 | + assert.ok(outcome, 'Missing outcome event'); |
| 96 | + |
| 97 | + if (!returnEvent) { |
| 98 | + console.log('WARNING: No return event found for JSRPC invocation!'); |
| 99 | + console.log( |
| 100 | + 'This may indicate setReturn() is not being called for JSRPC.' |
| 101 | + ); |
| 102 | + throw new Error('Return event not captured for JSRPC invocation'); |
| 103 | + } |
| 104 | + |
| 105 | + const timeToReturn = returnEvent.relativeTimeMs; |
| 106 | + const timeToOutcome = outcome.relativeTimeMs; |
| 107 | + const gap = timeToOutcome - timeToReturn; |
| 108 | + |
| 109 | + console.log(`timeToReturn: ${timeToReturn}ms`); |
| 110 | + console.log(`timeToOutcome: ${timeToOutcome}ms`); |
| 111 | + console.log(`Gap between Return and Outcome: ${gap}ms`); |
| 112 | + |
| 113 | + // Key validation: Return must happen BEFORE Outcome |
| 114 | + if (timeToReturn >= timeToOutcome) { |
| 115 | + throw new Error( |
| 116 | + `Return event (${timeToReturn}ms) should happen before Outcome (${timeToOutcome}ms). ` + |
| 117 | + `This indicates Return is fired when stream ends, not when handler returns.` |
| 118 | + ); |
| 119 | + } |
| 120 | + |
| 121 | + // Return should happen at least 200ms after onset (handler sleeps for 500ms) |
| 122 | + if (timeToReturn < 200) { |
| 123 | + throw new Error( |
| 124 | + `Return event (${timeToReturn}ms) happened too quickly. ` + |
| 125 | + `Expected at least 200ms from onset (handler sleeps for 500ms before returning).` |
| 126 | + ); |
| 127 | + } |
| 128 | + |
| 129 | + // Return should be significantly before Outcome (at least 200ms gap for stream drain) |
| 130 | + if (gap < 200) { |
| 131 | + throw new Error( |
| 132 | + `Gap between Return (${timeToReturn}ms) and Outcome (${timeToOutcome}ms) is only ${gap}ms. ` + |
| 133 | + `Expected at least 200ms for stream consumption.` |
| 134 | + ); |
| 135 | + } |
| 136 | + |
| 137 | + console.log(`PASS: Return event occurs ${gap}ms before Outcome`); |
| 138 | + }, |
| 139 | +}; |
0 commit comments