GH-34248: [C++] Add an order_by node#34249
Conversation
Leaving in draft form until #34137 merges.
To review, this needs to be rebased first now that #34137 is merged?
69f1e40 to 85e9f13
Yes, thanks. I've done that now.
85e9f13 to 74f7b23
74f7b23 to 8a68df6
cpp/src/arrow/compute/exec/options.h (outdated)
The OrderBySinkNode is using SortOptions as an argument instead of Ordering, but looking into those two classes, they should be basically equivalent? (SortOptions also accepts a vector of SortKey and a null_placement argument, just like the Ordering class.)
Yes, SortOptions and Ordering are equivalent. However, Ordering doesn't extend from FunctionOptions. I wasn't sure how to proceed here, so I ended up creating a new type. It felt a bit weird for a FunctionOptions to be used in exec plans completely unrelated to running functions. Although, to play devil's advocate, FunctionOptions is "mostly" just a marker interface and doesn't really prevent anything from working, so I would be open to just using that.
I think the Ordering certainly makes sense, and it's quite trivial to convert the one into the other. I was just wondering, for a consistent API, if we want to add a constructor version that accepts SortOptions (or, for OrderBySinkNode, to add a version that accepts Ordering).
I added python bindings for this to give it a go (I could push those here, if you want). I was curious to check that it actually sorts the full dataset (and not just per batch), and how this would then perform. Doing:

```python
import numpy as np  # this import was missing in the original snippet
import pyarrow as pa
import pyarrow.compute as pc
from pyarrow._acero import OrderByNodeOptions, TableSourceNodeOptions, Declaration

arr1 = np.arange(10_000_000)
arr2 = np.arange(10_000_000)
np.random.shuffle(arr2)
table = pa.table({'a': arr1, 'b': arr2})
table_chunked = pa.Table.from_batches(table.to_batches(max_chunksize=1_000_000))

decl1 = Declaration.from_sequence([
    Declaration("table_source", TableSourceNodeOptions(table)),
    Declaration("order_by", OrderByNodeOptions([("b", "ascending")]))
])
%time res1 = decl1.to_table()

decl2 = Declaration.from_sequence([
    Declaration("table_source", TableSourceNodeOptions(table_chunked)),
    Declaration("order_by", OrderByNodeOptions([("b", "ascending")]))
])
%time res2 = decl2.to_table()
# CPU times: user 7.4 s, sys: 123 ms, total: 7.52 s
# Wall time: 7.52 s
```

Both those timings take around 7.5 s on my laptop (release build). So running the exec plan on a Table of one chunk or on a chunked table doesn't seem to matter in practice, I assume because Acero actually still uses smaller exec batches anyway.

Now, comparing this with the "manual" way to sort the data using the direct compute kernel (this is also how we currently implemented […]). So here it actually matters if the table is chunked or not (I assume in this case the compute kernel uses the batching as it is present in the data, and doesn't use a smaller batch size automatically). And when manually using a similarly small batch size as Acero, we get more or less the same timing of 7 s as above.

This might all be expected (I think sorting chunks is done by first sorting per chunk and then "merging" the indices? That certainly sounds like it gives some overhead compared to sorting a larger array in one go). But that also means that if you have in-memory data (or a dataset that will easily fit into memory) and you have an order_by node in your pipeline, it might be beneficial to set a larger exec batch size (probably depending on which other nodes are present that might be able to take more advantage of processing smaller batches in parallel). For example, for the […]

For in-memory data like a Table, it's actually faster to just concatenate (copy) the data to a single contiguous chunk first and then sort, than to keep the existing chunks.

This becomes a bit off-topic for this actual PR (sorry!), but that probably means we should certainly expose configuring the exec batch size? (Inferring this automatically if there is an order_by node is probably difficult to do generally.) This isn't yet exposed in the python bindings (so I can't test it), but I also didn't directly see how it is done in C++. The […]
8a68df6 to 38bf313
Sorry it took so long to get back to this. @jorisvandenbossche, thanks for the testing and exploration. For others reading this, I should mention that this conversation continued on Zulip as well. The outcome was that we should maybe make the batch size configurable, but also maybe use a larger batch size in general.
Benchmark runs are scheduled for baseline = 08fb861 and contender = 69118b2. 69118b2 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
['Python', 'R'] benchmarks have a high level of regressions.
Thanks for finishing up! Pushed the python bindings I wrote to a new PR: #34654 |
Adds Python bindings for the OrderByNode added in apache#34249

* Closes: apache#34248

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>