Skip to content

GH-34136: [C++] Add a concept of ordering to ExecPlan#34137

Merged
westonpace merged 6 commits intoapache:mainfrom
westonpace:feature/GH-34136--add-ordering-to-exec-plan
Feb 28, 2023
Merged

GH-34136: [C++] Add a concept of ordering to ExecPlan#34137
westonpace merged 6 commits intoapache:mainfrom
westonpace:feature/GH-34136--add-ordering-to-exec-plan

Conversation

@westonpace
Copy link
Copy Markdown
Member

@westonpace westonpace commented Feb 11, 2023

In addition, it is now possible to bypass the I/O executor on the record_batch_source, exec_batch_source, and array_vector_source.

It is now possible to create a source node from a gen::Gen generator.

BREAKING CHANGE: The default executor for record_batch_source, exec_batch_source, and array_vector_source was (erroneously) the plan's CPU executor. It now defaults properly to the I/O executor.

@github-actions
Copy link
Copy Markdown

@github-actions
Copy link
Copy Markdown

⚠️ GitHub issue #34136 has been automatically assigned in GitHub to PR creator.

@westonpace
Copy link
Copy Markdown
Member Author

westonpace commented Feb 11, 2023

This PR should be complete but it builds on #34059 which has not yet merged.

@westonpace westonpace force-pushed the feature/GH-34136--add-ordering-to-exec-plan branch from 4ffb32f to 7d55bc5 Compare February 11, 2023 01:10
@westonpace westonpace marked this pull request as ready for review February 11, 2023 01:10
@westonpace westonpace force-pushed the feature/GH-34136--add-ordering-to-exec-plan branch from 7d55bc5 to 3556eb3 Compare February 17, 2023 20:43
@westonpace westonpace requested a review from lidavidm February 17, 2023 20:44
@westonpace
Copy link
Copy Markdown
Member Author

@lidavidm Sorry, I requested your review a bit too early. I started working on the order by node and realized that the ordering I had in place didn't support the ability to specify null placement. So I changed it to a proper class similar to SortNodeOptions. The change is pretty minor but I'll leave up unless you want to take another look.

Copy link
Copy Markdown
Member

@jorisvandenbossche jorisvandenbossche left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't look in detail at the C++ code, mostly took a look to see if I could follow the new logic, and added a few questions.

