Skip to content

Commit 7b1b5ee

Browse files
committed
Update streams/compression to use checked queue
1 parent 381aa3e commit 7b1b5ee

1 file changed

Lines changed: 16 additions & 19 deletions

File tree

src/workerd/api/streams/compression.c++

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
#include "nbytes.h"
88

99
#include <workerd/io/features.h>
10+
#include <workerd/util/checked-queue.h>
1011

1112
#include <iterator>
12-
#include <list>
1313
#include <vector>
1414

1515
namespace workerd::api {
@@ -330,13 +330,12 @@ class CompressionStreamImpl: public kj::Refcounted,
330330
void cancelInternal(kj::Exception reason) {
331331
output.clear();
332332

333-
while (!pendingReads.empty()) {
334-
auto pending = kj::mv(pendingReads.front());
335-
pendingReads.pop_front();
333+
pendingReads.drainTo([&](PendingRead&& pending) {
336334
if (pending.promise->isWaiting()) {
337335
pending.promise->reject(kj::cp(reason));
338336
}
339-
}
337+
});
338+
KJ_DASSERT(pendingReads.empty());
340339

341340
canceler.cancel(kj::cp(reason));
342341
state = kj::mv(reason);
@@ -373,7 +372,7 @@ class CompressionStreamImpl: public kj::Refcounted,
373372
pendingRead.filled = copyIntoBuffer(dest);
374373
}
375374

376-
pendingReads.push_back(kj::mv(pendingRead));
375+
pendingReads.push(kj::mv(pendingRead));
377376

378377
return canceler.wrap(kj::mv(promise.promise));
379378
}
@@ -413,7 +412,7 @@ class CompressionStreamImpl: public kj::Refcounted,
413412
// If there are pending reads and data to be read, we'll loop through
414413
// the pending reads and fulfill them as much as possible.
415414
while (!pendingReads.empty() && output.size() > 0) {
416-
auto& pending = pendingReads.front();
415+
auto& pending = KJ_ASSERT_NONNULL(pendingReads.peek());
417416

418417
if (!pending.promise->isWaiting()) {
419418
// The pending read was canceled!
@@ -444,9 +443,10 @@ class CompressionStreamImpl: public kj::Refcounted,
444443
// If we've met the minimum bytes requirement for the pending read, fulfill
445444
// the read promise.
446445
if (pending.filled >= pending.minBytes) {
447-
auto p = kj::mv(pending);
448-
pendingReads.pop_front();
449-
p.promise->fulfill(kj::mv(p.filled));
446+
auto p = KJ_ASSERT_NONNULL(pendingReads.pop());
447+
if (p.promise->isWaiting()) {
448+
p.promise->fulfill(kj::mv(p.filled));
449+
}
450450
continue;
451451
}
452452

@@ -461,14 +461,13 @@ class CompressionStreamImpl: public kj::Refcounted,
461461
// far, output.empty() must be true. Let's check.
462462
KJ_ASSERT(output.empty());
463463
// We need to flush any remaining reads.
464-
while (!pendingReads.empty()) {
465-
auto pending = kj::mv(pendingReads.front());
466-
pendingReads.pop_front();
467-
if (pending.promise->isWaiting()) {
464+
pendingReads.drainTo([&](PendingRead&& p) {
465+
if (p.promise->isWaiting()) {
468466
// Fulfill the pending read promise only if it hasn't already been canceled.
469-
pending.promise->fulfill(kj::mv(pending.filled));
467+
p.promise->fulfill(kj::mv(p.filled));
470468
}
471-
}
469+
});
470+
KJ_DASSERT(pendingReads.empty());
472471
}
473472

474473
return kj::READY_NOW;
@@ -482,9 +481,7 @@ class CompressionStreamImpl: public kj::Refcounted,
482481

483482
kj::Canceler canceler;
484483
LazyBuffer output;
485-
// We use std::list to keep memory overhead low when there are many streams with no or few pending
486-
// reads.
487-
std::list<PendingRead> pendingReads;
484+
workerd::util::Queue<PendingRead> pendingReads;
488485
};
489486
} // namespace
490487

0 commit comments

Comments
 (0)