Cancel pending in/outbound consumers in EmbeddedChannelCore upon channel close#3464
Merged
Lukasa merged 4 commits intoapple:mainfrom Jan 6, 2026
Merged
Conversation
…on channel close.
### Motivation:
Currently, pending consumer closures remain in the `{in}{out}boundBufferConsumer` queues of `EmbeddedChannelCore` even after the channel closes. Also, it is possible to enqueue consumers *after* the channel closes. In these cases, the consumer closures will never be invoked and this can lead to unfavourable behaviour, as observed in `NIOAsyncTestingChannel`'s `waitFor{In}{Out}boundWrite` methods (the only place these queues are currently used).
`NIOAsyncTestingChannel`'s `waitFor{In}{Out}boundWrite` methods complete a continuation *inside* the consumer closure. In the cases described above, the continuation never completes and therefore `waitFor{In}{Out}boundWrite` never returns.
### Modifications:
- Updated the element type in `EmbeddedChannelCore`'s `{in}{out}boundBufferConsumer` from `(NIOAny) -> Void` to `(Result<NIOAny, Error>) -> Void`.
- This is so that the `.failure` case can be used to notify the consumer closure that the channel has closed.
- Changed the visibility of the `{in}{out}boundBufferConsumer` properties from `internal` to `private` in order to prevent the queues from being accessed and being appended to without the call-site considering whether the channel has been closed.
- Added new methods named `enqueue{In}{Out}boundBufferConsumer(_:)` which take the consumer closure as an argument and only append to the corresponding queue if the channel isn't closed. If the channel is closed, the consumer closure is invoked immediately with `.failure(ChannelError.ioOnClosedChannel)`.
- Updated `EmbeddedChannelCore`'s `close0` method to empty out all pending closures in `{in}{out}boundBufferConsumer` and return a `.failure(ChannelError.ioOnClosedChannel)` result to each closure.
- Updated `NIOAsyncTestingChannel`'s `waitFor{In}{Out}boundWrite` to throw an error in the continuation upon receiving a `.failure` result.
- Added associated test cases.
### Result:
`EmbeddedChannelCore`'s `{in}{out}boundBufferConsumer` queues can be used more safely and all pending closures are invoked upon channel close. As a result, `NIOAsyncTestingChannel`'s `waitFor{In}{Out}boundWrite` no longer indefinitely blocks when the channel closes.
Lukasa
reviewed
Jan 6, 2026
Sources/NIOEmbedded/Embedded.swift
Outdated
| /// Enqueue a consumer closure that will be invoked upon the next pending inbound write. | ||
| /// - Parameter newElement: The consumer closure to enqueue. Returns a `.failure` result if the channel has already | ||
| /// closed. | ||
| func enqueueInboundBufferConsumer(_ newElement: @escaping (Result<NIOAny, Error>) -> Void) { |
Contributor
There was a problem hiding this comment.
Nit: we typically annotate methods only supposed to be called on the EL with a leading underscore, and I recommend making this private if it can be.
Contributor
Author
There was a problem hiding this comment.
I've added a leading underscore to both methods (enqueue{In}{Out}boundBufferConsumer(_:)).
Those methods are defined in EmbeddedChannelCore and are also called from NIOAsyncTestingChannel, so unfortunately, they cannot be made private.
Lukasa
approved these changes
Jan 6, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation:
Currently, pending consumer closures remain in the
{in}{out}boundBufferConsumerqueues ofEmbeddedChannelCoreeven after the channel closes. It is also possible to enqueue consumers after the channel closes. In these cases, the consumer closures will never be invoked and this can lead to unfavourable behaviour, as observed inNIOAsyncTestingChannel'swaitFor{In}{Out}boundWritemethods (the only places these queues are currently used).NIOAsyncTestingChannel'swaitFor{In}{Out}boundWritemethods complete a continuation inside the consumer closure. In the cases described above, the continuation never completes and thereforewaitFor{In}{Out}boundWritenever returns.Modifications:
EmbeddedChannelCore's{in}{out}boundBufferConsumerfrom(NIOAny) -> Voidto(Result<NIOAny, Error>) -> Void..failurecase can be used to notify the consumer closure that the channel has closed.{in}{out}boundBufferConsumerproperties frominternaltoprivatein order to prevent the queues from being accessed and being appended to without the call site considering whether the channel has been closed.internalmethods namedenqueue{In}{Out}boundBufferConsumer(_:)which take the consumer closure as an argument and only append to the corresponding queue if the channel isn't closed..failure(ChannelError.ioOnClosedChannel).EmbeddedChannelCore'sclose0method to return a.failure(ChannelError.ioOnClosedChannel)result to each closure in{in}{out}boundBufferConsumerand empty both buffers.NIOAsyncTestingChannel'swaitFor{In}{Out}boundWriteto throw an error in the continuation upon receiving a.failureresult.Result:
EmbeddedChannelCore's{in}{out}boundBufferConsumerqueues can be used more safely: all pending closures will be invoked upon channel close. As a result,NIOAsyncTestingChannel'swaitFor{In}{Out}boundWriteno longer indefinitely blocks when the channel closes.