One more question: assume you would use something like DeclarationToTable, does that automatically use the ordering / batch indices if there is one, or do you still need to indicate this manually you want to use it? (like the generic SinkNodeOptions has a sequence_delivery parameter with a default of false, but I don't see that exposed through the DeclarationTo.. versions)

And looking forward to have this!

Comment on lines 157 to 159
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is an order based on a column "x", does that also guarantee something about the order within each batch? (or only between batches as this paragraph explains)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should guarantee that the ordering exists within the batch as well.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the wording here to mention ordering within a batch.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is exactly meant with "map node"? (I don't find this term used anywhere else in the compute / Acero code or docs) Do we mean a node that uses typical element-wise scalar kernels? Also something like a filter node will always preserve ordering.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I searched for "map node" and not for "MapNode" ;)
And I see that indeed both Project and Filter node inherit from MapNode (it's still a term that is not used in our documentation though)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, this is probably an internal term. I'll update this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to "A filter or project node..."

Comment on lines 186 to 191
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering, but assume you would do a filter operation that filters a large part of the data, and so you might end up with several empty batches, does that affect for example sink nodes like writing files? (do we skip empty batches there, or do we then potentially write empty files for them?)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. The dataset writer will discard empty batches without writing anything. However, the sink node still respects empty batches. For example, if one were doing dataset.to_batches(...) then they might see an empty batch.

I'm fairly certain this is consistent with the current implementation and not a change in behavior.

@jorisvandenbossche
Copy link
Copy Markdown
Member

One more question: assume you would use something like DeclarationToTable, does that automatically use the ordering / batch indices if there is one, or do you still need to indicate this manually you want to use it? (like the generic SinkNodeOptions has a sequence_delivery parameter with a default of false, but I don't see that exposed through the DeclarationTo.. versions)

Ah, I see there is a new variant of DeclarationToTable that uses QueryOptions as parameter, and that options struct has a sequence_output keyword. So I assume that answers my question.

Follow-up question on this: the default for sequence_output is false, so does that mean that even if you have a query plan with an order_by node at the end before consuming as eg a table, you won't get the data as ordered by default, because for the ordering to be respected, you need to explicitly enable it with sequence_output?

@westonpace
Copy link
Copy Markdown
Member Author

westonpace commented Feb 21, 2023

even if you have a query plan with an order_by node at the end before consuming as eg a table, you won't get the data as ordered by default, because for the ordering to be respected, you need to explicitly enable it with sequence_output?

Correct. Maybe I should change to an optional and the default (nullopt) would sequence when the input to the sink node is ordered. This would mean we only default to false if there is an aggregate or join. Given the cost of this sequencing should generally be pretty reasonable I think it would be an ok default (and users could still disable it if they wanted).

@westonpace
Copy link
Copy Markdown
Member Author

Correct. Maybe I should change to an optional and the default (nullopt) would sequence when the input to the sink node is ordered. This would mean we only default to false if there is an aggregate or join. Given the cost of this sequencing should generally be pretty reasonable I think it would be an ok default (and users could still disable it if they wanted).

I've done this. The default (nullopt) means "sequence if there is any ordering". It can be set to true to get "fail validation if there is no meaningful ordering" or false to get "never sequence and maximize performance even if there is a meaningful ordering".

@westonpace westonpace force-pushed the feature/GH-34136--add-ordering-to-exec-plan branch from e1d25c1 to 9d1faf2 Compare February 24, 2023 21:39
@jorisvandenbossche
Copy link
Copy Markdown
Member

This is ready to be merged?

@westonpace
Copy link
Copy Markdown
Member Author

This is ready to be merged?

Yes, thanks.

@westonpace westonpace merged commit 762329b into apache:main Feb 28, 2023
@ursabot
Copy link
Copy Markdown

ursabot commented Mar 1, 2023

Benchmark runs are scheduled for baseline = ef21008 and contender = 762329b. 762329b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️0.52% ⬆️0.03%] test-mac-arm
[Finished ⬇️0.0% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.22% ⬆️0.16%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 762329b7 ec2-t3-xlarge-us-east-2
[Failed] 762329b7 test-mac-arm
[Finished] 762329b7 ursa-i9-9960x
[Finished] 762329b7 ursa-thinkcentre-m75q
[Finished] ef21008d ec2-t3-xlarge-us-east-2
[Finished] ef21008d test-mac-arm
[Finished] ef21008d ursa-i9-9960x
[Finished] ef21008d ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

jorisvandenbossche added a commit that referenced this pull request Apr 4, 2023
…ith use_threads=True (#34766)

### Rationale for this change

Thanks to #34137, the ExecPlan now has a concept of ordering. When the source node is a Table, the order of the batches in the table is used as the implicit order. And when executing a plan and producing a resulting Table, the default for QueryOptions' `sequence_output` is to honor an order if there is one. 

Given that the `Table.filter` method only consists of a table source node (which adds implicit order) and a filter node (which preserves any ordering), the output will now always be ordered by default, also with the default of `use_threads=True`

### Are these changes tested?

The existing test `test_exec_plan.py::test_filter_table_ordering` still passes.

* Closes: #31880

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
ArgusLi pushed a commit to Bit-Quill/arrow that referenced this pull request May 15, 2023
…rder with use_threads=True (apache#34766)

### Rationale for this change

Thanks to apache#34137, the ExecPlan now has a concept of ordering. When the source node is a Table, the order of the batches in the table is used as the implicit order. And when executing a plan and producing a resulting Table, the default for QueryOptions' `sequence_output` is to honor an order if there is one. 

Given that the `Table.filter` method only consists of a table source node (which adds implicit order) and a filter node (which preserves any ordering), the output will now always be ordered by default, also with the default of `use_threads=True`

### Are these changes tested?

The existing test `test_exec_plan.py::test_filter_table_ordering` still passes.

* Closes: apache#31880

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[C++] Add the concept of "ordering" to an exec node, reject non-sensible plans

4 participants