
GH-34248: [C++] Add an order_by node #34249

Merged
westonpace merged 3 commits into apache:main from westonpace:feature/GH-34248--add-order-by-node
Mar 21, 2023

Conversation

@westonpace
Member

@westonpace westonpace commented Feb 18, 2023

@westonpace
Member Author

Leaving in draft form until #34137 merges

@github-actions

⚠️ GitHub issue #34248 has been automatically assigned in GitHub to PR creator.

@jorisvandenbossche
Member

To review, this needs to be rebased first now that #34137 is merged?

@westonpace westonpace force-pushed the feature/GH-34248--add-order-by-node branch from 69f1e40 to 85e9f13 on March 2, 2023 23:41
@westonpace westonpace marked this pull request as ready for review March 2, 2023 23:41
@westonpace
Member Author

> To review, this needs to be rebased first now that #34137 is merged?

Yes, thanks. I've done that now.

@westonpace westonpace force-pushed the feature/GH-34248--add-order-by-node branch from 85e9f13 to 74f7b23 on March 4, 2023 01:34
@github-actions github-actions bot added the awaiting review Awaiting review label Mar 4, 2023
@westonpace westonpace force-pushed the feature/GH-34248--add-order-by-node branch from 74f7b23 to 8a68df6 on March 4, 2023 03:21
@westonpace westonpace requested a review from lidavidm March 9, 2023 17:18
@github-actions github-actions bot added awaiting committer review Awaiting committer review and removed awaiting review Awaiting review labels Mar 9, 2023
Member

The OrderBySinkNode is using SortOptions as argument instead of Ordering, but looking into those two classes, that should basically be equivalent? (SortOptions also accepts a vector of SortKey and a null_placement arg, just as the Ordering class)

Member Author

Yes, SortOptions and Ordering are equivalent. However, Ordering doesn't extend from FunctionOptions. I wasn't sure how to proceed here, so I ended up creating a new type. It felt a bit weird for a FunctionOptions to be used in exec plans completely unrelated to running functions. Although, to play devil's advocate, FunctionOptions is "mostly" just a marker interface and doesn't really prevent anything from working, so I would be open to just using that.

Member

I think the Ordering certainly makes sense, and it's quite trivial to convert the one into the other. I was just wondering, for a consistent API, whether we want to add a constructor version that accepts SortOptions (or, for OrderBySinkNode, a version that accepts Ordering).

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting committer review Awaiting committer review labels Mar 14, 2023
@jorisvandenbossche
Member

jorisvandenbossche commented Mar 14, 2023

I added python bindings for this to give it a go (I could push those here, if you want).

I was curious to check that it actually sorts the full dataset (and not just per batch), and to see how this would then perform. Doing:

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
from pyarrow._acero import OrderByNodeOptions, TableSourceNodeOptions, Declaration

arr1 = np.arange(10_000_000)
arr2 = np.arange(10_000_000)
np.random.shuffle(arr2)
table = pa.table({'a': arr1, 'b': arr2})
table_chunked = pa.Table.from_batches(table.to_batches(max_chunksize=1_000_000))

decl1 = Declaration.from_sequence([
    Declaration("table_source", TableSourceNodeOptions(table)),
    Declaration("order_by", OrderByNodeOptions([("b", "ascending")]))
])

%time res1 = decl1.to_table()


decl2 = Declaration.from_sequence([
    Declaration("table_source", TableSourceNodeOptions(table_chunked)),
    Declaration("order_by", OrderByNodeOptions([("b", "ascending")]))
])
%time res2 = decl2.to_table()
# CPU times: user 7.4 s, sys: 123 ms, total: 7.52 s
# Wall time: 7.52 s

Both of those timings are around 7.5 s on my laptop (release build). So running the exec plan on a Table with a single chunk or on a chunked Table doesn't seem to matter in practice, I assume because Acero still splits the data into smaller exec batches anyway.

Now, comparing this with the "manual" way to sort the data using the direct compute kernel (this is also how we currently implemented Table.sort_by), that is quite a bit faster:

In [4]: %%time
   ...: indices = pc.sort_indices(table, [("b", "ascending")])
   ...: table.take(indices)
CPU times: user 2.39 s, sys: 135 ms, total: 2.53 s
Wall time: 2.54 s

In [5]: %%time
   ...: indices = pc.sort_indices(table_chunked, [("b", "ascending")])
   ...: table_chunked.take(indices)
CPU times: user 4.27 s, sys: 83.6 ms, total: 4.35 s
Wall time: 4.35 s

So here it actually matters whether the table is chunked or not (I assume in this case the compute kernel uses the batching as it is present in the data, and doesn't switch to a smaller batch size automatically).

And when manually using a similarly small batch size as Acero, we get more or less the same timing of 7s as above:

In [11]: table_chunked2 = pa.Table.from_batches(table.to_batches(max_chunksize=32768))

In [12]: %%time
    ...: indices = pc.sort_indices(table_chunked2, [("b", "ascending")])
    ...: table_chunked2.take(indices)
CPU times: user 7.53 s, sys: 98.6 ms, total: 7.63 s
Wall time: 7.63 s

This might all be expected (I think sorting chunked data is done by first sorting per chunk and then "merging" the indices? That certainly sounds like it would add some overhead compared to sorting a larger array in one go). But that also means that if you have in-memory data (or a dataset that easily fits into memory) and an order_by node in your pipeline, it might be beneficial to set a larger exec batch size? (probably depending on which other nodes are present that might take more advantage of processing smaller batches in parallel)
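The sort-then-merge strategy hypothesized above can be sketched in plain Python (an illustrative stand-in, not Arrow's actual kernel code): each chunk is sorted independently and the sorted runs are then k-way merged, which is where the extra overhead relative to one big sort would come from.

```python
import heapq
import random

def sort_chunked(chunks):
    # Sort each chunk independently...
    sorted_runs = [sorted(chunk) for chunk in chunks]
    # ...then k-way merge the sorted runs (extra O(n log k) merge work
    # compared to a single sort over one contiguous array)
    return list(heapq.merge(*sorted_runs))

values = list(range(1000))
random.shuffle(values)
chunks = [values[i:i + 100] for i in range(0, len(values), 100)]
assert sort_chunked(chunks) == sorted(values)
```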

For example, the Table.sort_by implementation, if it were to rely on an ExecPlan (which is actually what we first did, assuming this would be more efficient (#14976 (comment)), but I changed that back to sort_indices/take to fix minimal-build tests), would need to specify a large batch size; otherwise it would be a lot slower than it can be.

For in-memory data like a Table, it's actually faster to just concatenate (copy) the data to a single contiguous chunk first and then sort, than keeping the existing chunks:

In [29]: %%time
    ...: table_concatted = pa.Table.from_arrays([pa.concat_arrays(table_chunked2["a"].chunks), pa.concat_arrays(table_chunked2["b"].chunks)], schema=table_chunked2.schema)
    ...: indices = pc.sort_indices(table_concatted, [("b", "ascending")])
    ...: table_concatted.take(indices)
CPU times: user 2.74 s, sys: 132 ms, total: 2.87 s
Wall time: 2.88 s

This becomes a bit off-topic for this actual PR (sorry!), but it probably means we should certainly expose configuring the exec batch size? (Inferring this automatically when there is an order_by node is probably difficult to do in general.) This isn't yet exposed in the Python bindings (so I can't test it), but I also didn't directly see how it is done in C++. The QueryOptions don't seem to have such an argument. In exec.h there is a kDefaultExecChunksize defined, but I don't see it actually used anywhere. ExecContext has a set_exec_chunksize, but its default is max, which is much larger than what is actually used.

@westonpace westonpace force-pushed the feature/GH-34248--add-order-by-node branch from 8a68df6 to 38bf313 on March 20, 2023 20:37
@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels Mar 20, 2023
@github-actions github-actions bot added awaiting merge Awaiting merge and removed awaiting change review Awaiting change review labels Mar 20, 2023
@westonpace
Member Author

Sorry it took so long to get back to this. @jorisvandenbossche thanks for the testing and exploration. For others reading this I should mention this conversation continued on Zulip as well. The outcome was that we should maybe make the batch size configurable but also maybe use a larger batch size in general.

@westonpace westonpace merged commit 69118b2 into apache:main Mar 21, 2023
@ursabot

ursabot commented Mar 21, 2023

Benchmark runs are scheduled for baseline = 08fb861 and contender = 69118b2. 69118b2 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Finished ⬇️0.64% ⬆️0.0%] test-mac-arm
[Finished ⬇️0.51% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.31% ⬆️0.0%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 69118b2d ec2-t3-xlarge-us-east-2
[Finished] 69118b2d test-mac-arm
[Finished] 69118b2d ursa-i9-9960x
[Finished] 69118b2d ursa-thinkcentre-m75q
[Finished] 08fb8610 ec2-t3-xlarge-us-east-2
[Finished] 08fb8610 test-mac-arm
[Finished] 08fb8610 ursa-i9-9960x
[Finished] 08fb8610 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ursabot

ursabot commented Mar 21, 2023

['Python', 'R'] benchmarks have high level of regressions.
ursa-i9-9960x

@jorisvandenbossche
Member

Thanks for finishing up! Pushed the python bindings I wrote to a new PR: #34654

jorisvandenbossche added a commit that referenced this pull request Mar 22, 2023
Adds Python bindings for the OrderByNode added in #34249 
* Closes: #34248

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
ArgusLi pushed a commit to Bit-Quill/arrow that referenced this pull request May 15, 2023
Adds Python bindings for the OrderByNode added in apache#34249 
* Closes: apache#34248

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>

Successfully merging this pull request may close these issues.

[C++] Add an order_by node which can reassign an ordering mid-plan