Conversation

@aocsa
Contributor

@aocsa aocsa commented Aug 27, 2021

Heap-based topk can compute these indices in O(n log k) time
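As a sketch of the idea (illustrative code, not this PR's kernel; names are mine), the top-k indices can be computed with a size-k min-heap of (value, index) pairs, which is where the O(n log k) bound comes from:

```cpp
// Sketch: indices of the k largest values in O(n log k), by keeping a
// size-k min-heap of (value, index) pairs. The smallest current candidate
// sits at the top and is evicted when a larger value arrives.
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

std::vector<int64_t> TopKIndices(const std::vector<int64_t>& values, size_t k) {
  using Entry = std::pair<int64_t, int64_t>;  // (value, index)
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;
  for (int64_t i = 0; i < static_cast<int64_t>(values.size()); ++i) {
    if (heap.size() < k) {
      heap.emplace(values[i], i);
    } else if (values[i] > heap.top().first) {
      heap.pop();  // evict the weakest candidate: O(log k)
      heap.emplace(values[i], i);
    }
  }
  std::vector<int64_t> indices(heap.size());
  for (auto it = indices.rbegin(); it != indices.rend(); ++it) {
    *it = heap.top().second;  // drain in ascending order, fill back-to-front
    heap.pop();
  }
  return indices;  // indices of the k largest values, in descending value order
}
```

Each of the n elements costs at most one O(log k) heap operation, hence O(n log k) overall with only O(k) extra state.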

@lidavidm lidavidm self-requested a review August 31, 2021 19:14
Member

@lidavidm lidavidm left a comment


You may want to rebase to fix some of the builds.

I think you've mentioned benchmarks, are there results to show/is there benchmark code?

Should SelectK instead be an aggregate function?

Member

@lidavidm lidavidm left a comment


Sorry, I hit Enter before I meant to. Continuing on…

@aocsa aocsa marked this pull request as ready for review September 2, 2021 15:33
@aocsa
Contributor Author

aocsa commented Sep 2, 2021

Thanks @lidavidm, @edponce for the feedback. I reduced the code, refactored it where required, and added the last of the unit tests.

You may want to rebase to fix some of the builds.

I think you've mentioned benchmarks, are there results to show/is there benchmark code?

Here are some benchmark results (code is here) for partition_nth:intro_select and partition_nth:heap. Values are int64 scalars with lengths from 1 << 15 to 1 << 23 and a null_proportion argument of 100; k is 0.125 * length:

partition_nth:intro_select

NthToIndicesInt64/SelectTop1024/32768/10000/min_time:1.000        336418 ns       336337 ns         4134 bytes_per_second=92.9128M/s items_per_second=12.1783M/s null_percent=0.01 size=32.768k
NthToIndicesInt64/SelectTop1024/32768/100/min_time:1.000          431855 ns       431747 ns         3243 bytes_per_second=72.3804M/s items_per_second=9.48705M/s null_percent=1 size=32.768k
NthToIndicesInt64/SelectTop1024/32768/10/min_time:1.000           413660 ns       413553 ns         3380 bytes_per_second=75.5646M/s items_per_second=9.9044M/s null_percent=10 size=32.768k
NthToIndicesInt64/SelectTop1024/32768/2/min_time:1.000            236539 ns       236480 ns         5940 bytes_per_second=132.147M/s items_per_second=17.3207M/s null_percent=50 size=32.768k
NthToIndicesInt64/SelectTop1024/32768/1/min_time:1.000             77369 ns        77349 ns        18102 bytes_per_second=404.011M/s items_per_second=52.9545M/s null_percent=100 size=32.768k
NthToIndicesInt64/SelectTop1024/32768/0/min_time:1.000            336092 ns       336009 ns         4160 bytes_per_second=93.0033M/s items_per_second=12.1901M/s null_percent=0 size=32.768k
NthToIndicesInt64/SelectTop1024/1048576/100/min_time:1.000       7809018 ns      7807367 ns          179 bytes_per_second=128.084M/s items_per_second=16.7882M/s null_percent=1 size=1048.58k
NthToIndicesInt64/SelectTop1024/8388608/100/min_time:1.000      61132755 ns     61121366 ns           23 bytes_per_second=130.887M/s items_per_second=17.1556M/s null_percent=1 size=8.38861M

NthToIndicesInt64/SelectTop64/32768/10000/min_time:1.000          244987 ns       244925 ns         5716 bytes_per_second=127.59M/s items_per_second=16.7235M/s null_percent=0.01 size=32.768k
NthToIndicesInt64/SelectTop64/32768/100/min_time:1.000            367908 ns       367813 ns         3807 bytes_per_second=84.9616M/s items_per_second=11.1361M/s null_percent=1 size=32.768k
NthToIndicesInt64/SelectTop64/32768/10/min_time:1.000             324431 ns       324349 ns         4354 bytes_per_second=96.3469M/s items_per_second=12.6284M/s null_percent=10 size=32.768k
NthToIndicesInt64/SelectTop64/32768/2/min_time:1.000              220053 ns       219995 ns         6377 bytes_per_second=142.049M/s items_per_second=18.6186M/s null_percent=50 size=32.768k
NthToIndicesInt64/SelectTop64/32768/1/min_time:1.000               80327 ns        80304 ns        17677 bytes_per_second=389.148M/s items_per_second=51.0064M/s null_percent=100 size=32.768k
NthToIndicesInt64/SelectTop64/32768/0/min_time:1.000              247086 ns       246991 ns         5561 bytes_per_second=126.523M/s items_per_second=16.5836M/s null_percent=0 size=32.768k
NthToIndicesInt64/SelectTop64/1048576/100/min_time:1.000         7572595 ns      7570022 ns          185 bytes_per_second=132.1M/s items_per_second=17.3146M/s null_percent=1 size=1048.58k
NthToIndicesInt64/SelectTop64/8388608/100/min_time:1.000        60670175 ns     60652316 ns           23 bytes_per_second=131.899M/s items_per_second=17.2883M/s null_percent=1 size=8.38861M

partition_nth:heap



NthToIndicesInt64/HeapBasedTop1024/32768/10000/min_time:1.000     333859 ns       333737 ns         4180 bytes_per_second=93.6365M/s items_per_second=12.2731M/s null_percent=0.01 size=32.768k
NthToIndicesInt64/HeapBasedTop1024/32768/100/min_time:1.000       433951 ns       433841 ns         3261 bytes_per_second=72.031M/s items_per_second=9.44125M/s null_percent=1 size=32.768k
NthToIndicesInt64/HeapBasedTop1024/32768/10/min_time:1.000        414340 ns       414237 ns         3390 bytes_per_second=75.4399M/s items_per_second=9.88806M/s null_percent=10 size=32.768k
NthToIndicesInt64/HeapBasedTop1024/32768/2/min_time:1.000         238940 ns       238870 ns         5861 bytes_per_second=130.824M/s items_per_second=17.1474M/s null_percent=50 size=32.768k
NthToIndicesInt64/HeapBasedTop1024/32768/1/min_time:1.000          79213 ns        79190 ns        17821 bytes_per_second=394.622M/s items_per_second=51.7238M/s null_percent=100 size=32.768k
NthToIndicesInt64/HeapBasedTop1024/32768/0/min_time:1.000         337863 ns       337777 ns         4125 bytes_per_second=92.5167M/s items_per_second=12.1263M/s null_percent=0 size=32.768k
NthToIndicesInt64/HeapBasedTop1024/1048576/100/min_time:1.000    7984756 ns      7982600 ns          177 bytes_per_second=125.272M/s items_per_second=16.4197M/s null_percent=1 size=1048.58k
NthToIndicesInt64/HeapBasedTop1024/8388608/100/min_time:1.000   61850326 ns     61825764 ns           22 bytes_per_second=129.396M/s items_per_second=16.9602M/s null_percent=1 size=8.38861M

NthToIndicesInt64/HeapBasedTop64/32768/10000/min_time:1.000       246324 ns       246264 ns         5648 bytes_per_second=126.896M/s items_per_second=16.6326M/s null_percent=0.01 size=32.768k
NthToIndicesInt64/HeapBasedTop64/32768/100/min_time:1.000         369914 ns       369818 ns         3778 bytes_per_second=84.5011M/s items_per_second=11.0757M/s null_percent=1 size=32.768k
NthToIndicesInt64/HeapBasedTop64/32768/10/min_time:1.000          323143 ns       323059 ns         4286 bytes_per_second=96.7316M/s items_per_second=12.6788M/s null_percent=10 size=32.768k
NthToIndicesInt64/HeapBasedTop64/32768/2/min_time:1.000           229702 ns       229619 ns         6399 bytes_per_second=136.095M/s items_per_second=17.8383M/s null_percent=50 size=32.768k
NthToIndicesInt64/HeapBasedTop64/32768/1/min_time:1.000            80524 ns        80503 ns        17146 bytes_per_second=388.186M/s items_per_second=50.8804M/s null_percent=100 size=32.768k
NthToIndicesInt64/HeapBasedTop64/32768/0/min_time:1.000           248449 ns       248391 ns         5579 bytes_per_second=125.81M/s items_per_second=16.4902M/s null_percent=0 size=32.768k
NthToIndicesInt64/HeapBasedTop64/1048576/100/min_time:1.000      8001923 ns      7998633 ns          181 bytes_per_second=125.021M/s items_per_second=16.3868M/s null_percent=1 size=1048.58k
NthToIndicesInt64/HeapBasedTop64/8388608/100/min_time:1.000     61181748 ns     61162773 ns           23 bytes_per_second=130.799M/s items_per_second=17.144M/s null_percent=1 size=8.38861M

Some conclusions: both implementations (partition_nth:intro_select and partition_nth:heap) show similar numbers. PartitionNthIndices uses std::nth_element, which internally uses introselect, a selection algorithm that is a hybrid of quickselect and heap select.

This PR started as a way to speed up top-k queries (https://lemire.me/blog/2017/06/21/top-speed-for-top-k-queries/) using the FastPriorityQueue-KWillets-replaceTop approach. However, along the way it turned out that the C++ implementation of std::nth_element already combines the best of quickselect and heap selection in a hybrid solution.

In the next benchmarks I will show the performance when the algorithm needs to be stable, to enable keep=first and keep=last. I will explore ways to implement a stable binary heap for that, and compare it with {std::nth_element + std::stable_sort}.

--

Should SelectK instead be an aggregate function?

I am not sure; I am new to Arrow. This function looks pretty similar to the APIs of SortIndices and PartitionNthIndices, so I think it is in the right place.

Looking forward to your thoughts. cc @lidavidm

Member

@lidavidm lidavidm left a comment


Thanks for the updates.

We may actually want this to be implemented multiple ways: as both a hash aggregate kernel and a vector kernel. We'll want the hash aggregate kernel to match Pandas, where nlargest is part of a group by operation. Of course, we can split out the implementation of that for later, but I think it's worth considering how we can share the implementations as much as possible. (It may not be that you can share much, since the hash aggregate kernel has to work in a streaming fashion which this does not do.)

We'll still want a vector kernel implementation (the one here) as essentially a pipeline sink, since this implements an ... ORDER BY ... LIMIT operation. (We'll need to implement an ExecNode for that - that can be a separate follow up JIRA, it should be straightforward.) Note if you do want to match the API of SortIndices this should return indices, not values.

Also, note there are some CI failures - mostly just different compilers looking for different things.

It might be good to compare the kernel here with SortIndices (since you could emulate a top-K by calling SortIndices, slicing the output, then calling Take) - I wonder how much of a speedup this achieves.
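The baseline described here (full argsort, slice the output) can be sketched in plain STL terms (names are mine, not Arrow's API):

```cpp
// STL analogue of the SortIndices + slice baseline: stably argsort all
// indices, then keep only the first k. O(n log n) time and O(n) scratch,
// versus O(n log k) for a k-bounded heap.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

std::vector<int64_t> TopKViaFullSort(const std::vector<int64_t>& values, size_t k) {
  std::vector<int64_t> indices(values.size());
  std::iota(indices.begin(), indices.end(), 0);
  // stable_sort keeps the original order among equal values ("keep=first").
  std::stable_sort(indices.begin(), indices.end(),
                   [&](int64_t a, int64_t b) { return values[a] > values[b]; });
  indices.resize(std::min(k, indices.size()));  // the "slice" step
  return indices;
}
```

Comparing the kernel against this baseline shows how much the bounded-heap approach saves by never ordering the n - k discarded elements.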

@aocsa
Contributor Author

aocsa commented Sep 7, 2021

Thanks for the feedback @lidavidm

Thanks for the updates.

We may actually want this to be implemented multiple ways: as both a hash aggregate kernel and a vector kernel. We'll want the hash aggregate kernel to match Pandas, where nlargest is part of a group by operation. Of course, we can split out the implementation of that for later, but I think it's worth considering how we can share the implementations as much as possible. (It may not be that you can share much, since the hash aggregate kernel has to work in a streaming fashion which this does not do.)

We'll still want a vector kernel implementation (the one here) as essentially a pipeline sink, since this implements an ... ORDER BY ... LIMIT operation. (We'll need to implement an ExecNode for that - that can be a separate follow up JIRA, it should be straightforward.) Note if you do want to match the API of SortIndices this should return indices, not values.

Update: I changed these APIs to return only indices like SortIndices.

Also, note there are some CI failures - mostly just different compilers looking for different things.

It might be good to compare the kernel here with SortIndices (since you could emulate a top-K by calling SortIndices, slicing the output, then calling Take) - I wonder how much of a speedup this achieves.

Here are the benchmark results for topk with SortingIndices and topk with NonStableHeap. Values are int64 scalars with lengths from 1 << 15 to 1 << 23 and a null_proportion argument of 100; k is 1/8 * N:

TopK_SortingIndices

-----------------------------------------------------------------------------------------------
Benchmark                                     Time             CPU   Iterations UserCounters...
-----------------------------------------------------------------------------------------------
TopKInt64/32768/10000/min_time:1.000     199633 ns       199601 ns         7010 bytes_per_second=156.563M/s items_per_second=20.521M/s null_percent=0.01 size=32.768k
TopKInt64/32768/100/min_time:1.000       205520 ns       205486 ns         6734 bytes_per_second=152.078M/s items_per_second=19.9332M/s null_percent=1 size=32.768k
TopKInt64/32768/10/min_time:1.000        196022 ns       195979 ns         7124 bytes_per_second=159.456M/s items_per_second=20.9002M/s null_percent=10 size=32.768k
TopKInt64/32768/2/min_time:1.000         124550 ns       124531 ns        11204 bytes_per_second=250.941M/s items_per_second=32.8914M/s null_percent=50 size=32.768k
TopKInt64/32768/1/min_time:1.000           9395 ns         9394 ns       149397 bytes_per_second=3.24871G/s items_per_second=436.034M/s null_percent=100 size=32.768k
TopKInt64/32768/0/min_time:1.000         199116 ns       199086 ns         7016 bytes_per_second=156.968M/s items_per_second=20.5741M/s null_percent=0 size=32.768k
TopKInt64/1048576/100/min_time:1.000   10200499 ns     10199057 ns          137 bytes_per_second=98.0483M/s items_per_second=12.8514M/s null_percent=1 size=1048.58k
TopKInt64/8388608/100/min_time:1.000  105033049 ns    105007534 ns           13 bytes_per_second=76.185M/s items_per_second=9.98572M/s null_percent=1 size=8.38861M

TopK_NonStableHeap

-----------------------------------------------------------------------------------------------
Benchmark                                     Time             CPU   Iterations UserCounters...
-----------------------------------------------------------------------------------------------
TopKInt64/32768/10000/min_time:1.000      74202 ns        74191 ns        18553 bytes_per_second=421.208M/s items_per_second=55.2085M/s null_percent=0.01 size=32.768k
TopKInt64/32768/100/min_time:1.000        78732 ns        78719 ns        17032 bytes_per_second=396.982M/s items_per_second=52.0333M/s null_percent=1 size=32.768k
TopKInt64/32768/10/min_time:1.000         79833 ns        79821 ns        17285 bytes_per_second=391.5M/s items_per_second=51.3147M/s null_percent=10 size=32.768k
TopKInt64/32768/2/min_time:1.000          71248 ns        71237 ns        19374 bytes_per_second=438.674M/s items_per_second=57.4979M/s null_percent=50 size=32.768k
TopKInt64/32768/1/min_time:1.000           4680 ns         4679 ns       301875 bytes_per_second=6.52247G/s items_per_second=875.431M/s null_percent=100 size=32.768k
TopKInt64/32768/0/min_time:1.000          73972 ns        73962 ns        18658 bytes_per_second=422.516M/s items_per_second=55.38M/s null_percent=0 size=32.768k
TopKInt64/1048576/100/min_time:1.000    5714238 ns      5713428 ns          247 bytes_per_second=175.026M/s items_per_second=22.941M/s null_percent=1 size=1048.58k
TopKInt64/8388608/100/min_time:1.000   67560407 ns     67546834 ns           20 bytes_per_second=118.436M/s items_per_second=15.5237M/s null_percent=1 size=8.38861M

These results show that TopK_NonStableHeap is considerably faster (around 2.7x). However, the implementation of a stable version using a StableHeap is not as good as TopK_NonStableHeap, as you can see below:

TopK_StableHeap

-----------------------------------------------------------------------------------------------
Benchmark                                     Time             CPU   Iterations UserCounters...
-----------------------------------------------------------------------------------------------
TopKInt64/32768/10000/min_time:1.000     213637 ns       213609 ns         6596 bytes_per_second=146.295M/s items_per_second=19.1752M/s null_percent=0.01 size=32.768k
TopKInt64/32768/100/min_time:1.000       222127 ns       222091 ns         6332 bytes_per_second=140.708M/s items_per_second=18.4429M/s null_percent=1 size=32.768k
TopKInt64/32768/10/min_time:1.000        216600 ns       216562 ns         6353 bytes_per_second=144.3M/s items_per_second=18.9137M/s null_percent=10 size=32.768k
TopKInt64/32768/2/min_time:1.000         186647 ns       186613 ns         7585 bytes_per_second=167.459M/s items_per_second=21.9492M/s null_percent=50 size=32.768k
TopKInt64/32768/1/min_time:1.000           8101 ns         8100 ns       173012 bytes_per_second=3.7677G/s items_per_second=505.692M/s null_percent=100 size=32.768k
TopKInt64/32768/0/min_time:1.000         213979 ns       213950 ns         6595 bytes_per_second=146.062M/s items_per_second=19.1447M/s null_percent=0 size=32.768k
TopKInt64/1048576/100/min_time:1.000   11343912 ns     11341499 ns          122 bytes_per_second=88.1718M/s items_per_second=11.5568M/s null_percent=1 size=1048.58k
TopKInt64/8388608/100/min_time:1.000  123205686 ns    123179459 ns           12 bytes_per_second=64.9459M/s items_per_second=8.51259M/s null_percent=1 size=8.38861M

This implementation of stable select_k with a stable heap, although valid, is still simplistic, and maybe the best choice here is a sort-based algorithm. IMO, the implementation of stable algorithms needs more exploration; it could be done in a separate follow-up JIRA issue, adding an optional parameter to choose between SortType{kStable, kNonStable} implementations.

Besides these updates, instead of a single SortOrder for RecordBatch and Table, a new kernel Result select_k(datum, options) was created, where options.sort_keys is a std::vector<SortKey>, each SortKey holding a string field name and a sort order.

Looking forward to your thoughts. cc @lidavidm

@pitrou
Member

pitrou commented Sep 7, 2021

Some conclusions: Both implementations (partition_nth:intro_select and partition_nth:heap) show similar numbers.

Interesting. Did you double-check that no mistake was made when running the benchmarks (i.e. are you really comparing the two different algorithms)?

IMO, the implementation of stable algorithms needs more exploration, and it could be done in a separate follow-up JIRA issue

Hmm, why not, but sort_indices and partition_nth_indices are guaranteed to be stable, so it would be better if topK/bottomK wasn't inconsistent.

@lidavidm
Member

lidavidm commented Sep 7, 2021

Thanks for the benchmarks. It looks like the stable heap is about the same or perhaps slightly slower than just sorting and slicing, which is unfortunate.

Hmm, why not, but sort_indices and partition_nth_indices are guaranteed to be stable, so it would be better if topK/bottomK wasn't inconsistent.

Since the unstable version is quite a bit faster so far here, maybe we should expose a select_k_unstable? (Was the reason why we don't have an unstable sort because it's not useful and/or wasn't meaningfully faster?)

@pitrou
Member

pitrou commented Sep 7, 2021 via email

@pitrou
Member

pitrou commented Sep 7, 2021

Another thought: isn't one attraction of TopK to have a streaming algorithm with O(k) memory consumption? Making a full sort requires to materialize the entire input, hence O(n) memory consumption.

@lidavidm
Member

lidavidm commented Sep 7, 2021

Another thought: isn't one attraction of TopK to have a streaming algorithm with O(k) memory consumption? Making a full sort requires to materialize the entire input, hence O(n) memory consumption.

That is a fair point. However, this kernel as-is is not a streaming implementation and we would have to implement it as an actual aggregate kernel to get that behavior.
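To illustrate the O(k)-memory point, a streaming accumulator might look like this (an illustrative sketch, not this kernel or Arrow's aggregate API; names are mine):

```cpp
// Sketch of a streaming top-k accumulator: consumes input one batch at a
// time while holding only O(k) state, so the full input is never
// materialized — the behavior an aggregate kernel would provide.
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

class StreamingTopK {
 public:
  explicit StreamingTopK(size_t k) : k_(k) {}

  void Consume(const std::vector<int64_t>& batch) {
    for (int64_t v : batch) {
      if (heap_.size() < k_) {
        heap_.push(v);
      } else if (v > heap_.top()) {
        heap_.pop();  // state never exceeds k entries
        heap_.push(v);
      }
    }
  }

  std::vector<int64_t> Finalize() {  // top-k values, descending
    std::vector<int64_t> out(heap_.size());
    for (auto it = out.rbegin(); it != out.rend(); ++it) {
      *it = heap_.top();
      heap_.pop();
    }
    return out;
  }

 private:
  size_t k_;
  std::priority_queue<int64_t, std::vector<int64_t>, std::greater<int64_t>> heap_;
};
```

The Consume/Finalize split mirrors how a streaming aggregate would fold in one batch at a time and only emit results at the end.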

@aocsa
Contributor Author

aocsa commented Sep 7, 2021

Another thought: isn't one attraction of TopK to have a streaming algorithm with O(k) memory consumption? Making a full sort requires to materialize the entire input, hence O(n) memory consumption.

That is a fair point. However, this kernel as-is is not a streaming implementation and we would have to implement it as an actual aggregate kernel to get that behavior.

I have a question related to that: the streaming implementation would be using ExecNodes, right?

@aocsa
Contributor Author

aocsa commented Sep 7, 2021

Some conclusions: Both implementations (partition_nth:intro_select and partition_nth:heap) show similar numbers.

Interesting. Did you double-check that no mistake was made when running the benchmarks (i.e. are you really comparing the two different algorithms)?

My bad, you were right @pitrou, the numbers are not correct. Something went wrong when I ran both benchmarks at the same time. I re-ran PartitionNthToIndices with the different strategies (nth_element vs heap_based) separately and got these numbers:

PartitionNthToIndices_nth_element

--------------------------------------------------------------------------------------------------
Benchmark                                        Time             CPU   Iterations UserCounters...
--------------------------------------------------------------------------------------------------
BottomKInt64/32768/10000/min_time:1.000       8558 ns         8556 ns       166021 bytes_per_second=3.56686G/s items_per_second=478.735M/s null_percent=0.01 size=32.768k
BottomKInt64/32768/100/min_time:1.000        20545 ns        20542 ns        63571 bytes_per_second=1.4856G/s items_per_second=199.394M/s null_percent=1 size=32.768k
BottomKInt64/32768/10/min_time:1.000         17864 ns        17862 ns        72315 bytes_per_second=1.70853G/s items_per_second=229.316M/s null_percent=10 size=32.768k
BottomKInt64/32768/2/min_time:1.000          11565 ns        11564 ns       118026 bytes_per_second=2.63912G/s items_per_second=354.217M/s null_percent=50 size=32.768k
BottomKInt64/32768/1/min_time:1.000           6317 ns         6316 ns       214862 bytes_per_second=4.83149G/s items_per_second=648.472M/s null_percent=100 size=32.768k
BottomKInt64/32768/0/min_time:1.000           8407 ns         8406 ns       164849 bytes_per_second=3.63065G/s items_per_second=487.297M/s null_percent=0 size=32.768k
BottomKInt64/1048576/100/min_time:1.000     916082 ns       915966 ns         1526 bytes_per_second=1091.74M/s items_per_second=143.097M/s null_percent=1 size=1048.58k
BottomKInt64/8388608/100/min_time:1.000    7874118 ns      7871600 ns          178 bytes_per_second=1016.31M/s items_per_second=133.21M/s null_percent=1 size=8.38861M

PartitionNthToIndices_heap_based

--------------------------------------------------------------------------------------------------
Benchmark                                        Time             CPU   Iterations UserCounters...
--------------------------------------------------------------------------------------------------
BottomKInt64/32768/10000/min_time:1.000      55314 ns        55303 ns        25080 bytes_per_second=565.064M/s items_per_second=74.0641M/s null_percent=0.01 size=32.768k
BottomKInt64/32768/100/min_time:1.000        57577 ns        57568 ns        23965 bytes_per_second=542.835M/s items_per_second=71.1505M/s null_percent=1 size=32.768k
BottomKInt64/32768/10/min_time:1.000         59858 ns        59847 ns        22977 bytes_per_second=522.168M/s items_per_second=68.4416M/s null_percent=10 size=32.768k
BottomKInt64/32768/2/min_time:1.000          54008 ns        53995 ns        24750 bytes_per_second=578.753M/s items_per_second=75.8583M/s null_percent=50 size=32.768k
BottomKInt64/32768/1/min_time:1.000           5929 ns         5927 ns       235385 bytes_per_second=5.14856G/s items_per_second=691.028M/s null_percent=100 size=32.768k
BottomKInt64/32768/0/min_time:1.000          55751 ns        55741 ns        24636 bytes_per_second=560.626M/s items_per_second=73.4823M/s null_percent=0 size=32.768k
BottomKInt64/1048576/100/min_time:1.000    4542982 ns      4541548 ns          323 bytes_per_second=220.189M/s items_per_second=28.8606M/s null_percent=1 size=1048.58k
BottomKInt64/8388608/100/min_time:1.000   53349325 ns     53333611 ns           26 bytes_per_second=149.999M/s items_per_second=19.6607M/s null_percent=1 size=8.38861M

IMO, the implementation of stable algorithms needs more exploration, and it could be done in a separate follow-up JIRA issue

Hmm, why not, but sort_indices and partition_nth_indices are guaranteed to be stable, so it would be better if topK/bottomK wasn't inconsistent.

Note that both algorithms use NonStablePartitioner to separate nulls, and both are unstable.
Clearly, PartitionNthToIndices_nth_element is faster (5-6x), but the PartitionNthToIndices kernel is not enough to implement TopK/BottomK because the first K elements need to be sorted. Besides, in order to implement a stable version we will most probably need to use StablePartitioner + std::stable_partition + std::stable_sort. However, the attraction of the heap-based implementation is that it is a streaming algorithm.

IMO this PR should first expose a select_k_unstable, and a follow-up PR should look into the best implementation for select_k_stable.
Looking forward to your thoughts. cc @lidavidm, @pitrou

@pitrou
Member

pitrou commented Sep 7, 2021

Note that both algorithms use NonStablePartitioner to separate nulls, and both are unstable.

Oh, you're right! I had forgotten that partition_nth_indices was unstable, unlike sort_indices.

Clearly, PartitionNthToIndices_nth_element is faster (5-6x) but PartitionNthToIndices kernel is not enough to implement TopK/BottomK because the first K elements needs to be sorted.

But if K is small, sorting them should be cheap, right? I'm not sure what the cutoff would be... (sqrt(N) perhaps?)
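For intuition, combining the two steps — nth_element to partition, then sorting only the k-element prefix — is roughly O(n + k log k) (a sketch under that assumption; names are mine, and it assumes k <= n):

```cpp
// Sketch of the introselect-based approach: std::nth_element partitions the
// index array so the k largest values come first (in arbitrary order), then
// only that k-element prefix is sorted — cheap whenever k is small.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

std::vector<int64_t> TopKViaNthElement(const std::vector<int64_t>& values, size_t k) {
  std::vector<int64_t> indices(values.size());
  std::iota(indices.begin(), indices.end(), 0);
  auto greater_value = [&](int64_t a, int64_t b) { return values[a] > values[b]; };
  // Partition: everything before indices.begin() + k compares >= the rest.
  std::nth_element(indices.begin(), indices.begin() + k, indices.end(), greater_value);
  indices.resize(k);
  std::sort(indices.begin(), indices.end(), greater_value);  // order the prefix
  return indices;
}
```

With k around sqrt(n) or smaller, the k log k sorting term is dwarfed by the linear partition, which supports the "sorting them should be cheap" intuition.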

IMO this PR should first expose a select_k_unstable and look in a follow up PR the best implementation for select_k_stable

Agreed!

@lidavidm
Member

lidavidm commented Sep 7, 2021

Another thought: isn't one attraction of TopK to have a streaming algorithm with O(k) memory consumption? Making a full sort requires to materialize the entire input, hence O(n) memory consumption.

That is a fair point. However, this kernel as-is is not a streaming implementation and we would have to implement it as an actual aggregate kernel to get that behavior.

I have a question related to that, the streaming implementation would be using ExecNodes right?

It's unclear to me. The API for scalar aggregate functions expects you to handle parallelism and streaming, but doesn't handle larger-than-memory state. I would say it should just be a scalar aggregate function if we were to implement it right now, though, since we haven't really started thinking about larger-than-memory state.

@aocsa
Contributor Author

aocsa commented Sep 8, 2021

I updated this PR to expose only select_k_unstable, using an optional parameter to choose the strategy kind (Stable|NonStable). Moreover, Python bindings were added along with their respective tests. As we agreed, an implementation for select_k_stable will be added in a follow-up PR. cc @pitrou @lidavidm

@aocsa
Contributor Author

aocsa commented Sep 10, 2021

Thanks again for the comments @lidavidm. Besides the latest minor changes addressing the feedback, I filed follow-up JIRA issues:

[C++][Compute] Improve top_k/bottom_k Selecters via CRTP
https://issues.apache.org/jira/browse/ARROW-13971

[C++][Compute] Implement SelectKStable
https://issues.apache.org/jira/browse/ARROW-13969

[C++][Compute] Implement streaming version for SelectK
https://issues.apache.org/jira/browse/ARROW-13970

The first one is an improvement of the selecter via CRTP and template specialization for specific types. The last two are tasks to implement SelectKStable and a streaming version of SelectK.

Apart from that, I think it is ready for final review / merging, let me know what you think.

Member

@lidavidm lidavidm left a comment


Thanks for addressing everything. I think we can merge. I've also filed a follow-up to add a SelectKSinkNode: ARROW-13973

@lidavidm
Member

@edponce or @cyb70289 any other comments?

@cyb70289
Contributor

@lidavidm, I don't have comments. Thanks @aocsa !

@lidavidm lidavidm closed this in fa7cff6 Sep 10, 2021
@lidavidm
Member

Thanks @aocsa we can tackle some of these in followups.

@lidavidm
Member

Sorry Eduardo, I had thought that comment was your review. Thanks for filing ARROW-13974; we can fix up things there.

ViniciusSouzaRoque pushed a commit to s1mbi0se/arrow that referenced this pull request Oct 20, 2021
Heap-based topk can compute these indices in O(n log k) time

Closes apache#11019 from aocsa/aocsa/ARROW-1565

Authored-by: Alexander <aocsa.cs@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>