ARROW-13576: [C++] Replace ExecNode::InputReceived with ::MakeTask #11210
Conversation
Force-pushed 57985a6 to 601a238
Force-pushed 568a2ce to c7fd851
westonpace left a comment
Looks like a reasonable start. I'm not sure I fully understand the task scheduler right now, and it would be nice if we could avoid duplicating some of the task-management work across the nodes.
I updated this PR and I think it is ready for review. I was struggling to make it work well with […], so now I am using the TaskGroup class in async mode with a […].
lidavidm left a comment
Some implementation-level comments. Thanks for finding the error in AsyncTaskGroup; we should figure out what's going on there as well.
Ah, for AsyncTaskGroup, I think what's happening is that it allows adding tasks even after WaitForTaskToFinish, but it's assumed that this will only be done by another task. For this PR, presumably that's not the case: we're adding tasks externally, so that invariant may be violated, running_tasks_ may hit 0, the future may be marked finished, and then a concurrent call may add another task. So the semantics of AsyncTaskGroup don't quite fit what we want here.
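To make the race concrete, here is a minimal sketch, assuming a simplified task group with the semantics described above (hypothetical class and method names, not Arrow's actual AsyncTaskGroup): an externally added task can arrive after running_tasks_ has already hit 0 and the completion callback has fired.

```cpp
#include <functional>
#include <mutex>

// Hypothetical, simplified stand-in for the task group discussed above.
class SimplifiedTaskGroup {
 public:
  // Register the "all tasks finished" callback. The group assumes that any
  // AddTask call made after this point comes from a task that is still
  // running, i.e. running_tasks_ > 0.
  void WaitForTasksToFinish(std::function<void()> on_all_finished) {
    std::lock_guard<std::mutex> lock(mutex_);
    waiting_ = true;
    on_all_finished_ = std::move(on_all_finished);
    if (running_tasks_ == 0) on_all_finished_();  // "future marked finished"
  }

  void AddTask(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      // If an ExecNode adds tasks externally, running_tasks_ may already have
      // dropped to 0 and on_all_finished_ may already have fired, so this
      // increment arrives too late and the invariant is violated.
      ++running_tasks_;
    }
    task();  // placeholder: the real group would schedule this asynchronously
    std::lock_guard<std::mutex> lock(mutex_);
    if (--running_tasks_ == 0 && waiting_) {
      on_all_finished_();  // may now fire a second time
    }
  }

 private:
  std::mutex mutex_;
  bool waiting_ = false;
  int running_tasks_ = 0;
  std::function<void()> on_all_finished_;
};
```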
FYI, in the new dataset writer node PR I've renamed some things; check it out here: https://github.com/apache/arrow/pull/11017/files. If you want to use this, I can pull out the changes to AsyncTaskGroup and submit them as a small PR on their own today. Otherwise, it will probably be a while before 11017 merges.
westonpace left a comment
I like the idea of combining similar logic in one place, but I don't think this implementation is going to make sense for all exec nodes. Rather than write one huge comment, I drafted up a bare sketch of an idea here: https://docs.google.com/document/d/1aJWvU-9fxnl8eE6Iw-IebVVs8wxkZ30f1rNpSBOBgmY/edit#heading=h.qv2yr7aeayxs
I didn't spend too much time on it, so feel free to reject it out of hand. However, I think we will want to either create various ExecNode subclasses or create an "exec node runner" concept. That way we keep the base ExecNode API pure: nodes that need a rather custom implementation keep it, and nodes that can share common logic do so.
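For illustration only, a rough sketch of the two shapes being discussed, with entirely hypothetical names rather than the actual Arrow API: a common base class that owns the task management, versus a separate runner object that a node composes in.

```cpp
#include <functional>

struct ExecBatch {};  // stand-in for arrow::compute::ExecBatch

// Option 1: subclassing. Nodes that can share the task-management logic derive
// from TaskDrivenNode; nodes with custom needs keep deriving from ExecNode
// directly, so the base ExecNode API stays pure.
class ExecNode {
 public:
  virtual ~ExecNode() = default;
  virtual void InputReceived(ExecBatch batch) = 0;
};

class TaskDrivenNode : public ExecNode {
 public:
  void InputReceived(ExecBatch batch) override {
    ScheduleTask([this, batch]() { DoWork(batch); });
  }

 protected:
  virtual void DoWork(ExecBatch batch) = 0;
  void ScheduleTask(std::function<void()> task) {
    task();  // placeholder: a real implementation would submit to an executor
  }
};

// Option 2: composition. A runner owns the scheduling policy and a node
// delegates to it. (The drawback raised later in the thread: the runner has
// no visibility into the node's inputs/outputs, so forwarding signals between
// nodes is harder.)
class ExecNodeRunner {
 public:
  void Submit(std::function<void()> task) {
    task();  // placeholder
  }
};

class FilterNodeSketch : public ExecNode {
 public:
  explicit FilterNodeSketch(ExecNodeRunner* runner) : runner_(runner) {}
  void InputReceived(ExecBatch batch) override {
    runner_->Submit([batch]() { /* filter the batch, then push downstream */ });
  }

 private:
  ExecNodeRunner* runner_;
};
```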
Force-pushed 3f38b78 to 8fd545f
I refactored this PR based on your feedback.
Force-pushed ae54b32 to 6c8ebde
westonpace left a comment
Some thoughts in addition to David's suggestion of a base class. I see the challenge of the composition approach better now: since the runner doesn't have visibility into the inputs/outputs, it's difficult for the runner to take care of things like forwarding signals backward and forward.
Force-pushed bd471db to 2ff4339
lidavidm left a comment
Thanks for the updates. I left some small comments but this looks good to me.
Thanks @lidavidm, I rebased this PR and addressed the latest feedback. I think this PR is ready for merging. Let me know if anything needs to be taken care of from my end to get this merged. cc @felipeblazing, @westonpace
lidavidm left a comment
Thanks. LGTM, I'll let Weston/Felipe comment though.
@ursabot please benchmark lang=R
Thanks @weston, I rebased this PR and addressed the latest feedback. I also ran some benchmarks to see the impact of:
1. the possible issue with ExecBatch copies;
2. async-mode execution.
Note: I ran the benchmark with the following code and with this machine configuration:
1.1 Sync mode with the lambda task function capturing batches by copy
1.2 Sync mode with the lambda task function capturing batches by move with std::bind
As you can see, ExecBatch copies were not the culprit of the worse performance in some queries, because there aren't "that" many ExecBatch instances and the copy is cheap (in most cases just references).
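For reference, a sketch of the two lambda variants the benchmark compared, with hypothetical helper names rather than the exact benchmark code: capturing the ExecBatch by copy versus moving it into the task with std::bind.

```cpp
#include <functional>
#include <utility>

struct ExecBatch {};  // stand-in for arrow::compute::ExecBatch

// Assumed hook that hands a task to the scheduler; here it just runs inline.
void ScheduleTask(std::function<void()> task) { task(); }

void ProcessBatch(const ExecBatch& /*batch*/) { /* node-specific work */ }

// 1.1 Sync mode, lambda captures the batch by copy. The copy is cheap because
//     an ExecBatch mostly holds references to shared buffers.
void SubmitByCopy(ExecBatch batch) {
  ScheduleTask([batch]() { ProcessBatch(batch); });
}

// 1.2 Sync mode, the batch is moved into the callable via std::bind,
//     avoiding the copy entirely.
void SubmitByMove(ExecBatch batch) {
  ScheduleTask(std::bind([](const ExecBatch& b) { ProcessBatch(b); },
                         std::move(batch)));
}
```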
As you can see, performance is worse in async mode when the workload is small, but when the workload is large it tends to be better. Looking forward to your thoughts. cc @felipeblazing
lidavidm left a comment
Thanks for the updates! I left some comments on the benchmark.
Thanks @lidavidm, I updated the benchmark code. Here are the latest numbers. As you can see, performance in async mode is again not good when the workload is small, but when the workload is large it tends to be better.
This benchmark is very cool and we certainly need more benchmarks to understand the overhead of ExecPlan. I'm not sure it explains the conbench results, however. The benchmark creates a Scan->Filter->Project plan and then compares serial and threaded execution, but the conbench result isn't a difference between has_executor=true and has_executor=false. The thing I most want to understand is the difference in performance between "thread per batch" and "thread per operation". In other words, with your change we now create 3 tasks for every batch in threaded mode; before your change we created 1 task for every batch. Can you add a flag to your benchmark that only controls whether the filter & project steps create a new task or use the serial runner? There would be two modes:
Thread Per Batch Mode:
Thread Per Operation Mode:
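A sketch of what such a flag could control, assuming the Scan->Filter->Project pipeline described above and using the async_mode flag name that appears later in the thread (all other names are hypothetical): in Thread Per Batch Mode one task handles all three steps for a batch, while in Thread Per Operation Mode the filter and project steps each spawn their own task, so each batch costs three tasks instead of one.

```cpp
#include <functional>

struct ExecBatch {};
ExecBatch Scan() { return ExecBatch{}; }            // source step
ExecBatch Filter(const ExecBatch& b) { return b; }  // filter step
ExecBatch Project(const ExecBatch& b) { return b; } // project step

// Assumed hook that submits a task to the thread pool; runs inline here.
void ScheduleTask(std::function<void()> task) { task(); }

void RunOneBatch(bool async_mode /* true = Thread Per Operation */) {
  ScheduleTask([async_mode]() {
    ExecBatch batch = Scan();
    if (!async_mode) {
      // Thread Per Batch Mode: scan, filter, and project run in this one task.
      Project(Filter(batch));
    } else {
      // Thread Per Operation Mode: filter and project each become a new task,
      // i.e. three tasks per batch instead of one.
      ScheduleTask([batch]() {
        ExecBatch filtered = Filter(batch);
        ScheduleTask([filtered]() { Project(filtered); });
      });
    }
  });
}
```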
minor change test CI
minor fix
minor changes
- auto cancelled = impl_->Cancel();
- DCHECK(cancelled);  // could generate an error
- impl_->MarkFinished(status);
+ if (impl_->Cancel()) {
+   impl_->MarkFinished(status);
+ }
Force-pushed 16df46f to a985186
Thanks @westonpace, I made some changes to enable the two modes: Thread Per Batch Mode (async_mode=false) & Thread Per Operation Mode (async_mode=true). As you mentioned before, this benchmark shows that spawning new thread tasks doesn't reduce our core usage much (the percentage difference varies between 1% and 5%), though it doesn't increase core usage either.
Benchmark runs are scheduled for baseline = b2b2cbe and contender = 4f0ac50. 4f0ac50 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Instead of immediately pushing to outputs, ExecNodes should package their work into tasks and defer execution to allow more sophisticated external scheduling. This will also relieve pipeline-breaking ExecNodes of the responsibility of ensuring their work is correctly parallelized.
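As a rough illustration of that idea, with simplified, assumed signatures rather than the exact API introduced by this PR: a node returns a task describing its work instead of executing and pushing downstream immediately, and an external scheduler decides when and with how much parallelism those tasks run.

```cpp
#include <functional>

struct ExecBatch {};  // stand-in for arrow::compute::ExecBatch
using Task = std::function<void()>;

class ExecNode {
 public:
  virtual ~ExecNode() = default;
  // Instead of processing the batch and pushing to outputs right away,
  // the node packages the work as a task for the scheduler to run later.
  virtual Task MakeTask(ExecBatch batch) = 0;
};

class ProjectNodeSketch : public ExecNode {
 public:
  Task MakeTask(ExecBatch batch) override {
    return [this, batch]() {
      ExecBatch projected = DoProject(batch);
      PushToOutputs(projected);
    };
  }

 private:
  ExecBatch DoProject(const ExecBatch& batch) { return batch; }
  void PushToOutputs(const ExecBatch& /*batch*/) { /* forward to outputs */ }
};

// The external scheduler decides when, where, and with how much parallelism
// the collected tasks actually execute; here they simply run serially.
void RunSerially(ExecNode* node, ExecBatch batch) {
  Task task = node->MakeTask(batch);
  task();
}
```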