GH-34136: [C++] Add a concept of ordering to ExecPlan (#34137)
westonpace merged 6 commits into apache:main
Conversation
Force-pushed from 4ffb32f to 7d55bc5
Force-pushed from 7d55bc5 to 3556eb3
@lidavidm Sorry, I requested your review a bit too early. I started working on the order-by node and realized that the ordering I had in place didn't support the ability to specify null placement. So I changed it to a proper class similar to SortNodeOptions. The change is pretty minor, but I'll leave it up in case you want to take another look.
jorisvandenbossche left a comment
Didn't look in detail at the C++ code, mostly took a look to see if I could follow the new logic, and added a few questions.
One more question: assume you would use something like DeclarationToTable, does that automatically use the ordering / batch indices if there is one, or do you still need to manually indicate that you want to use it? (The generic SinkNodeOptions has a sequence_delivery parameter with a default of false, but I don't see that exposed through the DeclarationTo.. versions.)
And looking forward to have this!
If there is an order based on a column "x", does that also guarantee something about the order within each batch? (or only between batches as this paragraph explains)
Yes, it should guarantee that the ordering exists within the batch as well.
I've updated the wording here to mention ordering within a batch.
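To make the guarantee concrete, here is a minimal stdlib-Python sketch (not Arrow code; the function name and list-of-lists batch model are made up for illustration) of what "ordered on a column" means for a stream of batches: values must be non-decreasing within each batch, and the last value of one batch must not exceed the first value of the next.

```python
# Hypothetical model: a "stream" is a list of batches, each batch a list of
# key-column values. Ordering must hold within batches AND across boundaries.
from typing import Iterable, List


def is_stream_ordered(batches: Iterable[List[int]]) -> bool:
    """Check ascending order within each batch and between batches."""
    prev_last = None
    for batch in batches:
        if any(a > b for a, b in zip(batch, batch[1:])):
            return False  # order violated inside a batch
        if batch:
            if prev_last is not None and prev_last > batch[0]:
                return False  # order violated at a batch boundary
            prev_last = batch[-1]
    return True


print(is_stream_ordered([[1, 2, 3], [3, 5], [6]]))  # True
print(is_stream_ordered([[1, 4], [2, 3]]))          # False (boundary)
print(is_stream_ordered([[1, 3, 2]]))               # False (within batch)
```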
What exactly is meant by "map node"? (I don't find this term used anywhere else in the compute / Acero code or docs.) Do we mean a node that uses typical element-wise scalar kernels? Also, something like a filter node will always preserve ordering.
OK, I searched for "map node" and not for "MapNode" ;)
And I see that indeed both Project and Filter node inherit from MapNode (it's still a term that is not used in our documentation though)
Ah, yes, this is probably an internal term. I'll update this.
I changed this to "A filter or project node..."
Just wondering: assume you would do a filter operation that filters out a large part of the data, so you might end up with several empty batches. Does that affect sink nodes like writing files? (Do we skip empty batches there, or do we then potentially write empty files for them?)
Good question. The dataset writer will discard empty batches without writing anything. However, the sink node still respects empty batches. For example, if one were doing dataset.to_batches(...) then they might see an empty batch.
I'm fairly certain this is consistent with the current implementation and not a change in behavior.
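The behavior described above can be sketched in plain Python (a hypothetical model, not the dataset writer's actual code): a filter may emit empty batches, a sink-style consumer sees them, and a writer-style consumer discards them.

```python
# Toy model: batches are lists of values; a filter keeps per-batch shape,
# so a batch whose rows are all filtered out becomes an empty batch.
def filter_batches(batches, predicate):
    for batch in batches:
        yield [row for row in batch if predicate(row)]


batches = [[1, 2], [10, 11], [3]]
filtered = list(filter_batches(batches, lambda x: x < 5))
print(filtered)                       # [[1, 2], [], [3]] -- a sink sees the empty batch
written = [b for b in filtered if b]  # a writer-style consumer drops empties
print(written)                        # [[1, 2], [3]]
```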
Ah, I see there is a new variant of … Follow-up question on this: the default for …
Correct. Maybe I should change this to an optional, where the default (nullopt) would sequence when the input to the sink node is ordered. This would mean we only default to false if there is an aggregate or join. Given that the cost of this sequencing should generally be pretty reasonable, I think it would be an ok default (and users could still disable it if they wanted).
I've done this. The default (nullopt) means "sequence if there is any ordering". It can be set to true to get "fail validation if there is no meaningful ordering" or false to get "never sequence and maximize performance even if there is a meaningful ordering".
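The tri-state default described above can be sketched as follows (a stdlib-Python model with a made-up function name; the real option is `sequence_output` on the C++ QueryOptions, where nullopt maps to `None` here):

```python
# None  -> sequence only if the input is ordered
# True  -> require an ordering; fail validation without one
# False -> never sequence, even if an ordering exists
from typing import Optional


def resolve_sequencing(sequence_output: Optional[bool],
                       input_is_ordered: bool) -> bool:
    if sequence_output is None:
        return input_is_ordered
    if sequence_output and not input_is_ordered:
        raise ValueError(
            "sequence_output=True but input has no meaningful ordering")
    return sequence_output


print(resolve_sequencing(None, True))    # True  -- ordered input: sequence
print(resolve_sequencing(None, False))   # False -- e.g. after a join/aggregate
print(resolve_sequencing(False, True))   # False -- user opted out for speed
```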
…ources from an iterator factory) to run without an I/O executor (useful if the source is something like a vector)
…ou can' behavior. Clarified some comments
Force-pushed from e1d25c1 to 9d1faf2
This is ready to be merged?
Yes, thanks.
Benchmark runs are scheduled for baseline = ef21008 and contender = 762329b. 762329b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
…ith use_threads=True (#34766)

### Rationale for this change

Thanks to #34137, the ExecPlan now has a concept of ordering. When the source node is a Table, the order of the batches in the table is used as the implicit order. And when executing a plan and producing a resulting Table, the default for QueryOptions' `sequence_output` is to honor an order if there is one. Given that the `Table.filter` method only consists of a table source node (which adds implicit order) and a filter node (which preserves any ordering), the output will now always be ordered by default, also with the default of `use_threads=True`.

### Are these changes tested?

The existing test `test_exec_plan.py::test_filter_table_ordering` still passes.

* Closes: #31880

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
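The pipeline the rationale describes can be modeled in a few lines of plain Python (a sketch, not pyarrow/Acero code; all names are hypothetical): the table source tags each batch with an implicit index, the filter preserves that index, and the sink sequences the possibly out-of-order results by it.

```python
# Toy model of: table source (implicit order) -> filter (order-preserving)
# -> sink (sequences by batch index when sequence_output is enabled).
def source(table_batches):
    # Tag each batch with its implicit position in the table.
    return [(i, b) for i, b in enumerate(table_batches)]


def filter_node(indexed, predicate):
    # Filtering changes batch contents but keeps each batch's index.
    return [(i, [v for v in b if predicate(v)]) for i, b in indexed]


def sink(indexed, sequence_output=True):
    if sequence_output:
        indexed = sorted(indexed, key=lambda ib: ib[0])
    return [v for _, b in indexed for v in b]


batches = [[1, 2], [3, 4], [5, 6]]
# Simulate worker threads delivering filtered batches out of order:
shuffled = list(reversed(filter_node(source(batches), lambda v: v % 2 == 0)))
print(sink(shuffled))  # [2, 4, 6] -- original table order restored
```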
In addition, it is now possible to bypass the I/O executor on `record_batch_source`, `exec_batch_source`, and `array_vector_source`. It is now possible to create a source node from a `gen::Gen` generator.

BREAKING CHANGE: The default executor for `record_batch_source`, `exec_batch_source`, and `array_vector_source` was (erroneously) the plan's CPU executor. It now defaults properly to the I/O executor.