ARROW-13540: [C++] Add order by sink node #10863
Draft for now since it'll require rebasing on top of #10793/ARROW-13482.

Now reworked to tag batches instead of being a sink node. Next, I can try to implement a simple order-dependent aggregate kernel and see how far that takes us.

Added a hash_arg_min_max kernel that makes use of the OrderByNode's tag. See ARROW-12873.

Rebased on top of the registry PR.

TODO: revert back to making OrderBy a SinkNode and split arg_min_max and related things into a separate JIRA.

Converted back to a sink node.
bkietz
left a comment
Some minor comments, overall this looks solid
cpp/src/arrow/compute/exec/options.h
Outdated
I think this comment needs to be refactored
👍
Nit: using this above please
- Finishes(ResultWith(::testing::ElementsAreArray(expected))));
+ Finishes(ResultWith(ElementsAreArray(expected))));
I think it'd be worthwhile to extract TableFromExecBatches into exec/util.h
Nit: this would be more readable as
-   auto maybe_sorted = SortData();
+   Status st = DoFinish();
+   if (ErrorIfNotOk(st)) {
+     producer_.Push(std::move(st));
+   }
+   SinkNode::Finish();
+ }
+ Status DoFinish() {
+   ARROW_ASSIGN_OR_RAISE(auto sorted, SortData());
+   //...
    .AddToPlan(plan.get()));

ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
            Finishes(ResultWith(::testing::ElementsAreArray({ExecBatchFromJSON(
- Finishes(ResultWith(::testing::ElementsAreArray({ExecBatchFromJSON(
+ Finishes(ResultWith(ElementsAreArray({ExecBatchFromJSON(
#include "arrow/compute/exec/exec_plan.h"

#include <mutex>
#include <unordered_map>
- #include <unordered_map>
bkietz
left a comment
LGTM, thanks!
Adds a sink node that accumulates input batches, sorts them, and emits the sorted batches. It is implemented as a sink node because we currently don't have a good design for handling order-dependent operations in the rest of the pipeline.
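
For readers unfamiliar with the accumulate/sort/emit shape described above, here is a minimal sketch in plain C++. It does not use Arrow's API and is not the code in this PR: `Batch`, `ToyOrderBySink`, `InputReceived`, `Finish`, and the `output_batch_size` parameter are hypothetical names chosen for illustration, and a batch is assumed to be just a vector of integer sort keys.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for an exec batch: a chunk of rows, each row being
// a single int64 sort key. Real batches carry several typed columns.
using Batch = std::vector<int64_t>;

// Toy sink illustrating the pattern only; NOT Arrow's order-by sink node.
class ToyOrderBySink {
 public:
  explicit ToyOrderBySink(std::size_t output_batch_size)
      : output_batch_size_(output_batch_size) {
    assert(output_batch_size_ > 0);
  }

  // Accumulate: a global sort cannot emit anything until the last input
  // batch has been seen, so every batch is buffered.
  void InputReceived(Batch batch) {
    assert(!finished_);
    buffered_.push_back(std::move(batch));
  }

  // Sort all buffered rows once, then re-slice them into output batches.
  std::vector<Batch> Finish() {
    finished_ = true;

    Batch all;
    for (const Batch& b : buffered_) {
      all.insert(all.end(), b.begin(), b.end());
    }
    buffered_.clear();
    std::stable_sort(all.begin(), all.end());

    std::vector<Batch> out;
    for (std::size_t i = 0; i < all.size(); i += output_batch_size_) {
      const std::size_t end = std::min(all.size(), i + output_batch_size_);
      out.emplace_back(all.begin() + static_cast<std::ptrdiff_t>(i),
                       all.begin() + static_cast<std::ptrdiff_t>(end));
    }
    return out;
  }

 private:
  std::size_t output_batch_size_;
  bool finished_ = false;
  std::vector<Batch> buffered_;
};
```

Because nothing can be emitted before `Finish()`, the node is a pipeline breaker, which is one reason placing it at the sink is the simplest option: no downstream node has to reason about ordering guarantees.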