fix(pipe-to): drain buffered flush chunks before shutdown#1
Merged
Conversation
When a TransformStream's flush() callback enqueues multiple chunks,
specPipeTo drops all but the first because reader.closed resolves
(via Node.js 'end'/'close' events) before the pump loop drains the
buffer. The reader.closed handler calls shutdownWithAction(), setting
shuttingDown=true and stopping the pump prematurely.
Fix: move close-shutdown responsibility from the reader.closed promise
handler to the pump loop's done-detection. The reader.closed handler
now only sets sourceClosed=true and kicks the pump. The pump reads
done and triggers shutdown after draining all buffered chunks.
This affects any pipeTo call that uses specPipeTo (signal, preventClose,
preventAbort, preventCancel options) and any manual concurrent piping
pattern (source.pipeTo(t.writable) + t.readable.pipeTo(dest)).
The Tier 0 fast pipeline (via pipeline()) is unaffected.
Fixes truncated HTML responses in Next.js, which uses
readable.pipeTo(writer, { signal }) to stream SSR output through
multiple TransformStream hops.
Instead of yielding via queueMicrotask after every single sync read, drain all available buffered chunks in one loop iteration before yielding. This amortizes the microtask yield cost across multiple chunks. Also simplifies pending-write tracking: replaced the Set + _waitForAll with a single currentWrite promise, since writes are ordered and the last write settling implies all prior writes have settled. Next.js benchmark (force-dynamic page, 4KB response): Single client: 566 -> 1,011 req/s (+79%) Under load: 1,001 -> 1,128 req/s (+13%)
4 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Two changes to
specPipeTo:Bug fix:
specPipeTodrops chunks enqueued during aTransformStream'sflush()callback.reader.closedresolves (via Node.js events) before the pump drains buffered data, causing premature shutdown. Fix: move close-shutdown responsibility fromreader.closedhandler to the pump's done-detection.Perf optimization: Batch sync reads. Instead of yielding via
queueMicrotaskafter every single chunk, drain all available buffered chunks in one loop before yielding. Also simplified pending-write tracking (Set → single promise).Benchmark: Next.js force-dynamic page (4KB response)
Tested with Next.js minimal server,
pipeTo(dest, { signal })code path (specPipeTo).Single client (c=1, 10s)
Under load (c=100, 10s)
Before vs after this PR (Fast WebStreams only)
What changed
Bug fix (commit 1)
reader.closed.then()triggeredshutdownWithAction()on source close. But Node.js fires theend/closeevents before the pump has consumed all buffered chunks from a Transform'sflush(). This drops the last N-1 flush chunks.Fix:
reader.closedhandler now only setssourceClosed = trueand kicks the pump. The pump reads{ done: true }after draining all data, then triggers shutdown.Perf optimization (commit 2)
The pump previously read one chunk via
_readSync(), then yielded viaqueueMicrotask(pipeLoop). For N buffered chunks, that's N microtask yields.Now: a
whileloop drains all buffered sync data before yielding once. Falls through to asyncreader.read()only when no sync data is available.Also replaced
pendingWritesSet +_waitForAll()with tracking onlycurrentWrite(writes are ordered, so the last settling implies all prior writes settled).Affected code paths
Any
pipeTocall that hitsspecPipeTo(not Tier 0 fast pipeline):pipeTo(dest, { signal })— Next.js uses thispipeTo(dest, { preventClose: true })source.pipeTo(t.writable)+t.readable.pipeTo(dest))Tier 0 fast pipeline (
pipeline()) is unaffected.Test plan