-
Notifications
You must be signed in to change notification settings - Fork 270
feat: Support sort merge join with a join condition #553
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Some tests doesn't pass, its good to check them in DF, SMJ joined filter is still in progress |
|
Yea, that's why I marked this as a draft PR now. |
|
just checked DF on one of the failed queries it passes |
|
looks like Comet produces duplicates |
|
Right join fails in DF |
|
Filed apache/datafusion#10882 |
native/Cargo.toml
Outdated
| datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } | ||
| datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } | ||
| datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false } | ||
| datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "f98693e" } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the commit of the PR apache/datafusion#12090. When the PR is merged, we can change back to DataFusion repo.
| AggregateExprBuilder::new(sum_udaf(), vec![child]) | ||
| .schema(schema) | ||
| .alias("count") | ||
| .with_ignore_nulls(false) | ||
| .with_distinct(false) | ||
| .build().map_err(|e| ExecutionError::DataFusionError(e.to_string())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, DataFusion API changes.
| if !check_support(predicate, &schema) { | ||
| let selectivity = default_selectivity as f64 / 100.0; | ||
| let mut stats = input_stats.into_inexact(); | ||
| let mut stats = input_stats.to_inexact(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataFusion API change.
c27f1f6 to
bb7586f
Compare
| val left = sql("SELECT * FROM tbl_a") | ||
| val right = sql("SELECT * FROM tbl_b") | ||
|
|
||
| val df8 = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel we can also use SQL for Anti, Semi joins?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to SQL syntax.
| case FullOuter => JoinType.FullOuter | ||
| case LeftSemi => JoinType.LeftSemi | ||
| case LeftAnti => JoinType.LeftAnti | ||
| // TODO: DF SMJ with join condition fails TPCH q21 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let me have a look on q21. I remember Anti join had issues with TPCH in DF but it was fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it might be also related to apache/datafusion#11555
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #553 +/- ##
=============================================
- Coverage 55.16% 34.20% -20.96%
- Complexity 857 888 +31
=============================================
Files 109 112 +3
Lines 10542 43071 +32529
Branches 2010 9509 +7499
=============================================
+ Hits 5815 14733 +8918
- Misses 3714 25352 +21638
- Partials 1013 2986 +1973 ☔ View full report in Codecov by Sentry. |
| } | ||
| } | ||
|
|
||
| test("full outer join") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test fails currently. It needs the fix at DataFusion apache/datafusion#12159
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fix was merged at DataFusion. I updated Comet to use the latest commit.
| @@ -75,7 +75,6 @@ abstract class CometTestBase | |||
| conf.set(MEMORY_OFFHEAP_SIZE.key, "2g") | |||
| conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g") | |||
| conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g") | |||
| conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, "false") | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to set SQLConf.COALESCE_PARTITIONS_ENABLED.key false now.
We also need to remove this to trigger test failure https://github.com/apache/datafusion-comet/pull/553/files#r1730694210.
| datafusion-physical-plan = { version = "41.0.0", default-features = false } | ||
| datafusion-physical-expr-common = { version = "41.0.0", default-features = false } | ||
| datafusion-physical-expr = { version = "41.0.0", default-features = false } | ||
| datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "dff590b" } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to latest DataFusion to use the two commits including bug fixes.
| } | ||
| } | ||
|
|
||
| fn default_value(&self, _data_type: &DataType) -> Result<ScalarValue> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New DataFusion API required for AggregateExpr trait.
| withInfo(join, cond) | ||
| return None | ||
| } | ||
| condProto.get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any scenario of None.get?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (condProto.isEmpty) {
withInfo(join, cond)
return None
}If it is None, it will return None earlier.
| checkSparkAnswerAndOperator(df7) | ||
|
|
||
| val df8 = sql( | ||
| "SELECT * FROM tbl_a LEFT SEMI JOIN tbl_b ON tbl_a._2 = tbl_b._1 " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a RIGHT SEMI in Spark? afair there is still no proper support in DF for RightSemi
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No
| val left = UnresolvedRelation(TableIdentifier("left")) | ||
| val right = UnresolvedRelation(TableIdentifier("right")) | ||
|
|
||
| checkSparkAnswer(left.join(right, $"left.N" === $"right.N", "full")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to rephrase it in SQL as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is copied from Spark. I think it is good to keep it as the same.
comphead
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm thanks @viirya
| checkSparkAnswerAndOperator(df9) | ||
|
|
||
| // TODO: Enable these tests after fixing the issue: | ||
| // https://github.com/apache/datafusion-comet/issues/861 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we probably can create a separate github ticket on this to not forget enabling tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filed #891
|
Thanks @comphead |
|
I need to update plan stability results... |
* Init * test * test * test * Use specified commit to test * Fix format * fix clippy * fix * fix * Fix * Change to SQL syntax * Disable SMJ LeftAnti with join filter * Fix * Add test * Add test * Update to last DataFusion commit * fix format * fix * Update diffs (cherry picked from commit e57ead4)
Which issue does this PR close?
Closes #398.
Rationale for this change
What changes are included in this PR?
How are these changes tested?