Conversation

@mingmwang
Contributor

@mingmwang mingmwang commented Nov 7, 2022

Which issue does this PR close?

Closes #41
Closes #3854.

Rationale for this change

What changes are included in this PR?

  1. Make EquivalenceProperties schema aware.
  2. Add a top-down pass to reorder Join key/GroupBy key ordering.
  3. Add a BasicEnforcement rule to enforce the Distribution and Ordering requirements.

Are there any user-facing changes?

@github-actions bot added the core (Core DataFusion crate) and physical-expr (Changes to the physical-expr crates) labels Nov 7, 2022
@mingmwang
Contributor Author

@alamb @yahoNanJing @Dandandan @Jefffrey
This is the last part of the PR series. Please take a look.

physical_optimizers.push(Arc::new(Repartition::new()));
physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
// physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
Member

why remove it?

Contributor Author

The new BasicEnforcement rule already covers the work of AddCoalescePartitionsExec.

Contributor

When this call is removed, there is no other code that calls AddCoalescePartitionsExec and thus I think we should remove that entire module. We could do so as a follow on PR if you want to keep this one smaller

Contributor Author

Sure, I will do it in the following PR.

equal_properties: F,
) -> bool {
match required {
Distribution::UnspecifiedDistribution => true,
Member

We could add some unit tests for satisfy() to confirm cases such as:

  • UnspecifiedDistribution - UnspecifiedDistribution
  • Hash - SinglePartition
  • Hash - Hash
  • .......

Contributor Author

SinglePartition should already be covered by existing UTs.

Contributor

Perhaps @jackwener was getting at the point that additional tests could also help document the intent of satisfy() and make the coverage clear, even if they did not add additional coverage.

I would be interested in helping to write such tests as part of my review of this code (as it would help me gain a better understanding)
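To make the discussion concrete, here is a self-contained sketch of what such table-driven satisfy() tests could look like. The enums and method below are simplified stand-ins for illustration only, not DataFusion's actual Partitioning/Distribution API.

```rust
// Simplified stand-ins for DataFusion's Partitioning and Distribution types.
// Names and exact semantics here are assumptions for illustration.
#[derive(Clone, Debug, PartialEq)]
enum Partitioning {
    RoundRobin(usize),
    Hash(Vec<String>, usize),
    Unknown(usize),
}

#[derive(Clone, Debug)]
enum Distribution {
    Unspecified,
    SinglePartition,
    HashPartitioned(Vec<String>),
}

impl Partitioning {
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::RoundRobin(n)
            | Partitioning::Hash(_, n)
            | Partitioning::Unknown(n) => *n,
        }
    }

    /// Does this output partitioning satisfy the required distribution?
    fn satisfy(&self, required: &Distribution) -> bool {
        match required {
            Distribution::Unspecified => true,
            Distribution::SinglePartition => self.partition_count() == 1,
            Distribution::HashPartitioned(required_exprs) => match self {
                // Strict check: same expressions in the same order.
                Partitioning::Hash(exprs, _) => exprs == required_exprs,
                _ => false,
            },
        }
    }
}

fn main() {
    let cols = |v: &[&str]| v.iter().map(|s| s.to_string()).collect::<Vec<_>>();
    // UnspecifiedDistribution is satisfied by anything.
    assert!(Partitioning::RoundRobin(8).satisfy(&Distribution::Unspecified));
    // Hash output satisfies SinglePartition only with one partition.
    assert!(!Partitioning::Hash(cols(&["a"]), 8).satisfy(&Distribution::SinglePartition));
    assert!(Partitioning::Hash(cols(&["a"]), 1).satisfy(&Distribution::SinglePartition));
    // Hash - Hash: key order matters under the strict check.
    assert!(Partitioning::Hash(cols(&["a", "b"]), 8)
        .satisfy(&Distribution::HashPartitioned(cols(&["a", "b"]))));
    assert!(!Partitioning::Hash(cols(&["a", "b"]), 8)
        .satisfy(&Distribution::HashPartitioned(cols(&["b", "a"]))));
    println!("all satisfy() cases pass");
}
```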

@alamb
Contributor

alamb commented Nov 7, 2022

I plan to review this carefully tomorrow

@Ted-Jiang
Member

I am not familiar with this part, but this EquivalenceProperties system is amazing 👍! Looking forward to seeing this move forward 😄

@mingmwang
Contributor Author

mingmwang commented Nov 8, 2022

@yahoNanJing
BTW, after this PR, CrossJoin and HashJoin Collect Left should also work in Ballista. Previously they did not work.

@alamb
Contributor

alamb commented Nov 8, 2022

I am sorry again, I am struggling to find sufficient contiguous time to review this PR. I will set aside time first thing tomorrow morning

}
}

fn try_reorder(
Contributor

It would be better to add some comments here documenting the parameters.

  • For the top-down case, the join_keys use the children's schema, while expected and equivalence_properties use the current operator's schema.
  • For the bottom-up case, the join_keys and expected use the children's schema, while the equivalence_properties use the current operator's schema.

Is it possible to make the schema the same for all of the parameters? Otherwise, for the right side of the bottom-up case, it may miss some reordering chances.

Contributor Author

Yes, valid points.

Contributor Author

Now, in the latest code, for the bottom-up case, the join_keys, expected and equivalence_properties all use the children's schema.

/// That might not match the output partitioning of the join node's children.
/// This method runs a top-down process and tries to adjust the output partitioning of the children
/// if the children themselves are Joins or Aggregations.
fn adjust_input_keys_down_recursively(
Contributor

@yahoNanJing yahoNanJing Nov 9, 2022

Will the parameters, plan and parent_required share the same schema? If so, it's better to add comments for it.

match mode {
AggregateMode::FinalPartitioned => {
let new_input =
adjust_input_keys_down_recursively(
Contributor

Since the input of FinalPartitioned should be the Partial and they should share the same column order, it's safe to call adjust_input_keys_down_recursively here.

right.clone(),
left.schema().fields().len(),
)?,
JoinType::RightSemi | JoinType::RightAnti => {
Contributor

For RightSemi and RightAnti joins, the current schema should be the same as the right side's. Therefore, no schema adjustment is needed.

} else if let Some(CrossJoinExec { left, right, .. }) =
plan_any.downcast_ref::<CrossJoinExec>()
{
let new_left = adjust_input_keys_down_recursively(left.clone(), vec![])?;
Contributor

Since the current implementation for the CrossJoinExec is similar to the CollectLeft, we don't adjust for the left side.

let new_right =
adjust_input_keys_down_recursively(right.clone(), right_keys)?;

Ok(Arc::new(SortMergeJoinExec::try_new(
Contributor

Since we change the sort options here, will it influence upstream operators that may rely on them?

If this rule is called at the very beginning of the rule-based optimization process, this influence may be covered by other rules.


// Check whether the requirements can be satisfied by the Aggregation
if parent_required.len() != out_put_exprs.len()
|| expr_list_eq_strict_order(&out_put_exprs, &parent_required)
Contributor

Here, we don't consider the optimization of join groups sharing the same hash table. Therefore, when the group keys satisfy the join keys, we don't push down the required ordering.

/// This method will try to change the ordering of the join keys to match with the
/// partitioning of the join nodes' children.
/// If it cannot match both sides, it will try to match one side, either the left or the right.
fn reorder_join_keys_to_inputs(
Contributor

It's used for the bottom-up approach.

)
}
(_, Some(Partitioning::Hash(right_exprs, _))) => {
try_reorder(join_keys.clone(), right_exprs, equivalence_properties).or_else(
Contributor

Maybe there is no need to call or_else here.

Contributor Author

Sure.

{
Arc::new(CoalescePartitionsExec::new(child.clone()))
}
_ => {
Contributor

Here, we are sure that any RepartitionExec already in the plan tree satisfies the required distribution, so we don't need to worry about a duplicate RepartitionExec being added.

Contributor

Isn't that case covered by if child.output_partitioning().satisfy() check above?


// Use hash partition by default to parallelize hash joins
Ok(Arc::new(HashJoinExec::try_new(
Arc::new(RepartitionExec::try_new(
Contributor

Excellent refactoring. Now the RepartitionExec will be added by the BasicEnforcement optimization rule rather than being added manually for specific operators.

Contributor

I agree 100%

@yahoNanJing
Contributor

Excellent work 👍

Contributor

@alamb alamb left a comment

I got through most of this PR other than datafusion/core/src/physical_optimizer/enforcement.rs, which I am hoping to do later today.

So far I think it looks like a great piece of work.

Here is a list of follow on items that would be good to address (though we could do them in follow on PRs):

  • Remove AddCoalescePartitionsExec
  • Add more test coverage for Partitioning::satisfy()

Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
];
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
Contributor

Is there a reason that BasicEnforcement must be run twice?

Contributor Author

@mingmwang mingmwang Nov 9, 2022

Yes. This covers the case where the Repartition rule adds an additional RepartitionExec with RoundRobin partitioning; to make sure the SinglePartition requirement is still satisfied, we run BasicEnforcement again. Originally AddCoalescePartitionsExec was here; now I have replaced AddCoalescePartitionsExec with BasicEnforcement.

Contributor

Cool -- perhaps you can add a comment to explain the rationale

Contributor Author

Sure, I will do it in the following PR.

// then we need to have the partition count and hash functions validation.
Partitioning::Hash(partition_exprs, _) => {
let fast_match =
expr_list_eq_strict_order(&required_exprs, partition_exprs);
Contributor

You might be able to reduce the indent level of this code with a construct like

if expr_list_eq_strict_order(&required_exprs, partition_exprs) {
  return true
}

)
})
.collect::<Vec<_>>();
expr_list_eq_strict_order(
Contributor

👍 this is a nice formulation

vec![]
};

let input_exec = if can_repartition {
Contributor

It is nice to avoid adding repartitioning directly in the planner and instead add it as a follow-on pass.

}

/// Project Equivalence Properties.
/// 1) Add Alias, Alias can introduce additional equivalence properties,
Contributor

👍


project_equivalence_properties(input_properties, &alias_map, &mut out_properties);
assert_eq!(out_properties.classes().len(), 1);
assert_eq!(out_properties.classes()[0].len(), 4);
Contributor

I suggest actually validating the contents of these equivalence classes somehow (rather than just their sizes)

Contributor Author

Sure, will do.
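A hypothetical sketch of what content-level assertions could look like, with an equivalence class modeled simply as a set of column names (DataFusion's real EquivalenceProperties API differs; the column names below are made up):

```rust
use std::collections::BTreeSet;

// Simplified model for illustration: each equivalence class is a set of
// column names that are known to be equal.
type EquivalenceClass = BTreeSet<String>;

fn class_of(cols: &[&str]) -> EquivalenceClass {
    cols.iter().map(|s| s.to_string()).collect()
}

fn main() {
    // Suppose projecting `a AS a1, a AS a2, a AS a3` onto a class {a}
    // produced this output class.
    let out_class = class_of(&["a", "a1", "a2", "a3"]);

    // Asserting only the size misses bugs where the wrong columns end up
    // in the class; asserting the exact contents catches them.
    assert_eq!(out_class.len(), 4);
    assert_eq!(out_class, class_of(&["a", "a1", "a2", "a3"]));
    assert!(out_class.contains("a2"));
    println!("equivalence class contents validated");
}
```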

Comment on lines +48 to +49
/// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree
/// and give a non-optimal plan, but it can avoid the possible data skew in joins
Contributor

I don't understand this comment about 'non optimal plan' -- I think the point of this pass to ensure that the plan gets the correct answer :) If the plan doesn't get the correct answer it can't be optimal

Contributor Author

@mingmwang mingmwang Nov 9, 2022

Yes, correctness is always the top priority. Here "non-optimal plan" means that if the optimization target is the best plan with the fewest Repartitions and Sorts, the current implementation cannot reach that goal; it would need more complex algorithms.

}
}

/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
Contributor

Thank you for this comment -- can you explain a little more about why the ordering of join keys in the plan affects the output partitioning? Is it because the output partitioning of a join on (a, b) is always hash(a, b) which might not be the same as hash(b, a)?

Contributor

Specifically, I am trying to figure out under what circumstances join column reordering is needed at all. For example, if you have a join on (a, b) and then an aggregate grouped on (b, a), the distributions are compatible, aren't they?

Contributor Author

Yes, this is to cover cases where complex plan graphs include multiple joins, or joins on aggregations, with different key orderings.
For example:

TopJoin on (a, b, c)
   bottom left join on(b, a, c)
   bottom right join on(c, b, a)

Another case:

TopJoin on (a, b, c)
   Agg1 group by (b, a, c)
   Agg2 group by (c, b, a)

The PR comes up with two join key reordering implementations:

  1. Top-down approach, see the method fn adjust_input_keys_down_recursively().
     The top-down approach adjusts the children's key ordering based on the parent's requirements:

TopJoin on (a, b, c)
   bottom left join on(b, a, c)
   bottom right join on(c, b, a)

will be adjusted to

TopJoin on (a, b, c)
   bottom left join on(a, b, c)
   bottom right join on(a, b, c)

and

TopJoin on (a, b, c)
   Agg1 group by (b, a, c)
   Agg2 group by (c, b, a)

will be adjusted to

TopJoin on (a, b, c)
   Projection(b, a, c)
      Agg1 group by (a, b, c)
   Projection(c, b, a)
      Agg2 group by (a, b, c)

  2. Bottom-up approach, see the method fn reorder_join_keys_to_inputs().
     The bottom-up approach just adjusts the parent's key ordering based on the children's, aligning with either the left or the right side; sometimes it cannot align with both. The bottom-up approach is much simpler than the top-down approach, but might not reach the best result.

TopJoin on (a, b, c)
   bottom left join on(b, a, c)
   bottom right join on(c, b, a)

will be adjusted to

TopJoin on (b, a, c)
   bottom left join on(b, a, c)
   bottom right join on(c, b, a)

By default, the PR goes with the top-down approach, and there is a session-level configuration config.top_down_join_key_reordering to disable it.
The bottom-up approach will be useful in the future if we plan to support storage partition-wise joins. In that case, the data sources/tables might be pre-partitioned and we can't adjust the order, so we would prefer to adjust the parent's key ordering based on the children's.
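The core of the key reordering can be sketched as follows. This is a hypothetical, heavily simplified version (keys as plain strings, no equivalence classes, distinct keys assumed), not DataFusion's actual try_reorder: given the join key pairs and an expected ordering for one side, permute the pairs so that side lines up.

```rust
/// Simplified sketch of join-key reordering: given join key pairs
/// (left, right) and an expected ordering for the left side, permute the
/// pairs so the left keys match the expected ordering. The right-side keys
/// move together with their left partners. Returns None when the expected
/// keys are not a permutation of the current ones.
fn try_reorder(
    join_keys: Vec<(String, String)>,
    expected_left: &[String],
) -> Option<Vec<(String, String)>> {
    if join_keys.len() != expected_left.len() {
        return None;
    }
    let mut reordered = Vec::with_capacity(join_keys.len());
    for want in expected_left {
        // Find the pair whose left key matches the expected key.
        let pair = join_keys.iter().find(|(l, _)| l == want)?;
        reordered.push(pair.clone());
    }
    Some(reordered)
}

fn main() {
    let keys = vec![
        ("b".to_string(), "y".to_string()),
        ("a".to_string(), "x".to_string()),
        ("c".to_string(), "z".to_string()),
    ];
    // Reorder join keys (b, a, c) to match an expected (a, b, c) ordering.
    let expected = ["a".to_string(), "b".to_string(), "c".to_string()];
    let reordered = try_reorder(keys.clone(), &expected).unwrap();
    assert_eq!(reordered[0], ("a".to_string(), "x".to_string()));
    assert_eq!(reordered[1], ("b".to_string(), "y".to_string()));
    // No reordering is possible when the expected keys differ.
    let bad = ["a".to_string(), "b".to_string(), "d".to_string()];
    assert!(try_reorder(keys, &bad).is_none());
    println!("reordered: {:?}", reordered);
}
```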

Contributor

This makes sense to me -- thank you for the explanation @mingmwang -- this would also be great content to put into the comments somewhere so future readers can get a little more understanding of the rationale. We could do that as part of a follow on PR too

Contributor Author

> I think this PR could be merged as is (and do the follow up items as follow on PRs). Thanks again @mingmwang
>
> Please let me know what you would prefer (merge and follow up PRs or modify this one).

Sure, thanks, I prefer to do the changes in the follow-up PRs.

Contributor

@alamb alamb left a comment

I think this PR looks great -- thank you so much @mingmwang . It is a real step forward

The only thing I would like resolved prior to approving this PR is understanding the need to reorder join keys -- everything else makes sense to me and seems reasonably tested.

Again, really nice work 🏆

"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
"ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs
Contributor

why does a right outer join (and full join) need the extra repartition? Mostly I don't understand why you don't need the same repartition for the inner join as well

Contributor Author

For a full join, the output partitioning is UnknownPartitioning, so it needs the extra repartition.
For a right outer join, the output partitioning is the adjusted right output partitioning (shifted column indexes).
This UT covers several cases.

Case 1: right outer join, original plan:

Top RightOutJoin on (a == c)
   bottom RightOutJoin on(a == b)
       ParquetScan
       ParquetScan
   ParquetScan

The expected physical plan needs 4 Repartitions:

Top RightOutJoin on (a == c)
   Repartition(a)
   bottom RightOutJoin on(a == b)   -- output partitioning is Partition(b), cannot meet the requirement.
       Repartition(a)
           ParquetScan
       Repartition(b)
           ParquetScan
   Repartition(c)
       ParquetScan

Case 2: an inner join, original plan:

Top InnerJoin on (a == c)
   bottom InnerJoin on(a == b)
       ParquetScan
       ParquetScan
   ParquetScan

The expected physical plan needs 3 Repartitions:

Top InnerJoin on (a == c)
   bottom InnerJoin on(a == b) -- output partition is Partition(a), can meet the requirements.
       Repartition(a)
           ParquetScan
       Repartition(b)
           ParquetScan
   Repartition(c)
       ParquetScan

Case 3: another inner join, original plan:

Top InnerJoin on (b == c)
   bottom InnerJoin on(a == b)
       ParquetScan
       ParquetScan
   ParquetScan

The expected physical plan needs 3 Repartitions:

Top InnerJoin on (b == c)
   bottom InnerJoin on(a == b)  -- output partitioning is Partition(a), but Partition(a) == Partition(b):
        Repartition(a)          -- the EquivalenceClass takes effect, so it can meet the requirement.
           ParquetScan
       Repartition(b)
           ParquetScan
   Repartition(c)
       ParquetScan

/// Calculate the OutputPartitioning for Partitioned Join
pub fn partitioned_join_output_partitioning(
    join_type: JoinType,
    left_partitioning: Partitioning,
    right_partitioning: Partitioning,
    left_columns_len: usize,
) -> Partitioning {
    match join_type {
        JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
            left_partitioning
        }
        JoinType::RightSemi | JoinType::RightAnti => right_partitioning,
        JoinType::Right => {
            adjust_right_output_partitioning(right_partitioning, left_columns_len)
        }
        JoinType::Full => {
            Partitioning::UnknownPartitioning(right_partitioning.partition_count())
        }
    }
}
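The "shifted column index" adjustment for the Right case above can be sketched as follows. This is an illustrative, simplified version of the idea behind adjust_right_output_partitioning, working on bare column indexes rather than DataFusion's expression types.

```rust
// Sketch of the index shift behind adjust_right_output_partitioning:
// a join's output schema is the left columns followed by the right columns,
// so a right-side column at index i appears at index i + left_columns_len
// in the join's output schema.
fn adjust_right_indexes(right_key_indexes: &[usize], left_columns_len: usize) -> Vec<usize> {
    right_key_indexes.iter().map(|i| i + left_columns_len).collect()
}

fn main() {
    // Right input hash-partitioned on its own columns [0, 2]; if the left
    // side of the join has 3 columns, those keys are columns [3, 5] of the
    // join output, so the output partitioning must be expressed that way.
    assert_eq!(adjust_right_indexes(&[0, 2], 3), vec![3, 5]);
    println!("shifted indexes: {:?}", adjust_right_indexes(&[0, 2], 3));
}
```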

Contributor

This is a wonderful explanation -- thank you @mingmwang

&JoinType::Inner,
);

// Output partition need to respect the Alias and should not introduce additional RepartitionExec
Contributor

👍

}

#[test]
fn multi_smj_joins() -> Result<()> {
Contributor

this is very clever

let expected = &[
top_join_plan.as_str(),
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 })]",
Contributor

I don't understand the need to reorder the join keys here -- if the input is partitioned on a, b, c the correct answer will still happen if you join on b, c, a

My rationale is that any particular values of a, b, and c will always land in the same partition whether the data is partitioned by hash(a, b, c) or by hash(b, c, a); although the specific partitions would be different, it shouldn't matter.

Contributor Author

For Aggregations, yes, the ordering doesn't matter. But for Partitioned Joins, it does.
A Partitioned Join has two inputs: if the left is partitioned on (B, C, A) and the right is partitioned on (A, C, B), then for the same particular value triple (a, b, c) the two inputs will not co-locate in the same partition, because the hash values are different.
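This can be demonstrated with a small self-contained example. The hash function below (FNV-1a) is just an illustration, not DataFusion's actual row hash: rows are routed to partitions by hashing the key columns in order, so two inputs hashed with different key orderings generally do not co-locate matching rows.

```rust
// FNV-1a over a sequence of u64 values; order-sensitive by construction.
fn fnv1a(values: &[u64]) -> u64 {
    let mut h: u64 = 0xcbf29ce484222325;
    for v in values {
        for byte in v.to_le_bytes() {
            h ^= byte as u64;
            h = h.wrapping_mul(0x100000001b3);
        }
    }
    h
}

// Route a row to a partition by hashing its key columns in order.
fn partition(keys: &[u64], num_partitions: u64) -> u64 {
    fnv1a(keys) % num_partitions
}

fn main() {
    let num_partitions = 8;
    let mut mismatches = 0;
    for i in 0u64..1000 {
        let (a, b, c) = (i, i + 1, i + 2);
        // Left input partitioned on (b, c, a); right input on (a, c, b).
        let left = partition(&[b, c, a], num_partitions);
        let right = partition(&[a, c, b], num_partitions);
        if left != right {
            mismatches += 1;
        }
    }
    // Matching rows usually land in different partitions, so a partitioned
    // join over these two inputs would miss matches unless one side is
    // repartitioned or the key orderings are aligned.
    assert!(mismatches > 0);
    println!("{mismatches} of 1000 matching rows would not co-locate");
}
```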

&top_join_on,
&join_type,
);
let top_join_plan =
Contributor

Something I don't see in these plans (or in the code) is that if you take several sorted streams and run them through RepartitionExec, I don't think the output remains sorted. I think these plans are OK, as they sort after the repartition, but in general that is something to watch out for.

Contributor

@alamb alamb left a comment

I think this PR could be merged as is (and do the follow up items as follow on PRs). Thanks again @mingmwang

Please let me know what you would prefer (merge and follow up PRs or modify this one).

@alamb alamb merged commit 9c24a79 into apache:master Nov 10, 2022
@alamb
Contributor

alamb commented Nov 10, 2022

Thanks again @mingmwang -- can you please file tickets for the follow on items? Or if you don't have time let me know so I can do so?

@ursabot

ursabot commented Nov 10, 2022

Benchmark runs are scheduled for baseline = 30813dc and contender = 9c24a79. 9c24a79 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.

@jackwener
Copy link
Member

Good job👍.
Although the work has been merged, I'm going to take a closer look over the weekend.


Successfully merging this pull request may close these issues.

  • Partition and Sort Enforcement
  • Support hash repartition elimination

6 participants