-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-1565: [C++] Implement TopK/BottomK #11019
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
lidavidm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You may want to rebase to fix some of the builds.
I think you've mentioned benchmarks, are there results to show/is there benchmark code?
Should SelectK instead be an aggregate function?
lidavidm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I hit Enter before I meant to. Continuing on…
712eb15 to
70d04ef
Compare
|
Thanks @lidavidm, @edponce for the feedback. I reduced the code, refactoring it where it was required and last unit tests were added.
Here are some benchmark (code is here) results for
Some conclusions: Both implementations ( This PR started as a way to speed top-k queries, https://lemire.me/blog/2017/06/21/top-speed-for-top-k-queries/ using In the next benchmarks I will show the peformance when the algorithm needs to be stable to enable --
I am not sure, I am new with arrow. And this function looks like pretty similar to the APIs of SortIndices and PartitionNthIndices So I think it is in the right place. Looking forward to your thoughts. cc @lidavidm |
lidavidm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates.
We may actually want this to be implemented multiple ways: as both a hash aggregate kernel and a vector kernel. We'll want the hash aggregate kernel to match Pandas, where nlargest is part of a group by operation. Of course, we can split out the implementation of that for later, but I think it's worth considering how we can share the implementations as much as possible. (It may not be that you can share much, since the hash aggregate kernel has to work in a streaming fashion which this does not do.)
We'll still want a vector kernel implementation (the one here) as essentially a pipeline sink, since this implements an ... ORDER BY ... LIMIT operation. (We'll need to implement an ExecNode for that - that can be a separate follow up JIRA, it should be straightforward.) Note if you do want to match the API of SortIndices this should return indices, not values.
Also, note there are some CI failures - mostly just different compilers looking for different things.
It might be good to compare the kernel here with SortIndices (since you could emulate a top-K by calling SortIndices, slicing the output, then calling Take) - I wonder how much of a speedup this achieves.
|
Thanks for the feedback @lidavidm
Update: I changed these APIs to return only indices like SortIndices.
Here the benchmarks results for
These result show that
This implementation for stable select_k with a stable heap, although it is valid, is still simple and maybe the best choise here is a sort-based algorithm. IMO, implementation of stable algorithms need more exploration, and it could be implemented in a separate follow up JIRA issue adding an optional parameter to choose between Besides these updates, instead of a single SortOrder for RecordBatch and Table a new kernel called Looking forward to your thoughts. cc @lidavidm |
ea5c316 to
1bfb58a
Compare
Interesting. Did you double-check that no mistake was made when running the benchmarks (i.e. are you really comparing the two different algorithms)?
Hmm, why not, but |
|
Thanks for the benchmarks. It looks like the stable heap is about the same or perhaps slightly slower than just sorting and slicing, which is unfortunate.
Since the unstable version is quite a bit faster so far here, maybe we should expose a |
|
Le 07/09/2021 à 15:46, David Li a écrit :
Hmm, why not, but sort_indices and partition_nth_indices are
guaranteed to be stable, so it would be better if topK/bottomK
wasn't inconsistent.
Since the unstable version is quite a bit faster so far here, maybe we
should expose a |select_k_unstable|?
If "quite a bit" means 2x or 3x rather than 20%, then that sounds good
to me indeed.
(Was the reason why we don't have
an unstable sort because it's not useful and/or wasn't meaningfully faster?)
I'm not sure we have actually tried to benchmark an unstable sort. But
being stable is usually a desirable guarantee when the sort columns
don't span all columns. Of course, that depends on the use case :-)
|
|
Another thought: isn't one attraction of TopK to have a streaming algorithm with O(k) memory consumption? Making a full sort requires to materialize the entire input, hence O(n) memory consumption. |
That is a fair point. However, this kernel as-is is not a streaming implementation and we would have to implement it as an actual aggregate kernel to get that behavior. |
I have a question related to that, the streaming implementation would be using ExecNodes right? |
My bad, You were right @pitrou the numbers are not correct. Something happenned when I ran this bench both at the same time. I ran again PartitionNthToIndices with different strategies (nth_element vs heap_based) separetly and I got these numbers:
Note that both algorithms uses IMO this PR should first expose a |
Oh, you're right! I had forgotten that
But if K is small, sorting them should be cheap, right? I'm not sure what the cutoff would be... (sqrt(N) perhaps?)
Agreed! |
It's unclear to me. The API for scalar aggregate functions expects you to handle parallelism and streaming, but doesn't handle larger-than-memory state. I would say it should just be a scalar aggregate function if we were to implement it right now, though, since we haven't really started thinking about larger-than-memory state. |
0cdb8f9 to
7371656
Compare
minor fix
addressing feedback comments
minor fix
new select_k kernel updated tests minor fix minor fix
minor fix
972a482 to
83aafbb
Compare
|
Thanks again for the comments @lidavidm. Besides the minor latest changes addressing the feedback I filed follow-up JIRA issues: [C++][Compute] Improve top_k/bottom_k Selecters via CRTP [C++][Compute]Implement SelectKStable [C++][Compute]Implement streaming version for SelectK The first one is an improvement of the selecter and template specialization for specific type. The last two are tasks to implement a streaming version of it and SelectKStable. Apart from that, I think it is ready for final review / merging, let me know what you think. |
lidavidm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing everything. I think we can merge, I'll also filed a followup to add a SelectKSinkNode: ARROW-13973
|
Thanks @aocsa we can tackle some of these in followups. |
|
Sorry Eduardo, I had thought that comment was your review. Thanks for filling ARROW-13974, we can fix up things there. |
Heap-based topk can compute these indices in O(n log k) time Closes apache#11019 from aocsa/aocsa/ARROW-1565 Authored-by: Alexander <aocsa.cs@gmail.com> Signed-off-by: David Li <li.davidm96@gmail.com>
Heap-based topk can compute these indices in O(n log k) time