Blocking Sink/Source operators #7331

Merged
Mytherin merged 44 commits into duckdb:master from samansmink:resumable-pipelines-conflict
May 4, 2023
@samansmink samansmink commented May 2, 2023

This PR reworks the Sink/Source API and the pipeline executor to allow interrupting pipeline execution from within a Sink or
Source operator. The idea is that a Sink/Source may have a reason not to receive/emit any tuples
for a while (e.g. pending I/O or a full buffer). Interrupting the pipeline allows other, non-blocked pipelines to be executed in the meantime. The pipeline is then rescheduled through a callback made from (generally) asynchronous code defined by the operator. This PR doesn't add any user-facing functionality by itself yet, but it does pave the way for things like async I/O and parallel streaming query results. Note that this PR does not yet expose interrupts in the table function API: currently they are only available in the Source/Sink API. Changing the Table Function API will break a lot of things, but will likely follow in a future PR after the 0.8.0 release.

API changes

New Sink interface

The sink interface has been altered to also pass an InterruptState object:

virtual SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput& input) const;

with

struct OperatorSinkInput {
	GlobalSinkState &global_state;
	LocalSinkState &local_state;
	InterruptState &interrupt_state;
};

The SinkResultType is updated to allow the BLOCKED type, signalling that execution is blocked and
the calling code should await the blocking method that is specified in the InterruptState:

enum class SinkResultType : uint8_t { NEED_MORE_INPUT, FINISHED, BLOCKED };
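
To make the shape of the new interface concrete, here is a minimal, self-contained sketch of a sink that refuses input while a bounded buffer is full. The `DataChunk`, `InterruptState` and state structs are simplified stand-ins rather than the real DuckDB definitions, `BoundedSink` and `DemoBoundedSink` are hypothetical names, and the free-function form (without `ExecutionContext`) is a simplification of the virtual method shown above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-ins for the DuckDB types named above; the real definitions are richer.
enum class SinkResultType : uint8_t { NEED_MORE_INPUT, FINISHED, BLOCKED };
struct DataChunk {
    std::vector<int> values;
};
struct InterruptState {};

// Hypothetical global state: a buffer with a fixed capacity (counted in chunks).
struct GlobalSinkState {
    std::vector<DataChunk> buffer;
    std::size_t capacity = 2;
};
struct LocalSinkState {};

struct OperatorSinkInput {
    GlobalSinkState &global_state;
    LocalSinkState &local_state;
    InterruptState &interrupt_state;
};

// Hypothetical sink: accepts chunks until the buffer is full, then reports BLOCKED.
// A real operator would also arrange for the InterruptState callback to be made later.
SinkResultType BoundedSink(DataChunk &chunk, OperatorSinkInput &input) {
    auto &gstate = input.global_state;
    if (gstate.buffer.size() >= gstate.capacity) {
        return SinkResultType::BLOCKED; // the chunk was NOT consumed
    }
    gstate.buffer.push_back(chunk);
    return SinkResultType::NEED_MORE_INPUT;
}

// Pushes three chunks into a buffer of capacity two and returns the last result.
SinkResultType DemoBoundedSink() {
    GlobalSinkState gstate;
    LocalSinkState lstate;
    InterruptState interrupt_state;
    OperatorSinkInput input {gstate, lstate, interrupt_state};
    DataChunk chunk {{1, 2, 3}};
    SinkResultType last = SinkResultType::NEED_MORE_INPUT;
    for (int i = 0; i < 3; i++) {
        last = BoundedSink(chunk, input);
    }
    assert(gstate.buffer.size() == 2); // the third chunk was rejected
    return last;
}
```

In this sketch the third call returns BLOCKED and leaves the buffer untouched, which matches the idea that a blocked chunk has not been consumed.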

New Source interface

virtual SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const;

where OperatorSourceInput is

struct OperatorSourceInput {
    GlobalSourceState &global_state;
    LocalSourceState &local_state;
    InterruptState &interrupt_state;
};

Note that GetData now returns a SourceResultType (previously void):

enum class SourceResultType : uint8_t { HAVE_MORE_OUTPUT, FINISHED, BLOCKED };

An additional advantage of the updated Source interface is that operators which only ever return one chunk can
now simply return that chunk together with the FINISHED state, instead of holding custom state
and returning an empty chunk on the second call. Operators like these (e.g. PhysicalCopyToFile) have been simplified.
Note that returning SourceResultType::FINISHED now guarantees that GetData is not called again, so
operators can safely avoid holding state for this.
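
The single-chunk case described above can be sketched as follows. This is an illustration under assumptions, not DuckDB code: `DataChunk` is a stand-in type, `SingleChunkSource` and `CountGetDataCalls` are hypothetical names, and the loop is a toy caller rather than the actual pipeline executor.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

enum class SourceResultType : uint8_t { HAVE_MORE_OUTPUT, FINISHED, BLOCKED };

// Stand-in for DuckDB's DataChunk.
struct DataChunk {
    std::vector<int> values;
};

// Hypothetical source that produces exactly one chunk. It fills the chunk and
// returns FINISHED in the same call, so it needs no "already emitted" flag.
SourceResultType SingleChunkSource(DataChunk &chunk) {
    chunk.values = {42};
    return SourceResultType::FINISHED;
}

// Toy caller: keeps pulling while the source reports more output and stops
// as soon as FINISHED is returned. Returns the number of calls made.
int CountGetDataCalls() {
    int calls = 0;
    SourceResultType result = SourceResultType::HAVE_MORE_OUTPUT;
    while (result == SourceResultType::HAVE_MORE_OUTPUT) {
        DataChunk chunk;
        result = SingleChunkSource(chunk);
        calls++;
        assert(!chunk.values.empty()); // the final call still delivered data
    }
    return calls;
}
```

Because the caller never calls the source again after FINISHED, the source is invoked exactly once here.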

How to use the Interrupt API

Interrupting a pipeline from a Sink/Source operator has two parts:

  • The operator should return SinkResultType::BLOCKED/SourceResultType::BLOCKED.
  • The operator should guarantee that some other thread calls InterruptState::Callback() on (a copy of) the InterruptState object.

For the Sink specifically, the semantics of returning BLOCKED are: "I cannot Sink this chunk right now, call me again with this chunk after receiving my callback". Here it's important to realize that the operator will receive the same chunk again. If the operator did already sink the chunk when returning BLOCKED for some reason, it should manage this state manually. For the Source, the semantics of returning BLOCKED are: "I do not have a chunk right now, you can call me again after receiving my callback".

Note that there are a few gotchas here. Firstly, when an operator returns BLOCKED, it guarantees to DuckDB that the callback will be called; not doing so will block DuckDB indefinitely. Secondly, calling the callback without returning BLOCKED is also erroneous: the callback will also block indefinitely. Finally, calling the callback synchronously from the Sink/Source operator is not allowed either, as the callback will blockingly wait for the task to be descheduled. This should never be necessary, however: the operator can simply decide not to return BLOCKED, in which case it doesn't need to call the callback at all.
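
The Sink semantics above (return BLOCKED, get the callback from another thread, receive the same chunk again) can be sketched with a toy caller. Everything here is a simplified assumption: `WakeSignal`, `Wait()`, `SinkState`, `MySink` and `DemoBlockedSink` are hypothetical, the `InterruptState` stand-in only imitates copy-and-callback behaviour, and the mock does not reproduce the blocking failure modes listed above.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

enum class SinkResultType : uint8_t { NEED_MORE_INPUT, FINISHED, BLOCKED };
struct DataChunk {
    std::vector<int> values;
};

// Hypothetical wake-up signal shared by all copies of one InterruptState.
struct WakeSignal {
    std::mutex lock;
    std::condition_variable cv;
    bool woken = false;
};

// Simplified stand-in for InterruptState: copyable, with a Callback() method.
struct InterruptState {
    std::shared_ptr<WakeSignal> signal = std::make_shared<WakeSignal>();

    void Callback() const {
        {
            std::lock_guard<std::mutex> guard(signal->lock);
            signal->woken = true;
        }
        signal->cv.notify_one();
    }
    // Used only by the toy caller below; not part of the API described above.
    void Wait() const {
        std::unique_lock<std::mutex> guard(signal->lock);
        signal->cv.wait(guard, [this] { return signal->woken; });
        signal->woken = false;
    }
};

// Hypothetical sink state, co-owned by the async thread.
struct SinkState {
    std::mutex lock;
    bool ready = false;
    std::vector<int> sunk;
};

SinkResultType MySink(DataChunk &chunk, const std::shared_ptr<SinkState> &state,
                      InterruptState &interrupt_state) {
    std::lock_guard<std::mutex> guard(state->lock);
    if (!state->ready) {
        // Not ready: leave the chunk alone and hand copies to the waking thread.
        std::thread waker([state, interrupt_state] {
            {
                std::lock_guard<std::mutex> inner(state->lock);
                state->ready = true;
            }
            interrupt_state.Callback();
        });
        waker.detach();
        return SinkResultType::BLOCKED;
    }
    state->sunk.insert(state->sunk.end(), chunk.values.begin(), chunk.values.end());
    return SinkResultType::NEED_MORE_INPUT;
}

// Toy caller: after BLOCKED it waits for the callback and passes the same chunk again.
// Returns the number of Sink calls that were needed.
int DemoBlockedSink() {
    auto state = std::make_shared<SinkState>();
    InterruptState interrupt_state;
    DataChunk chunk {{7, 8}};
    int calls = 0;
    SinkResultType result;
    do {
        result = MySink(chunk, state, interrupt_state);
        calls++;
        if (result == SinkResultType::BLOCKED) {
            interrupt_state.Wait(); // stands in for descheduling and rescheduling
        }
    } while (result == SinkResultType::BLOCKED);
    assert(state->sunk.size() == 2); // the chunk was sunk exactly once
    return calls;
}
```

The chunk is only appended on the second call, so the operator does not have to track a partially sunk chunk.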

Managing object lifetimes

When a blocking Source/Sink operator launches a thread that will handle some async task, the operator is responsible for ensuring the correct lifetime of any object passed to the async task. The important thing to realize here is that while the async thread is running, the query may have been interrupted or have failed in the background. This would then free all state of the operator. To solve this, the async code should have (shared) ownership of all objects it needs to access.
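
One way to get this shared ownership is to keep the data the task needs behind a `std::shared_ptr` and give the task its own copy. The sketch below is an assumption about how an operator might do that: `AsyncBuffer`, `MyGlobalState` and `DemoSharedOwnership` are hypothetical names, and `std::async` is used here only to make the result easy to observe.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <future>
#include <memory>
#include <thread>

// Hypothetical data the async task works on; co-owned by operator state and task.
struct AsyncBuffer {
    std::atomic<int> rows_written {0};
};

// Hypothetical operator state. It can be destroyed at any time, for example
// when the query is interrupted or fails.
struct MyGlobalState {
    std::shared_ptr<AsyncBuffer> buffer = std::make_shared<AsyncBuffer>();
};

// Starts an async task, frees the operator state while the task is (most likely)
// still running, and returns the number of rows the task wrote.
int DemoSharedOwnership() {
    std::unique_ptr<MyGlobalState> gstate(new MyGlobalState());
    auto buffer = gstate->buffer; // copy of the shared_ptr, not a reference

    std::future<int> done = std::async(std::launch::async, [buffer]() -> int {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        buffer->rows_written += 10; // safe: the task co-owns the buffer
        return buffer->rows_written.load();
    });

    buffer.reset(); // drop the local reference
    gstate.reset(); // simulate the query going away
    assert(!gstate); // only the task's copy keeps the buffer alive now
    return done.get();
}
```

Had the task captured a reference or raw pointer into `MyGlobalState` instead, the write after `gstate.reset()` would touch freed memory.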

Error handling within async

No error handling is currently provided, leaving this up to the blocking operator itself to implement.
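
Since this is left to the operator, one possible pattern (an assumption, not something this PR provides) is to catch exceptions in the async task, store them in shared state, still make the callback, and rethrow when the operator is called again. `AsyncResult`, `LaunchAsync`, `RethrowIfFailed` and `DemoAsyncError` are hypothetical names, and the `callback` argument stands in for InterruptState::Callback().

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>

// Hypothetical state shared between the operator and its async task.
struct AsyncResult {
    std::mutex lock;
    std::exception_ptr error; // set if the task failed
    bool done = false;
};

// Runs `work` on a new thread. The callback is made whether or not `work` throws,
// because an operator that returned BLOCKED has promised that it will arrive.
std::thread LaunchAsync(std::shared_ptr<AsyncResult> result, std::function<void()> work,
                        std::function<void()> callback) {
    return std::thread([result, work, callback] {
        std::exception_ptr error;
        try {
            work();
        } catch (...) {
            error = std::current_exception();
        }
        {
            std::lock_guard<std::mutex> guard(result->lock);
            result->error = error;
            result->done = true;
        }
        callback();
    });
}

// What the operator might do first when it is called again after the callback.
void RethrowIfFailed(AsyncResult &result) {
    std::lock_guard<std::mutex> guard(result.lock);
    if (result.error) {
        std::rethrow_exception(result.error);
    }
}

// Runs a failing task and returns the message of the error seen by the operator.
std::string DemoAsyncError() {
    auto result = std::make_shared<AsyncResult>();
    bool callback_called = false;
    std::thread task = LaunchAsync(
        result, [] { throw std::runtime_error("async I/O failed"); },
        [&callback_called] { callback_called = true; });
    task.join(); // the toy caller simply waits for the task to finish
    assert(callback_called);
    try {
        RethrowIfFailed(*result);
    } catch (const std::runtime_error &ex) {
        return ex.what();
    }
    return "no error";
}
```

The key design choice in this sketch is that the failure path and the success path both end in the callback, so a failing task cannot leave the pipeline blocked.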

Example

Here's a very basic example of how the interrupt API works, in this case in a source operator:

SourceResultType MyOperator::GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const {
    auto &interrupt_state = input.interrupt_state;

    // Start a thread to resume pipeline execution after a second.
    // It is important that the interrupt state is copied, not referenced.
    std::thread rewake_thread([interrupt_state] {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));

        // Make the callback to signal execution can be resumed
        interrupt_state.Callback();
    });
    rewake_thread.detach();

    return SourceResultType::BLOCKED;
}

Note that this source operator is a simplified example and would run forever. Every time this operator is called,
it will interrupt the pipeline for a second, then when the callback happens the pipeline is rescheduled. Then the operator
is called again, and blocks for a second, etc.

For Sinks, behaviour is identical, except that SinkResultType::BLOCKED is returned.

Testing

Testing of this feature currently happens through a special compile flag set from the makefile with FORCE_ASYNC_SINK_SOURCE. This flag ensures that every sink and source operator blocks for a short while the first time it's called. This allows testing async sink/source with all currently available tests. It's added to CI as a separate workflow.

Note that once more code in DuckDB uses this API, we may want to remove or trim these workflows, as the main reason for
adding them now is to quickly get good test coverage for the resumable pipelines.

still some failing tests, basic callback api seems allright
flag to run all operators in async mode: currently default on to see what happens, should be separate ci job to run tests with this flag
Instead we now run an additional verification step with and without caching operators
@samansmink samansmink changed the title Blocking Sink Source operators Blocking Sink/Source operators May 2, 2023
@samansmink samansmink requested a review from Mytherin May 2, 2023 20:37

@Mytherin Mytherin left a comment

Thanks for the PR! Looks great. Some minor comments but otherwise it looks good to me.

A minor note is that the CI takes longer as a result of this change (e.g. LinuxDebug changes from 133~ minutes to 179~ minutes). I assume that is because of the NoOperatorCachingVerifier. Perhaps we should enable that only for the debug async build?

@samansmink

A minor note is that the CI takes longer as a result of this change (e.g. LinuxDebug changes from 133~ minutes to 179~ minutes). I assume that is because of the NoOperatorCachingVerifier.

I'm ok with disabling it completely or only enabling it when the FORCE_ASYNC_SINK_SOURCE is set. I found it useful for async tests, but it hasn't really found any other issues so it may not be worth the extra time

Mytherin commented May 3, 2023

A minor note is that the CI takes longer as a result of this change (e.g. LinuxDebug changes from 133~ minutes to 179~ minutes). I assume that is because of the NoOperatorCachingVerifier.

I'm ok with disabling it completely or only enabling it when the FORCE_ASYNC_SINK_SOURCE is set. I found it useful for async tests, but it hasn't really found any other issues so it may not be worth the extra time

Let's combine it with FORCE_ASYNC_SINK_SOURCE then

@samansmink

Updated the PR description with some more details ☝️

@Mytherin Mytherin merged commit b41a8bb into duckdb:master May 4, 2023

Mytherin commented May 4, 2023

Thanks!
