-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-13611: [C++] Scanning datasets does not enforce back pressure #11285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-13611: [C++] Scanning datasets does not enforce back pressure #11285
Conversation
|
|
0a2e8b8 to
dd0b36e
Compare
lidavidm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, this looks good to me, but it seems the tests are unreliable, especially on Windows - the backpressure doesn't seem to get applied quickly enough. Maybe we should deliver the fragments manually as in the sink node test.
This will also conflict with the MakeTask PR
cpp/src/arrow/util/thread_pool.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As with the scanner option, should we document that this is primarily for testing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I will document that. ThreadPool is internal (and I'm not proposing we add WaitForIdle to Executor) so I hope that is reasonable. CC @pitrou for thoughts. For context: I added WaitForIdle to the ThreadPool purely for the sake of sequencing some testing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I would expect dropping the generator/scanner to clean up everything. Wonder what's keeping it alive. (Not a problem to solve here.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A minimal example of the problem is:
TEST(Weston, MemTest) {
PushGenerator<util::optional<int>> int_prod;
AsyncGenerator<util::optional<int>> int_gen = int_prod;
Future<> visit_fut = VisitAsyncGenerator(std::move(int_gen), [] (util::optional<int>) {
return Status::OK();
});
}
The above will leak. I have a fix in mind but will address it in a separate PR.
668edf1 to
b36ea72
Compare
…tly it only works for unordered scans. This commit adds backpressure in a way that bypasses the exec plan entirely and these utilities will eventually change as the exec plan evolves.
…here opening the backpressure while holding a lock. Added a unit test for the sink node
…or simplicity and making it more obvious what is happening. Added backpressure to the OrderBy sink node
…ckpressure tests more predictable. In addition, my previous attempt at fixing the race condition between the producer and the async toggle wasn't correct. I improved it and now it should be more accurate.
…at. Removed disabled ordered backpressure test because I was just updating the same thing twice. It can be added back in when ordering is supported
…tor as the source of an AsyncGenerator pipeline can be somewhat problematic. if the producer is not finished.
1f05f6c to
401e906
Compare
|
Benchmark runs are scheduled for baseline = 5f80ddc and contender = 8837f64. 8837f64 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
This PR adds backpressure back into the asynchronous scanner. It creates an AsyncToggle which can be shared between the push-based sink and the pull-based scanner. The sink will close the toggle when it's buffer fills up and the scanner will pause delivering items when the toggle is closed.
This PR adds the feature in a way that bypasses the exec plan's backpressure mechanisms as those have not been fully fleshed out and I still am not sure what direction we are planning to go with that. Instead the back pressure is almost completely handled outside of the compute space.
I've got the same mechanism working for dataset writes but I don't want to hold up this PR while I wait for the write node to merge so I have created ARROW-14191 to follow that work.
Currently backpressure is broken for ordered scans. It turns out this has always been the case for the asynchronous scanner, even before it moved to the exec plan. The root cause is that the merge generator will keep reading from files 2-N if the read on file 1 is slow. I have created a test case which demonstrates this but will defer fixing this for ARROW-14192