Skip to content

fix(pipe-to): drain buffered flush chunks before shutdown#1

Merged
cramforce merged 2 commits intomainfrom
fix/specpipeto-flush-data-loss
Feb 8, 2026
Merged

fix(pipe-to): drain buffered flush chunks before shutdown#1
cramforce merged 2 commits intomainfrom
fix/specpipeto-flush-data-loss

Conversation

@feedthejim
Copy link
Copy Markdown
Contributor

@feedthejim feedthejim commented Feb 8, 2026

Summary

Two changes to specPipeTo:

  1. Bug fix: specPipeTo drops chunks enqueued during a TransformStream's flush() callback. reader.closed resolves (via Node.js events) before the pump drains buffered data, causing premature shutdown. Fix: move close-shutdown responsibility from reader.closed handler to the pump's done-detection.

  2. Perf optimization: Batch sync reads. Instead of yielding via queueMicrotask after every single chunk, drain all available buffered chunks in one loop before yielding. Also simplified pending-write tracking (Set → single promise).

Benchmark: Next.js force-dynamic page (4KB response)

Tested with Next.js minimal server, pipeTo(dest, { signal }) code path (specPipeTo).

Single client (c=1, 10s)

Mode req/s vs baseline
Web Streams (native) 611
Fast WebStreams (this PR) 1,011 +65%
Node Streams (full rewrite) 1,327 +117%

Under load (c=100, 10s)

Mode req/s p99 latency vs baseline
Web Streams (native) 1,167 144ms
Fast WebStreams (this PR) 1,128 113ms -3% req/s, -22% p99
Node Streams (full rewrite) 1,611 86ms +38% req/s, -40% p99

Before vs after this PR (Fast WebStreams only)

Metric Before (bug fix only) After (+ batch reads) Change
Single client req/s 566 1,011 +79%
Single client avg latency 1.34ms 0.29ms -78%
Under load req/s 1,001 1,128 +13%
Under load p99 141ms 113ms -20%

What changed

Bug fix (commit 1)

reader.closed.then() triggered shutdownWithAction() on source close. But Node.js fires the end/close events before the pump has consumed all buffered chunks from a Transform's flush(). This drops the last N-1 flush chunks.

Fix: reader.closed handler now only sets sourceClosed = true and kicks the pump. The pump reads { done: true } after draining all data, then triggers shutdown.

Perf optimization (commit 2)

The pump previously read one chunk via _readSync(), then yielded via queueMicrotask(pipeLoop). For N buffered chunks, that's N microtask yields.

Now: a while loop drains all buffered sync data before yielding once. Falls through to async reader.read() only when no sync data is available.

Also replaced pendingWrites Set + _waitForAll() with tracking only currentWrite (writes are ordered, so the last settling implies all prior writes settled).

Affected code paths

Any pipeTo call that hits specPipeTo (not Tier 0 fast pipeline):

  • pipeTo(dest, { signal }) — Next.js uses this
  • pipeTo(dest, { preventClose: true })
  • Manual concurrent piping (source.pipeTo(t.writable) + t.readable.pipeTo(dest))

Tier 0 fast pipeline (pipeline()) is unaffected.

Test plan

  • 7 new regression tests for flush chunk preservation
  • All 36 tests pass (7 new + 29 existing)
  • Before fix: 5/7 new tests fail with exact expected chunk drops
  • Smoke tested with Next.js: 4290 bytes (complete) vs 845 bytes (truncated before fix)
  • Benchmarked with Next.js minimal server (results above)

When a TransformStream's flush() callback enqueues multiple chunks,
specPipeTo drops all but the first because reader.closed resolves
(via Node.js 'end'/'close' events) before the pump loop drains the
buffer. The reader.closed handler calls shutdownWithAction(), setting
shuttingDown=true and stopping the pump prematurely.

Fix: move close-shutdown responsibility from the reader.closed promise
handler to the pump loop's done-detection. The reader.closed handler
now only sets sourceClosed=true and kicks the pump. The pump reads
done and triggers shutdown after draining all buffered chunks.

This affects any pipeTo call that uses specPipeTo (signal, preventClose,
preventAbort, preventCancel options) and any manual concurrent piping
pattern (source.pipeTo(t.writable) + t.readable.pipeTo(dest)).

The Tier 0 fast pipeline (via pipeline()) is unaffected.

Fixes truncated HTML responses in Next.js, which uses
readable.pipeTo(writer, { signal }) to stream SSR output through
multiple TransformStream hops.
Instead of yielding via queueMicrotask after every single sync read,
drain all available buffered chunks in one loop iteration before
yielding. This amortizes the microtask yield cost across multiple
chunks.

Also simplifies pending-write tracking: replaced the Set + _waitForAll
with a single currentWrite promise, since writes are ordered and the
last write settling implies all prior writes have settled.

Next.js benchmark (force-dynamic page, 4KB response):
  Single client: 566 -> 1,011 req/s (+79%)
  Under load:  1,001 -> 1,128 req/s (+13%)
@cramforce cramforce merged commit dffc706 into main Feb 8, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants