Skip to content

Commit 0f4e6c8

Browse files
authored
avoid emitting unhandled rejections on DecompressionStream (#6108)
1 parent f3fdde1 commit 0f4e6c8

5 files changed

Lines changed: 89 additions & 8 deletions

File tree

src/workerd/api/streams/readable.c++

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -513,14 +513,12 @@ jsg::Ref<ReadableStream> ReadableStream::pipeThrough(
513513

514514
auto options = kj::mv(maybeOptions).orDefault({});
515515
options.pipeThrough = true;
516+
// The lambda intentionally captures self as a visitable reference, ensuring
517+
// JSG_THIS stays alive until the pipe promise resolves.
516518
controller.pipeTo(js, destination, kj::mv(options))
517519
.then(js,
518520
JSG_VISITABLE_LAMBDA(
519-
(self = JSG_THIS), (self), (jsg::Lock& js) { return js.resolvedPromise(); }),
520-
JSG_VISITABLE_LAMBDA((self = JSG_THIS), (self),
521-
(jsg::Lock& js, auto&& exception) {
522-
return js.rejectedPromise<void>(kj::mv(exception));
523-
}))
521+
(self = JSG_THIS), (self), (jsg::Lock& js) { return js.resolvedPromise(); }))
524522
.markAsHandled(js);
525523
return kj::mv(transform.readable);
526524
}
@@ -583,11 +581,16 @@ jsg::Promise<void> ReadableStream::returnFunction(
583581
if (!state.preventCancel) {
584582
auto promise = reader->cancel(js, value.map([&](jsg::Value& v) { return v.getHandle(js); }));
585583
reader->releaseLock(js);
586-
return promise.then(js,
584+
auto result = promise.then(js,
587585
JSG_VISITABLE_LAMBDA((reader = kj::mv(reader)), (reader), (jsg::Lock& js) {
588586
// Ensure that the reader is not garbage collected until the cancel promise resolves.
589587
return js.resolvedPromise();
590588
}));
589+
// When the stream is already errored, cancel() returns a rejected promise
590+
// that propagates through the .then() chain. Mark it as handled so V8 does
591+
// not fire unhandledrejection events during iterator teardown.
592+
result.markAsHandled(js);
593+
return kj::mv(result);
591594
}
592595

593596
reader->releaseLock(js);

src/workerd/api/tests/BUILD.bazel

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,13 @@ wd_test(
299299
data = ["unhandled-rejection-test.js"],
300300
)
301301

302+
wd_test(
303+
size = "large",
304+
src = "decompression-stream-unhandled-rejection-test.wd-test",
305+
args = ["--experimental"],
306+
data = ["decompression-stream-unhandled-rejection-test.js"],
307+
)
308+
302309
wd_test(
303310
size = "large",
304311
src = "htmlrewriter-test.wd-test",
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Regression test for https://github.com/cloudflare/workerd/issues/6061
2+
// DecompressionStream generates unhandled rejections internally when
3+
// streams are consumed via Array.fromAsync, even when the caller
4+
// properly handles errors.
5+
6+
import { strictEqual } from 'node:assert';
7+
import { mock } from 'node:test';
8+
import { setTimeout } from 'node:timers/promises';
9+
10+
export const decompessionStreamUnhandledRejection = {
11+
async test() {
12+
const rejectionFn = mock.fn();
13+
const unhandledRejectionFn = mock.fn();
14+
globalThis.addEventListener('rejectionhandled', rejectionFn);
15+
globalThis.addEventListener('unhandledrejection', unhandledRejectionFn);
16+
17+
const valid = new Uint8Array([120, 156, 75, 4, 0, 0, 98, 0, 98]); // deflate('a')
18+
const empty = new Uint8Array(1);
19+
const invalid = new Uint8Array([...valid, ...empty]);
20+
const double = new Uint8Array([...valid, ...valid]);
21+
for (const chunks of [
22+
[valid],
23+
[invalid],
24+
[valid, empty],
25+
[valid, valid],
26+
[double],
27+
]) {
28+
try {
29+
const stream = new Blob(chunks)
30+
.stream()
31+
.pipeThrough(new DecompressionStream('deflate'));
32+
await Array.fromAsync(stream);
33+
} catch (error) {
34+
strictEqual(
35+
error.message,
36+
'Trailing bytes after end of compressed data'
37+
);
38+
}
39+
}
40+
41+
await setTimeout(200);
42+
globalThis.removeEventListener('rejectionhandled', rejectionFn);
43+
globalThis.removeEventListener('unhandledrejection', unhandledRejectionFn);
44+
strictEqual(
45+
rejectionFn.mock.callCount(),
46+
unhandledRejectionFn.mock.callCount()
47+
);
48+
},
49+
};
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using Workerd = import "/workerd/workerd.capnp";
2+
3+
const unitTests :Workerd.Config = (
4+
services = [
5+
( name = "decompression-stream-unhandled-rejection-test",
6+
worker = (
7+
modules = [
8+
(name = "worker", esModule = embed "decompression-stream-unhandled-rejection-test.js")
9+
],
10+
compatibilityFlags = [
11+
"nodejs_compat",
12+
"strict_compression_checks",
13+
"unhandled_rejection_after_microtask_checkpoint",
14+
]
15+
)
16+
),
17+
],
18+
);

src/workerd/jsg/iterator.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -815,7 +815,7 @@ class AsyncIteratorBase: public Object {
815815

816816
void pushCurrent(Lock& js, Promise<void> promise) {
817817
auto& inner = state.template get<InnerState>();
818-
inner.impl.pushCurrent(promise.whenResolved(js).then(js, [this, self = JSG_THIS](Lock& js) {
818+
auto result = promise.whenResolved(js).then(js, [this, self = JSG_THIS](Lock& js) {
819819
// If state is Finished, then there's nothing we need to do here.
820820
KJ_IF_SOME(inner, state.template tryGet<InnerState>()) {
821821
inner.impl.popCurrent();
@@ -826,7 +826,11 @@ class AsyncIteratorBase: public Object {
826826
inner.impl.popCurrent();
827827
}
828828
return js.rejectedPromise<void>(kj::mv(value));
829-
}));
829+
});
830+
// The error is already propagated through the promise returned by nextImpl/returnImpl.
831+
// Mark as handled so the internally-held promise does not trigger unhandledrejection.
832+
result.markAsHandled(js);
833+
inner.impl.pushCurrent(kj::mv(result));
830834
}
831835

832836
Promise<Next> nextImpl(Lock& js, NextSignature nextFunc) {

0 commit comments

Comments
 (0)