ARROW-13313: [C++][Compute] Add scalar aggregate node #10705
Conversation
westonpace left a comment
A few thoughts, but this looks good. It helps my understanding to see the aggregate kernels in action.
```cpp
ExecBatch batch{{}, 1};
batch.values.resize(kernels_.size());

for (size_t i = 0; i < kernels_.size(); ++i) {
```
Maybe someday we could run each kernel's merge on its own thread, but that can be a future PR.
Merging scalar aggregates is pretty trivial, so I'd guess we wouldn't gain much from parallelization. Worth investigating in a follow-up, though.
Ah, in my head "merge" meant something more like a merge sort. If it's just summing a sum/mean/etc. counter across the various states, then I agree it's not necessary.
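For context, a minimal sketch of what "merge" amounts to for a sum/mean-style kernel. `SumCountState` is a hypothetical stand-in, not the actual KernelState used by this PR's kernels:

```cpp
#include <cstdint>

// Hypothetical stand-in for a scalar aggregate's KernelState: merging two
// partial states is a couple of additions, so it is cheap relative to the
// work of consuming the input batches themselves.
struct SumCountState {
  double sum = 0.0;
  int64_t count = 0;

  // Fold another partial state into this one; order is irrelevant for
  // sum/count/mean-style aggregates.
  void MergeFrom(const SumCountState& other) {
    sum += other.sum;
    count += other.count;
  }

  // Produce the final mean once every partial state has been merged.
  double FinalizeMean() const { return count == 0 ? 0.0 : sum / count; }
};
```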
```cpp
  return;
}

lock.lock();
```
This lock could probably be removed; we might want to make a note to measure this with microbenchmarks someday. Only one thread should be finishing anyway, and the "what state blocks have we used" map could probably be a lock-free structure.
InputReceived (for the last batch) might be called concurrently with InputFinished, so the two must synchronize to ensure only one of them does the finishing. It'd certainly be helpful to introduce less clumsy control flow in these classes.
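A hedged sketch of one lock-free shape that arbitration could take; the names and members are illustrative, not the node's actual code (which uses a lock):

```cpp
#include <atomic>

// Illustrative only: InputReceived and InputFinished both call into the
// arbiter, and exactly one caller (whichever completes the picture) wins the
// right to run the finish step.
struct FinishArbiter {
  std::atomic<int> batches_seen{0};
  std::atomic<int> total_batches{-1};  // unknown until InputFinished
  std::atomic<bool> finished{false};

  // Called from InputReceived after a batch has been consumed.
  bool OnBatchConsumed() {
    batches_seen.fetch_add(1);
    return MaybeFinish();
  }

  // Called from InputFinished once the total batch count is known.
  bool OnInputFinished(int total) {
    total_batches.store(total);
    return MaybeFinish();
  }

  // Returns true for exactly one caller: the counts must match and the caller
  // must win the compare-exchange on `finished`.
  bool MaybeFinish() {
    int total = total_batches.load();
    if (total < 0 || batches_seen.load() < total) return false;
    bool expected = false;
    return finished.compare_exchange_strong(expected, true);
  }
};
```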
```cpp
  return Status::OK();
}

void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
```
Question: to implement something like ARROW-12710 (a string concat aggregate kernel), we'll need to know the order of inputs in the kernels (or will have to feed results into the kernel in order). How do we plan to handle that? By passing down seq and having each kernel reorder inputs itself, or perhaps with an upstream ExecNode that orders its inputs? This also applies to the group-by node.
seq is not an indication of order; it's only a tag in the range [0, seq_stop) (where seq_stop is set by InputFinished), so we couldn't use it to order results.
As specified in ARROW-12710, the KernelState of the string concat agg kernel will need to include ordering criteria so that merge(move(state1), &state0) can be guaranteed equivalent to merge(move(state0), &state1). Furthermore, merge cannot actually concatenate anything, because if we happened to merge(move(state0), &state3) first, we'd have no way to insert state1 or state2 in the middle later. Actual concatenation would have to wait for finalize.
Those ordering criteria could be synthesized from (for example) fragment/batch index information, but the presence of O(N) state in a scalar agg kernel's State is suspect to me and I'm not sure it's a great match for ScalarAggregateKernel.
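To make that concrete, a rough sketch (purely hypothetical, not code from ARROW-12710) of what such an order-aware state would have to look like; it also shows where the O(N) state creeps in. The int64 tag stands in for an assumed ordering criterion such as a fragment/batch index supplied from upstream:

```cpp
#include <algorithm>
#include <cstdint>
#include <iterator>
#include <string>
#include <utility>
#include <vector>

// Hypothetical order-aware state for a string-concat aggregate: merge can only
// collect (ordering tag, partial string) pairs, because a later merge may
// deliver pieces that belong *between* pieces already held. Only finalize,
// which sees every piece, can sort by tag and concatenate.
struct OrderedConcatState {
  std::vector<std::pair<int64_t, std::string>> pieces;  // O(N) state

  void MergeFrom(OrderedConcatState&& other) {
    std::move(other.pieces.begin(), other.pieces.end(),
              std::back_inserter(pieces));
  }

  std::string Finalize() {
    std::sort(pieces.begin(), pieces.end());
    std::string out;
    for (const auto& piece : pieces) out += piece.second;
    return out;
  }
};
```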
Ah thanks, sorry for the misunderstanding (I need to stop thinking only about datasets).
I suppose it only makes sense to talk about 'order' when directly downstream from a scan or explicit sort, then. And any aggregates that have O(N) state might properly belong as their own ExecNode.
```cpp
// finally, pipe the project node into a sink node
// NB: if we don't need ordering, we could use compute::MakeSinkNode instead
ASSERT_OK_AND_ASSIGN(auto sink_gen, dataset::MakeOrderedSinkNode(project, "sink"));
```
Thanks for the example! I do like this setup a lot more since it is clearer what all the steps are in reading a dataset, and everything is neatly separated. It is not that the current scanner doesn't separate out the various stages of its pipeline, but the scanner's pipeline is rather tied together, while this is clearly partitioned.
This is a pretty trivial node, but it's needed for completeness and will give bindings a pipeline breaker to experiment with until #10660 merges.