[Part3] Partition and Sort Enforcement, Enforcement rule implementation #4122
Conversation
@alamb @yahoNanJing @Dandandan @Jefffrey
```rust
physical_optimizers.push(Arc::new(Repartition::new()));
physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
// physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
```
why remove it?
The new rule BasicEnforcement already covers the work of AddCoalescePartitionsExec.
When this call is removed, there is no other code that calls AddCoalescePartitionsExec and thus I think we should remove that entire module. We could do so as a follow on PR if you want to keep this one smaller
Sure, I will do it in the following PR.
```rust
    equal_properties: F,
) -> bool {
    match required {
        Distribution::UnspecifiedDistribution => true,
```
We can add some UTs for `satisfy()` to confirm some cases:
- UnspecifiedDistribution vs. UnspecifiedDistribution
- Hash vs. SinglePartition
- Hash vs. Hash
- ...
SinglePartition should already be covered by existing UTs.
Perhaps @jackwener was getting at the idea that additional testing could also help document the intent of satisfy() and make the coverage clear, even if it did not add additional coverage.
I would be interested in helping to write such tests as part of my review of this code (as it would help me gain a better understanding)
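For reference, here is a rough sketch of what such tests could look like. It is only a sketch: the `satisfy` method and its closure-based `equal_properties` parameter are taken from the diff context above, and we assume the closure is only consulted for the equivalence-class fallback, so these cases never build one.

```rust
use std::sync::Arc;

use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::{Distribution, Partitioning, PhysicalExpr};

#[test]
fn partitioning_satisfy_distribution() {
    let col_a: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let col_b: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 1));

    // Assumption: `satisfy` only calls the closure for the equivalence-class
    // fallback, which none of the cases below should reach.
    let no_equal_props = || unimplemented!("equivalence properties not needed");

    // Any partitioning satisfies UnspecifiedDistribution.
    assert!(Partitioning::RoundRobinBatch(8)
        .satisfy(Distribution::UnspecifiedDistribution, no_equal_props));

    // Hash partitioning into more than one partition cannot satisfy
    // SinglePartition.
    let hash_ab = Partitioning::Hash(vec![col_a.clone(), col_b.clone()], 8);
    assert!(!hash_ab.satisfy(Distribution::SinglePartition, no_equal_props));

    // Hash partitioning satisfies HashPartitioned on the same expressions
    // in the same strict order (the fast-match path).
    assert!(hash_ab.satisfy(
        Distribution::HashPartitioned(vec![col_a, col_b]),
        no_equal_props
    ));
}
```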
I plan to review this carefully tomorrow

I am not familiar with this part. But this

@yahoNanJing

I am sorry again, I am struggling to find sufficient contiguous time to review this PR. I will set aside time first thing tomorrow morning
```rust
    }
}

fn try_reorder(
```
It's better to add some comments here for the parameters.
- For the top-down case, the `join_keys` use the children's schema, while the `expected` and `equivalence_properties` use the current operator's schema.
- For the bottom-up case, the `join_keys` and `expected` use the children's schema, while the `equivalence_properties` use the current operator's schema.

Is it possible to make the schema the same for all of the parameters? Otherwise, for the right side of the bottom-up case, it may miss some reorder chances.
Yes, valid points.
Now, in the latest code, for the bottom-up case, the `join_keys`, `expected` and `equivalence_properties` all use the children's schema.
```rust
/// That might not match with the output partitioning of the join node's children
/// This method runs a top-down process and try to adjust the output partitioning of the children
/// if the children themselves are Joins or Aggregations.
fn adjust_input_keys_down_recursively(
```
Will the parameters `plan` and `parent_required` share the same schema? If so, it's better to add a comment for it.
```rust
match mode {
    AggregateMode::FinalPartitioned => {
        let new_input =
            adjust_input_keys_down_recursively(
```
Since the input of FinalPartitioned should be the Partial aggregation, and they should share the same column order, it's safe to call adjust_input_keys_down_recursively here.
```rust
    right.clone(),
    left.schema().fields().len(),
)?,
JoinType::RightSemi | JoinType::RightAnti => {
```
For RightSemi and RightAnti joins, the current schema should be the same as the right one's. Therefore, there is no need to do schema adjustment.
```rust
} else if let Some(CrossJoinExec { left, right, .. }) =
    plan_any.downcast_ref::<CrossJoinExec>()
{
    let new_left = adjust_input_keys_down_recursively(left.clone(), vec![])?;
```
Since the current implementation for CrossJoinExec is similar to CollectLeft, we don't adjust the left side.
```rust
let new_right =
    adjust_input_keys_down_recursively(right.clone(), right_keys)?;

Ok(Arc::new(SortMergeJoinExec::try_new(
```
Since we change the sort options here, will it influence upstream operators that may rely on them?
If this rule is called at the very beginning of the rule-based optimization process, this influence may be covered by other rules.
```rust
// Check whether the requirements can be satisfied by the Aggregation
if parent_required.len() != out_put_exprs.len()
    || expr_list_eq_strict_order(&out_put_exprs, &parent_required)
```
Here, we don't consider the optimization where a join group shares the same hash table. Therefore, when the group keys satisfy the join keys, we don't push down the required ordering.
```rust
/// This method will try to change the ordering of the join keys to match with the
/// partitioning of the join nodes' children.
/// If it can not match with both sides, it will try to match with one, either left side or right side.
fn reorder_join_keys_to_inputs(
```
It's used for the bottom-up approach.
```rust
    )
}
(_, Some(Partitioning::Hash(right_exprs, _))) => {
    try_reorder(join_keys.clone(), right_exprs, equivalence_properties).or_else(
```
Maybe there is no need to call or_else here.
Sure.
```rust
{
    Arc::new(CoalescePartitionsExec::new(child.clone()))
}
_ => {
```
Here, we are sure that any RepartitionExec already in the plan tree satisfies the required distribution, so we don't need to worry about a duplicate RepartitionExec being added.
Isn't that case covered by the `child.output_partitioning().satisfy()` check above?
```rust
// Use hash partition by default to parallelize hash joins
Ok(Arc::new(HashJoinExec::try_new(
    Arc::new(RepartitionExec::try_new(
```
Excellent refactoring. Now the RepartitionExec will be added by the BasicEnforcement optimization rule rather than being added manually for specific operators.
I agree 100%
Excellent work 👍
alamb left a comment
I got through most of this PR other than datafusion/core/src/physical_optimizer/enforcement.rs, which I am hoping to do later today.
So far I think it looks like a great piece of work.
Here is a list of follow-on items that would be good to address (though we could do them in follow-on PRs):
- Remove AddCoalescePartitionsExec
- Add more test coverage for Partitioning::satisfy()
```rust
    Arc::new(AggregateStatistics::new()),
    Arc::new(HashBuildProbeOrder::new()),
];
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
```
Is there a reason that BasicEnforcement must be run twice?
Yes, this is to cover the case where the Repartition rule could add additional RepartitionExec nodes with RoundRobin partitioning; to make sure SinglePartition requirements are still satisfied, we run BasicEnforcement again. Originally AddCoalescePartitionsExec ran here; now I have replaced AddCoalescePartitionsExec with BasicEnforcement.
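For context, the resulting rule ordering looks roughly like this (a sketch inferred from the diffs in this PR; the surrounding rule list is abbreviated):

```rust
// Sketch of the ordering discussed above (abbreviated, inferred from the
// diffs in this PR, not copied verbatim).
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
// Repartition may add RoundRobin RepartitionExec nodes for parallelism ...
physical_optimizers.push(Arc::new(Repartition::new()));
// ... so BasicEnforcement runs once more to restore any SinglePartition
// (or other distribution) requirement the new RepartitionExecs broke.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
```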
Cool -- perhaps you can add a comment to explain the rationale
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I will do it in the following PR.
```rust
// then we need to have the partition count and hash functions validation.
Partitioning::Hash(partition_exprs, _) => {
    let fast_match =
        expr_list_eq_strict_order(&required_exprs, partition_exprs);
```
You might be able to reduce the indent level of this code with a construct like:

```rust
if expr_list_eq_strict_order(&required_exprs, partition_exprs) {
    return true;
}
```

```rust
        )
    })
    .collect::<Vec<_>>();
expr_list_eq_strict_order(
```
👍 this is a nice formulation
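The formulation being praised here normalizes the required expressions through the equivalence classes before comparing them in strict order. A simplified standalone model of the idea, with plain strings standing in for physical expressions (purely illustrative):

```rust
use std::collections::HashMap;

// Simplified model: each expression is a string; `heads` maps every member
// of an equivalence class to a canonical representative ("head").
fn normalize(exprs: &[&str], heads: &HashMap<&str, &str>) -> Vec<String> {
    exprs
        .iter()
        .map(|&e| heads.get(e).copied().unwrap_or(e).to_string())
        .collect()
}

fn main() {
    // Equivalence class {a, a1} with head "a" (e.g. introduced by an alias).
    let heads = HashMap::from([("a1", "a")]);

    // Required hash keys vs. the actual partitioning keys: literally
    // different, but equal after normalization, in strict order.
    let required = ["a1", "b"];
    let actual = ["a", "b"];
    assert_eq!(normalize(&required, &heads), normalize(&actual, &heads));
}
```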
```rust
    vec![]
};

let input_exec = if can_repartition {
```
It is nice to avoid adding repartitioning directly in the planner and instead add it as a follow-on pass.
```rust
}

/// Project Equivalence Properties.
/// 1) Add Alias, Alias can introduce additional equivalence properties,
```
👍
```rust
project_equivalence_properties(input_properties, &alias_map, &mut out_properties);
assert_eq!(out_properties.classes().len(), 1);
assert_eq!(out_properties.classes()[0].len(), 4);
```
I suggest actually validating the contents of these equivalence classes somehow (rather than just their sizes)
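For example, something along these lines could check the membership rather than only the size (a hedged sketch: the `head()`/`others()` accessors and the alias names are assumptions for illustration, not taken from this diff):

```rust
// Hypothetical continuation of the quoted test: verify *which* columns ended
// up in the single equivalence class, not just how many there are. The
// head()/others() accessors and the alias names are assumed.
let class = &out_properties.classes()[0];
let mut members: Vec<String> = std::iter::once(class.head())
    .chain(class.others().iter())
    .map(|c| c.name().to_string())
    .collect();
members.sort();
assert_eq!(members, ["a", "a1", "a2", "a3"]);
```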
Sure, will do.
```rust
/// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree
/// and give a non-optimal plan, but it can avoid the possible data skew in joins
```
I don't understand this comment about 'non-optimal plan' -- I think the point of this pass is to ensure that the plan gets the correct answer :) If the plan doesn't get the correct answer it can't be optimal.
Yes, correctness is always the P1 priority. Here 'non-optimal plan' means that if the optimization target is to find the best plan with the least number of Repartitions and Sorts, the current implementation can't reach that goal; that would need more complex algorithms.
```rust
    }
}

/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
```
Thank you for this comment -- can you explain a little more about why the ordering of join keys in the plan affects the output partitioning? Is it because the output partitioning of a join on (a, b) is always hash(a, b), which might not be the same as hash(b, a)?
Specifically, I am trying to figure out under what circumstances join column reordering is needed at all. If you have a join on (a, b) and then an aggregate grouped on (b, a), the distributions are compatible, aren't they?
Yes, this is to cover complex plan graphs that include multiple joins, or joins on aggregations, where the key orderings are different.
For example:

```
TopJoin on (a, b, c)
    bottom left join on (b, a, c)
    bottom right join on (c, b, a)
```

Another case:

```
TopJoin on (a, b, c)
    Agg1 group by (b, a, c)
    Agg2 group by (c, b, a)
```

The PR comes up with two join key reordering implementations:

- Top-down approach, see the method `adjust_input_keys_down_recursively()`.

The top-down approach adjusts the children's key ordering based on the parent's requirements:

```
TopJoin on (a, b, c)
    bottom left join on (b, a, c)
    bottom right join on (c, b, a)
```

will be adjusted to

```
TopJoin on (a, b, c)
    bottom left join on (a, b, c)
    bottom right join on (a, b, c)
```

and

```
TopJoin on (a, b, c)
    Agg1 group by (b, a, c)
    Agg2 group by (c, b, a)
```

will be adjusted to

```
TopJoin on (a, b, c)
    Projection(b, a, c)
        Agg1 group by (a, b, c)
    Projection(c, b, a)
        Agg2 group by (a, b, c)
```

- Bottom-up approach, see the method `reorder_join_keys_to_inputs()`.

The bottom-up approach just adjusts the parent's key ordering based on the children's, aligning with either the left or the right side; sometimes it can not align with both. The bottom-up approach is much simpler than the top-down approach, but it might not reach the best result:

```
TopJoin on (a, b, c)
    bottom left join on (b, a, c)
    bottom right join on (c, b, a)
```

will be adjusted to

```
TopJoin on (b, a, c)
    bottom left join on (b, a, c)
    bottom right join on (c, b, a)
```

By default, the PR goes with the top-down approach, and there is a session-level configuration `config.top_down_join_key_reordering` to disable it.
The bottom-up approach will be useful in the future if we plan to support storage partition-wise joins. In that case, the data sources/tables might be pre-partitioned and we can't adjust that ordering, so we would prefer to adjust the parent's key ordering based on the children's.
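A toy model of what the top-down adjustment computes for each child, with strings standing in for join-key expressions (illustrative only; the real `adjust_input_keys_down_recursively` works on `PhysicalExpr` join-key pairs and rewrites the joins or inserts Projections accordingly):

```rust
// Toy model of the top-down reordering step: find, for each key required by
// the parent, its position in the child's current join keys.
fn positions_to_match(required: &[&str], current: &[&str]) -> Option<Vec<usize>> {
    if required.len() != current.len() {
        return None;
    }
    required
        .iter()
        .map(|k| current.iter().position(|c| c == k))
        .collect()
}

fn main() {
    // Parent requires (a, b, c); the bottom join currently uses (b, a, c).
    // Reordering the child's keys by the permutation [1, 0, 2] makes its
    // output partitioning line up with the parent's requirement.
    assert_eq!(
        positions_to_match(&["a", "b", "c"], &["b", "a", "c"]),
        Some(vec![1, 0, 2])
    );
    // No full match is possible when the key sets differ.
    assert_eq!(positions_to_match(&["a", "b"], &["a", "x"]), None);
}
```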
This makes sense to me -- thank you for the explanation @mingmwang -- this would also be great content to put into the comments somewhere so future readers can get a little more understanding of the rationale. We could do that as part of a follow on PR too
I think this PR could be merged as is (and do the follow up items as follow on PRs). Thanks again @mingmwang
Please let me know what you would prefer (merge and follow up PRs or modify this one).
Sure, thanks, I prefer to do the changes in the follow up PRs.
alamb left a comment
I think this PR looks great -- thank you so much @mingmwang. It is a real step forward.
The only thing I would like resolved prior to approving this PR is understanding the need to reorder join keys -- everything else makes sense to me and seems reasonably tested.
Again, really nice work 🏆
| "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)", | ||
| "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, e]", | ||
| ], | ||
| // Should include 4 RepartitionExecs |
Why does a right outer join (and full join) need the extra repartition? Mostly I don't understand why you don't need the same repartition for the inner join as well.
For a full join, the output partitioning is UnknownPartitioning, so it needs the extra repartition.
For a right outer join, the output partitioning is the adjusted right output partitioning (with shifted column indices).
In this UT, different cases are covered.

Case 1: a right outer join case, original plan:

```
Top RightOutJoin on (a == c)
    bottom RightOutJoin on (a == b)
        ParquetScan
        ParquetScan
    ParquetScan
```

The expected physical plan needs 4 Repartitions:

```
Top RightOutJoin on (a == c)
    Repartition(a)
        bottom RightOutJoin on (a == b)   -- output partitioning is Partition(b), can not meet the requirements
            Repartition(a)
                ParquetScan
            Repartition(b)
                ParquetScan
    Repartition(c)
        ParquetScan
```

Case 2: an inner join case, original plan:

```
Top InnerJoin on (a == c)
    bottom InnerJoin on (a == b)
        ParquetScan
        ParquetScan
    ParquetScan
```

The expected physical plan needs 3 Repartitions:

```
Top InnerJoin on (a == c)
    bottom InnerJoin on (a == b)   -- output partitioning is Partition(a), can meet the requirements
        Repartition(a)
            ParquetScan
        Repartition(b)
            ParquetScan
    Repartition(c)
        ParquetScan
```

Case 3: another inner join case, original plan:

```
Top InnerJoin on (b == c)
    bottom InnerJoin on (a == b)
        ParquetScan
        ParquetScan
    ParquetScan
```

The expected physical plan needs 3 Repartitions:

```
Top InnerJoin on (b == c)
    bottom InnerJoin on (a == b)   -- output partitioning is Partition(a), but Partition(a) == Partition(b),
        Repartition(a)             --   so the EquivalenceClass takes place and can meet the requirements
            ParquetScan
        Repartition(b)
            ParquetScan
    Repartition(c)
        ParquetScan
```
```rust
/// Calculate the OutputPartitioning for Partitioned Join
pub fn partitioned_join_output_partitioning(
    join_type: JoinType,
    left_partitioning: Partitioning,
    right_partitioning: Partitioning,
    left_columns_len: usize,
) -> Partitioning {
    match join_type {
        JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
            left_partitioning
        }
        JoinType::RightSemi | JoinType::RightAnti => right_partitioning,
        JoinType::Right => {
            adjust_right_output_partitioning(right_partitioning, left_columns_len)
        }
        JoinType::Full => {
            Partitioning::UnknownPartitioning(right_partitioning.partition_count())
        }
    }
}
```
This is a wonderful explanation -- thank you @mingmwang
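To make the 'shifted column index' in the Right arm concrete, here is a simplified standalone model of what `adjust_right_output_partitioning` does (the real helper rewrites the `Column` expressions inside a `Partitioning::Hash`; the toy type below is an assumption for illustration):

```rust
// Toy model: the join output schema is the left columns followed by the
// right columns, so a right-side column keeps its name but its index grows
// by the number of columns on the left.
#[derive(Debug, PartialEq)]
struct Col {
    name: &'static str,
    index: usize,
}

fn adjust_right_cols(right_hash_cols: Vec<Col>, left_columns_len: usize) -> Vec<Col> {
    right_hash_cols
        .into_iter()
        .map(|c| Col { index: c.index + left_columns_len, ..c })
        .collect()
}

fn main() {
    // Right input is hash-partitioned on its column "b" at index 0; the left
    // side has 3 columns, so in the join output "b" sits at index 3.
    let adjusted = adjust_right_cols(vec![Col { name: "b", index: 0 }], 3);
    assert_eq!(adjusted, vec![Col { name: "b", index: 3 }]);
}
```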
```rust
    &JoinType::Inner,
);

// Output partition need to respect the Alias and should not introduce additional RepartitionExec
```
👍
```rust
}

#[test]
fn multi_smj_joins() -> Result<()> {
```
this is very clever
```rust
let expected = &[
    top_join_plan.as_str(),
    "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
    "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 })]",
```
I don't understand the need to reorder the join keys here -- if the input is partitioned on (a, b, c), the correct answer will still happen if you join on (b, c, a).
My rationale is that any particular values of a, b and c will always be in the same partition whether it is partitioned by hash(a, b, c) or by hash(b, c, a); although the specific partitions would be different, it shouldn't matter.
For Aggregations, yes, the ordering doesn't matter. But for Partitioned Joins, it matters.
For Partitioned Joins, there are two inputs. If the left is partitioned on (B, C, A) and the right is partitioned on (A, C, B), then for the same particular value tuple (a, b, c), the two inputs will not co-locate in the same partition, because the hashed values are different.
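A tiny standalone demonstration of that point, using Rust's std hasher rather than DataFusion's hash partitioning (the principle is identical):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Which of `n` partitions a key tuple lands in under hash partitioning.
fn partition_of<T: Hash>(key: &T, n: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % n
}

fn main() {
    // One logical row with join-key values a=1, b=2, c=3. Hashing the same
    // values in two different column orders generally yields different
    // partitions, so two join inputs partitioned on (b, c, a) and (a, c, b)
    // respectively would not co-locate matching rows.
    let abc = (1_i64, 2_i64, 3_i64);
    let bca = (2_i64, 3_i64, 1_i64);
    println!(
        "hash(a,b,c) -> partition {}, hash(b,c,a) -> partition {}",
        partition_of(&abc, 8),
        partition_of(&bca, 8)
    );
}
```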
```rust
    &top_join_on,
    &join_type,
);
let top_join_plan =
```
Something I don't see in these plans (or in the code) is the fact that if you take several sorted streams and run them through RepartitionExec, I don't think the output remains sorted. I think these plans are OK, as they sort after the repartition, but in general that is something to watch out for.
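A toy illustration of that caveat (nothing DataFusion-specific): two individually sorted input partitions whose batches are forwarded batch-by-batch, as a repartition does, no longer form one sorted stream.

```rust
fn main() {
    // Two input partitions, each individually sorted, split into batches.
    let p0 = [vec![1, 3], vec![5, 7]];
    let p1 = [vec![2, 4], vec![6, 8]];

    // A repartition-like step forwards whole batches in arrival order
    // (e.g. round-robin), without merging on the sort key.
    let out: Vec<i32> = [&p0[0], &p1[0], &p0[1], &p1[1]]
        .into_iter()
        .flatten()
        .copied()
        .collect();

    // The concatenation of interleaved batches is not sorted; a
    // sort-preserving merge (or a re-sort) would be needed downstream.
    assert_eq!(out, vec![1, 3, 2, 4, 5, 7, 6, 8]);
    assert!(!out.windows(2).all(|w| w[0] <= w[1]));
}
```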
| "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)", | ||
| "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, e]", | ||
| ], | ||
| // Should include 4 RepartitionExecs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a wonderful explanation -- thank you @mingmwang
Thanks again @mingmwang -- can you please file tickets for the follow-on items? Or if you don't have time, let me know so I can do so.

Benchmark runs are scheduled for baseline = 30813dc and contender = 9c24a79. 9c24a79 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.

Good job 👍.
Which issue does this PR close?
Closes #41
Closes #3854.
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?