-
Notifications
You must be signed in to change notification settings - Fork 1.9k
feat: implement partition_statistics for HashJoinExec #16956
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
4240606 to
27efc67
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔢 Self-check (PR reviewed by myself and ready for feedback)
-
Code compiles successfully
-
Unit tests added
-
All tests pass
-
Comments added where necessary
-
PR title and description updated
-
Documentation PR created (or confirmed not needed)
-
PR size is reasonable
|
@xudong963 Could you please take a look when you have time? Thank you! |
27efc67 to
7babcc7
Compare
Thanks! I'll take a look tomorrow |
| } | ||
|
|
||
| #[test] | ||
| fn test_partition_statistics() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also suggest testing with real execution, like this: https://github.com/apache/datafusion/blob/main/datafusion/core/tests/physical_optimizer/partition_statistics.rs#L722-L729
It'll verify the output partitioning count too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. I forgot to remove this test. This test has been moved to partition_statistics.rs. I thought I deleted it before I opened the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I still didn't find such execution-level test: https://github.com/apache/datafusion/blob/main/datafusion/core/tests/physical_optimizer/partition_statistics.rs#L722-L729.
| Ok(stats.project(self.projection.as_ref())) | ||
| } | ||
|
|
||
| // For Auto mode or when no specific partition is requested, fall back to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For auto mode, as the comment says:
/// DataFusion optimizer decides which PartitionMode
/// mode(Partitioned/CollectLeft) is optimal based on statistics. It will
/// also consider swapping the left and right inputs for the Join
Auto,
So if the method with partition is called after JoinSelection rule, it's impossible to see Auto mode, however, the method may be called first, I suggest evaluating the cost of related code about choosing partition mode in JoinSelection, then decide if we can decide the PartitionMode here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see. However, it seems challenging to get the threshold here for conducting the same evaluation. Do you have any other ideas on how to share the same logic from the JoinSelection to determine the PartitionMode? I'm uncertain how to obtain the optimizer config for accessing the threshold settings in the current API design.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll check later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xudong963 Do you have time to look at the question I asked above? Thank you. Also, could you please help me reopen the PR?
For this question, I've thought about it a little more.
however, the method may be called first,
I searched the code base, no optimizer rules currently call partition_statistics(Some(partition))) before JoinSelection.
I’m also wondering — based on this, should we assert that the auto mode will never happen in this code path?
Or do we still want to determine the PartitionMode here? If so, that would mean we need to store collect_threshold_byte_size and collect_threshold_num_rows in HashJoinExec. I’m not sure if it’s a good design to expose these optimizer-related configs to the executor layer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I searched the code base, no optimizer rules currently call partition_statistics(Some(partition))) before JoinSelection.
The only rule before JoinSelection, called partition_statistics, is AggregateStatistics. However, it only calls partial_agg_exec.input().partition_statistics(None).
7babcc7 to
64f74da
Compare
Signed-off-by: 0xPoe <techregister@pm.me>
64f74da to
3dcc212
Compare
|
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days. |
|
@xudong963 @Jefffrey Could you please help me reopen this PR? Thanks! ref #16956 (comment) |
|
Re-opened this; I'm not as familiar with the statistics part of the codebase so maybe will need @xudong963 to take a look. Looks like have some merge conflicts to resolve first (unless there's a higher level discussion to take place first?) |
|
I'll review the PR again today |
Which issue does this PR close?
partition_statisticsAPI for more operators #15873Rationale for this change
This pull request implements the
partition_statisticsfunction specifically forHashJoinExec, taking into account the different partitioning modes.What changes are included in this PR?
partition_statisticsfunction specifically forHashJoinExec.Are these changes tested?
Yes, check the unit tests.
Are there any user-facing changes?
I'm not sure; please let me know if any document changes are needed.
/cc @xudong963