Blocking Sink/Source operators #7331
Conversation
still some failing tests, basic callback API seems alright
flag to run all operators in async mode: currently defaults to on to see what happens; should be a separate CI job to run tests with this flag
…tal power than i currently have
Instead we now run an additional verification step with and without caching operators
…state is generated
Thanks for the PR! Looks great. Some minor comments but otherwise it looks good to me.
A minor note is that the CI takes longer as a result of this change (e.g. LinuxDebug changes from ~133 minutes to ~179 minutes). I assume that is because of the NoOperatorCachingVerifier. Perhaps we should enable that only for the debug async build?
I'm ok with disabling it completely or only enabling with when the
Let's combine it with
Updated the PR description with some more details ☝️
Thanks!
This PR reworks the Sink/Source API and the pipeline executor to allow interrupting pipeline execution from within a Sink or Source operator. The idea behind this is that a Sink/Source may have some reason not to want to receive/emit any tuples for a while (e.g. due to some I/O, or a buffer that is full). Interrupting the pipeline allows other, non-blocked pipelines to be executed. Rescheduling of the pipeline then happens through a callback made from some (generally) asynchronous code defined by the operator.

This feature doesn't offer any concrete user-facing functionality by itself yet, but it paves the way for things like async I/O and parallel streaming query results. Note that this PR does not yet expose the interrupts in the table function API: currently this is only available in the Source/Sink API. Changing the table function API will break a lot of things, so that will likely follow in a future PR after the 0.8.0 release.
API changes
New Sink interface
The sink interface has been altered to also pass an `InterruptState` object. The `SinkResultType` is updated to allow the `BLOCKED` type, signalling that execution is blocked and that the calling code should await the blocking method specified in the `InterruptState`.
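As a rough sketch of the shape of the reworked API (the struct and class names below are illustrative stand-ins, not the PR's actual declarations), a sink that cannot currently accept a chunk would report the new `BLOCKED` state:

```cpp
#include <cstdint>

// Sketch of the extended result type; BLOCKED is the newly added state.
enum class SinkResultType : uint8_t { NEED_MORE_INPUT, FINISHED, BLOCKED };

// Minimal stand-ins for the DuckDB types, for illustration only.
struct DataChunk {};
struct InterruptState {};

class SinkOperator {
public:
	virtual ~SinkOperator() = default;
	// The sink now also receives an InterruptState it can use to
	// reschedule the pipeline after returning BLOCKED.
	virtual SinkResultType Sink(DataChunk &chunk, InterruptState &interrupt_state) = 0;
};

// A toy sink that reports BLOCKED while its buffer is full.
class BufferedSink : public SinkOperator {
public:
	explicit BufferedSink(bool buffer_full_p) : buffer_full(buffer_full_p) {
	}
	SinkResultType Sink(DataChunk &, InterruptState &) override {
		if (buffer_full) {
			// "Call me again with the same chunk after my callback fires."
			return SinkResultType::BLOCKED;
		}
		return SinkResultType::NEED_MORE_INPUT;
	}
	bool buffer_full;
};
```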
New Source interface
Note that GetData now returns a `SourceResultType` (previously `void`).

An additional advantage of the updated Source interface is that operators which will only return one chunk can now simply return that chunk and immediately return the `FINISHED` state, instead of having to hold custom state and return an empty chunk on the second call. Operators like these (e.g. `PhysicalCopyToFile`) have been simplified.

Note that returning `SourceResultType::FINISHED` now guarantees the GetData method is not called again, allowing operators to safely not hold state for this.
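The single-chunk simplification can be sketched as follows (again with illustrative stand-in types, not the actual DuckDB declarations): the operator fills its one chunk and reports `FINISHED` in the same call, so no "have I already emitted?" flag is needed.

```cpp
// Sketch of the new return type; previously GetData returned void.
enum class SourceResultType { HAVE_MORE_OUTPUT, FINISHED, BLOCKED };

// Minimal stand-ins for the DuckDB types, for illustration only.
struct DataChunk {
	int rows = 0;
};
struct InterruptState {};

// A toy single-chunk source: it produces its only chunk and signals
// FINISHED in the same call, so GetData is guaranteed not to run again.
class SingleChunkSource {
public:
	SourceResultType GetData(DataChunk &chunk, InterruptState &) {
		chunk.rows = 1; // placeholder payload for the single chunk
		return SourceResultType::FINISHED;
	}
};
```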
How to use the Interrupt API
Interrupting a pipeline from a Sink/Source operator has two parts to it:

1. Return `SinkResultType::BLOCKED`/`SourceResultType::BLOCKED`.
2. Call `InterruptState::Callback()` on (a copy of) the `InterruptState` object.

For the Sink specifically, it should be mentioned that the semantics of returning BLOCKED are: "I cannot Sink this chunk right now, call me again with this chunk after receiving my callback". Here it's important to realize that the operator will receive the same chunk again. If the operator did for some reason already sink the chunk before returning BLOCKED, it should manage this state manually. For the Source, the semantics of returning BLOCKED are: "I do not have a chunk right now, you can call me again after receiving my callback."
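The two-step protocol above can be modelled in plain C++ with a condition variable standing in for the scheduler. This is a simplified model, not DuckDB's actual executor: all names here (`InterruptStateModel`, `BlockingSourceModel`, `RunToCompletion`) are invented for illustration.

```cpp
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Plays the role of InterruptState: firing the callback "reschedules"
// whatever is waiting on the blocked pipeline.
struct InterruptStateModel {
	std::mutex mtx;
	std::condition_variable cv;
	bool rescheduled = false;
	void Callback() { // called from the operator's async code
		std::lock_guard<std::mutex> lock(mtx);
		rescheduled = true;
		cv.notify_one();
	}
};

enum class SourceResultType { HAVE_MORE_OUTPUT, FINISHED, BLOCKED };

// A toy source: the first call blocks and arranges an async callback,
// the second call finishes.
class BlockingSourceModel {
public:
	SourceResultType GetData(std::shared_ptr<InterruptStateModel> interrupt) {
		if (!blocked_once) {
			blocked_once = true;
			// The async task MUST eventually call Callback(); not doing so
			// would leave the pipeline blocked forever.
			std::thread([interrupt] {
				std::this_thread::sleep_for(std::chrono::milliseconds(10));
				interrupt->Callback();
			}).detach();
			return SourceResultType::BLOCKED;
		}
		return SourceResultType::FINISHED;
	}
	bool blocked_once = false;
};

// Simplified "executor": on BLOCKED, wait for the callback, then retry.
SourceResultType RunToCompletion(BlockingSourceModel &source) {
	while (true) {
		auto interrupt = std::make_shared<InterruptStateModel>();
		auto result = source.GetData(interrupt);
		if (result != SourceResultType::BLOCKED) {
			return result;
		}
		std::unique_lock<std::mutex> lock(interrupt->mtx);
		interrupt->cv.wait(lock, [&] { return interrupt->rescheduled; });
	}
}
```

Note that in real DuckDB the waiting happens in the scheduler (the task is descheduled and other pipelines run), not in a busy loop inside one thread; the model only shows the handshake.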
Note that there are a few gotchas here. Firstly, when an operator returns `BLOCKED`, it guarantees to DuckDB that the callback will be called; not doing so will block DuckDB indefinitely. Secondly, calling the callback without returning `BLOCKED` is also erroneous and will likewise block indefinitely. Finally, calling the callback synchronously from the Sink/Source operator is not allowed, as the callback will blockingly wait for the task to be descheduled. This situation should never arise, though: the operator can simply decide to not return `BLOCKED`, in which case it doesn't need to call the callback at all.

Managing object lifetimes
When a blocking Source/Sink operator launches a thread that will handle some async task, the operator is responsible for ensuring the correct lifetime of any object passed to the async task. The important thing to realize here is that while the async thread is running, the query may have been interrupted or may have failed in the background, which would free all state of the operator. To solve this, the async code should have (shared) ownership of all objects it needs to access.
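The shared-ownership pattern above can be sketched like this (an assumed pattern, not DuckDB code; `AsyncTaskState` and `LaunchAsyncTask` are invented names):

```cpp
#include <atomic>
#include <memory>
#include <thread>

// State the async task needs; its lifetime must outlive the task.
struct AsyncTaskState {
	std::atomic<int> processed {0};
};

// The lambda captures the shared_ptr by value, so it co-owns the state.
// Even if the query fails and the operator drops its own reference while
// the thread is still running, the state stays alive until the task ends.
std::thread LaunchAsyncTask(std::shared_ptr<AsyncTaskState> state) {
	return std::thread([state] { state->processed.store(1); });
}
```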
Error handling within async code
No error handling is currently provided, leaving this up to the blocking operator itself to implement.
Example
Here's a very basic example of how the interrupt API works, in this case in a source operator:
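The following is a hedged reconstruction of such an example (the names `BlockingDemoSource` and the `callback` member are illustrative, not the actual DuckDB declarations; the delay is made configurable here so it can be exercised quickly):

```cpp
#include <chrono>
#include <functional>
#include <thread>

enum class SourceResultType { HAVE_MORE_OUTPUT, FINISHED, BLOCKED };

// Stand-in for InterruptState: invoking the callback reschedules the
// blocked pipeline.
struct InterruptState {
	std::function<void()> callback;
	void Callback() const {
		if (callback) {
			callback();
		}
	}
};

// A toy source that blocks on every call: it launches an async thread,
// returns BLOCKED, and the thread fires the callback after the delay.
class BlockingDemoSource {
public:
	explicit BlockingDemoSource(std::chrono::milliseconds delay_p) : delay(delay_p) {
	}
	SourceResultType GetData(InterruptState interrupt_state) {
		// Copy the interrupt state into the async thread; once the sleep
		// finishes, the callback reschedules the pipeline, GetData is
		// called again, and the operator blocks again.
		std::thread([interrupt_state, d = delay] {
			std::this_thread::sleep_for(d);
			interrupt_state.Callback();
		}).detach();
		return SourceResultType::BLOCKED;
	}
	std::chrono::milliseconds delay;
};
```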
Note that this source operator is a simplified example and would run forever: every time the operator is called, it interrupts the pipeline for a second; when the callback happens the pipeline is rescheduled, the operator is called again, blocks for another second, and so on.
For Sinks the behaviour is identical, except that a `SinkResultType::BLOCKED` is returned.

Testing
Testing of this feature currently happens through a special compile flag set from the Makefile with `FORCE_ASYNC_SINK_SOURCE`. This flag ensures every Sink and Source operator blocks for a short while the first time it's called, which allows testing async sink/source behaviour with all currently available tests. It's added to CI as a separate workflow.

Note that when more code is added to DuckDB using this feature, we may want to remove/trim these workflows, as the main reason for adding them now is to quickly get good test coverage for the resumable pipelines.
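A sketch of what such a debug mode could do (this is an assumed mechanism, not the PR's actual implementation; `ForceAsyncFirstCall` and `ShouldBlock` are invented names): block each operator exactly once, on its first call, and fire the interrupt callback from a separate thread so every existing test exercises the deschedule/reschedule path.

```cpp
#include <functional>
#include <thread>

// Hypothetical helper an operator could consult at the top of its
// Sink/GetData body when the force-async debug flag is enabled.
class ForceAsyncFirstCall {
public:
	// Returns true when the caller should report BLOCKED instead of
	// executing the real Sink/GetData body.
	bool ShouldBlock(std::function<void()> callback) {
		if (blocked_once) {
			return false; // only the first call is forced to block
		}
		blocked_once = true;
		// Fire the interrupt callback from another thread, never
		// synchronously (see the gotchas above).
		std::thread([callback] { callback(); }).detach();
		return true;
	}
	bool blocked_once = false;
};
```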