
Conversation

@bkietz (Member) commented Jul 12, 2021

This is a pretty trivial node, but it's needed for completeness and will give bindings a pipeline breaker to experiment with until #10660 merges.

@westonpace (Member) left a comment


A few thoughts but looks good. It helps my understanding to see the aggregate kernels in action.

ExecBatch batch{{}, 1};
batch.values.resize(kernels_.size());

for (size_t i = 0; i < kernels_.size(); ++i) {
@westonpace (Member)

Maybe someday we could merge each kernel on its own thread, but that can be left for a future PR.

@bkietz (Member Author)

Merging scalar aggregates is pretty trivial, so I'd guess we don't gain much from parallelization. Worth investigating in a follow-up, though.

@westonpace (Member)

Ah, in my head "merge" meant something more like a merge sort. If it's just summing up a sum/mean/etc. counter across the various states, then I agree it's not necessary.
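For illustration, a minimal sketch of why merging scalar aggregate states is cheap; the state shape here is hypothetical and not the actual Arrow mean kernel, but for a mean the merge amounts to adding two counters:

#include <cstdint>

// Hypothetical mean-kernel state, only to illustrate why merge() is cheap.
struct MeanState {
  double sum = 0.0;
  int64_t count = 0;

  // Merging two partial states is constant-time arithmetic.
  void MergeFrom(const MeanState& other) {
    sum += other.sum;
    count += other.count;
  }

  double Finalize() const { return count == 0 ? 0.0 : sum / count; }
};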

return;
}

lock.lock();
@westonpace (Member)

This lock could probably be removed. We might want to make a note to measure this with microbenchmarks someday. Only one thread should be finishing anyway, and the "what state blocks have we used" map could probably be a lock-free structure.

@bkietz (Member Author)

InputReceived(last batch) might be called concurrently with InputFinished, so those two must synchronize to ensure only one does the finishing. It'd certainly be helpful to introduce less clumsy control flow in these classes.
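To make that race concrete, here is a standalone sketch (hypothetical names, not the actual ScalarAggregateNode code): the thread consuming the last batch and the thread delivering the total batch count can both observe "all input consumed", and the lock ensures exactly one of them finalizes.

#include <mutex>

// Hypothetical finisher illustrating the InputReceived/InputFinished race:
// whichever of the two calls observes "all batches received" runs the finish
// step, and the mutex guarantees it happens exactly once.
class CountingFinisher {
 public:
  // One call per input batch (possibly from many threads).
  void OnBatchReceived() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++received_;
    MaybeFinishLocked();
  }

  // Called once with the total batch count, possibly concurrently with the
  // last OnBatchReceived().
  void OnInputFinished(int total) {
    std::lock_guard<std::mutex> lock(mutex_);
    total_ = total;
    MaybeFinishLocked();
  }

 private:
  void MaybeFinishLocked() {
    if (!finished_ && total_ >= 0 && received_ == total_) {
      finished_ = true;
      // Finalize kernels and emit the output batch here.
    }
  }

  std::mutex mutex_;
  int received_ = 0;
  int total_ = -1;
  bool finished_ = false;
};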

return Status::OK();
}

void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
@westonpace (Member)

Question: to implement something like ARROW-12710 (string concat aggregate kernel) we'll need to know the order of inputs in the kernels (or will have to feed results into the kernel in order). How do we plan to handle that? By passing down seq and having each kernel reorder inputs itself, or perhaps with an upstream ExecNode that orders its inputs? This also applies to the group by node.

@bkietz (Member Author)

seq is not an indication of order; it's only a tag in the range [0, seq_stop) (where seq_stop is set by InputFinished), so we couldn't use it to order results.

As specified in ARROW-12710, the KernelState of the string concat agg kernel will need to include ordering criteria so that merge(move(state1), &state0) can be guaranteed equivalent to merge(move(state0), &state1). Furthermore, merge cannot actually concatenate anything because if we happened to first merge(move(state0), &state3) we'd have no way to insert state1, state2 in the middle later. Actual concatenation would have to wait for finalize.

Those ordering criteria could be synthesized from (for example) fragment/batch index information, but the presence of O(N) state in a scalar agg kernel's State is suspect to me and I'm not sure it's a great match for ScalarAggregateKernel.
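As a sketch of the state shape being described (hypothetical, not an actual Arrow kernel): merge only collects (ordering tag, partial string) pairs, and the real concatenation is deferred to finalize, where the pairs can be sorted; the retained chunks are exactly the O(N) state in question.

#include <algorithm>
#include <cstdint>
#include <iterator>
#include <string>
#include <utility>
#include <vector>

// Hypothetical string-concat aggregate state: O(N) chunks are retained so
// that finalize can impose the required order; merge never concatenates.
struct StringConcatState {
  // Each consumed batch contributes its local concatenation plus an ordering
  // tag (e.g. a fragment/batch index supplied by the caller).
  std::vector<std::pair<int64_t, std::string>> chunks;

  // Merging just splices the other state's chunks in, so merge(a, b) and
  // merge(b, a) lead to the same finalized result.
  void MergeFrom(StringConcatState&& other) {
    chunks.insert(chunks.end(),
                  std::make_move_iterator(other.chunks.begin()),
                  std::make_move_iterator(other.chunks.end()));
  }

  // Only finalize establishes the global order and builds the output.
  std::string Finalize() {
    std::sort(chunks.begin(), chunks.end());
    std::string out;
    for (auto& chunk : chunks) out += chunk.second;
    return out;
  }
};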

@westonpace (Member)

Ah thanks, sorry for the misunderstanding (I need to stop thinking only about datasets).

I suppose it only makes sense to talk about 'order' when directly downstream from a scan or explicit sort, then. And any aggregates that have O(N) state might properly belong as their own ExecNode.


// finally, pipe the project node into a sink node
// NB: if we don't need ordering, we could use compute::MakeSinkNode instead
ASSERT_OK_AND_ASSIGN(auto sink_gen, dataset::MakeOrderedSinkNode(project, "sink"));
@westonpace (Member)

Thanks for the example! I like this setup a lot more, since it's clearer what all the steps are in reading a dataset and everything is neatly separated. It's not that the current scanner doesn't separate the various stages of its pipeline, but in the scanner the pipeline is rather tied together, while this is clearly partitioned.

@bkietz closed this in 7114c4b Jul 13, 2021
@bkietz deleted the 13313-Add-ScalarAggregateNode branch July 13, 2021 20:58