77#include " nbytes.h"
88
99#include < workerd/io/features.h>
10+ #include < workerd/util/checked-queue.h>
1011
1112#include < iterator>
12- #include < list>
1313#include < vector>
1414
1515namespace workerd ::api {
@@ -330,13 +330,12 @@ class CompressionStreamImpl: public kj::Refcounted,
330330 void cancelInternal (kj::Exception reason) {
331331 output.clear ();
332332
333- while (!pendingReads.empty ()) {
334- auto pending = kj::mv (pendingReads.front ());
335- pendingReads.pop_front ();
333+ pendingReads.drainTo ([&](PendingRead&& pending) {
336334 if (pending.promise ->isWaiting ()) {
337335 pending.promise ->reject (kj::cp (reason));
338336 }
339- }
337+ });
338+ KJ_DASSERT (pendingReads.empty ());
340339
341340 canceler.cancel (kj::cp (reason));
342341 state = kj::mv (reason);
@@ -373,7 +372,7 @@ class CompressionStreamImpl: public kj::Refcounted,
373372 pendingRead.filled = copyIntoBuffer (dest);
374373 }
375374
376- pendingReads.push_back (kj::mv (pendingRead));
375+ pendingReads.push (kj::mv (pendingRead));
377376
378377 return canceler.wrap (kj::mv (promise.promise ));
379378 }
@@ -413,7 +412,7 @@ class CompressionStreamImpl: public kj::Refcounted,
413412 // If there are pending reads and data to be read, we'll loop through
414413 // the pending reads and fulfill them as much as possible.
415414 while (!pendingReads.empty () && output.size () > 0 ) {
416- auto & pending = pendingReads.front ( );
415+ auto & pending = KJ_ASSERT_NONNULL ( pendingReads.peek () );
417416
418417 if (!pending.promise ->isWaiting ()) {
419418 // The pending read was canceled!
@@ -444,9 +443,10 @@ class CompressionStreamImpl: public kj::Refcounted,
444443 // If we've met the minimum bytes requirement for the pending read, fulfill
445444 // the read promise.
446445 if (pending.filled >= pending.minBytes ) {
447- auto p = kj::mv (pending);
448- pendingReads.pop_front ();
449- p.promise ->fulfill (kj::mv (p.filled ));
446+ auto p = KJ_ASSERT_NONNULL (pendingReads.pop ());
447+ if (p.promise ->isWaiting ()) {
448+ p.promise ->fulfill (kj::mv (p.filled ));
449+ }
450450 continue ;
451451 }
452452
@@ -461,14 +461,13 @@ class CompressionStreamImpl: public kj::Refcounted,
461461 // far, output.empty() must be true. Let's check.
462462 KJ_ASSERT (output.empty ());
463463 // We need to flush any remaining reads.
464- while (!pendingReads.empty ()) {
465- auto pending = kj::mv (pendingReads.front ());
466- pendingReads.pop_front ();
467- if (pending.promise ->isWaiting ()) {
464+ pendingReads.drainTo ([&](PendingRead&& p) {
465+ if (p.promise ->isWaiting ()) {
468466 // Fulfill the pending read promise only if it hasn't already been canceled.
469- pending .promise ->fulfill (kj::mv (pending .filled ));
467+ p .promise ->fulfill (kj::mv (p .filled ));
470468 }
471- }
469+ });
470+ KJ_DASSERT (pendingReads.empty ());
472471 }
473472
474473 return kj::READY_NOW;
@@ -482,9 +481,7 @@ class CompressionStreamImpl: public kj::Refcounted,
482481
483482 kj::Canceler canceler;
484483 LazyBuffer output;
485- // We use std::list to keep memory overhead low when there are many streams with no or few pending
486- // reads.
487- std::list<PendingRead> pendingReads;
484+ workerd::util::Queue<PendingRead> pendingReads;
488485};
489486} // namespace
490487
0 commit comments