-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-36563][SQL] dynamicPartitionOverwrite can direct rename to targetPath instead of partition path one by one when targetPath is empty #33811
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…getPath instead of partition path one by one when targetPath is empty
|
ping @cloud-fan |
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Test build #142699 has finished for PR 33811 at commit
|
|
Test build #142700 has finished for PR 33811 at commit
|
|
also ping @maropu @dongjoon-hyun @viirya |
|
Thank you for pinging me, @AngersZhuuuu . |
| .createOptional | ||
|
|
||
| val FILE_STAGING_DIR = | ||
| buildConf("spark.sql.source.stagingDir") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks orthogonal what your PR aims. Could you spin off this new configuration as a new PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me do this first.
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
Outdated
Show resolved
Hide resolved
| throw new IOException(s"Failed to rename $stagingPartPath to $finalPartPath when " + | ||
| val targetPath = new Path(path) | ||
| val pathExisted = fs.exists(targetPath) | ||
| if (!pathExisted || fs.listStatus(targetPath).isEmpty) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a test coverage for both fs.exist and fs.listStatus?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should have
sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
Outdated
Show resolved
Hide resolved
| .saveAsTable("t") | ||
| checkAnswer(sql("SELECT * FROM t"), df) | ||
| checkAnswer(sql("SELECT * FROM t WHERE p1 = 2"), Row(1, 2) :: Nil) | ||
| checkAnswer(sql("SELECT * FROM t WHERE p1 = 4"), Row(3, 4) :: Nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this test case fail before your PR? IIUC, this doesn't provide a test coverage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this test case fail before your PR? IIUC, this doesn't provide a test coverage.
Just to show in new way, it can write data as before?
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
|
Test build #142746 has finished for PR 33811 at commit
|
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
When target path is empty, we can directly rename stagingDir to targetPath avoid to rename path one by one
Why are the changes needed?
Optimize file commit logic
Does this PR introduce any user-facing change?
User can set
spark.sql.source.stagingDirto enable direct rename to targetPath instead of partition path one by one when targetPath is empty for dynamicPartitionOverWriteHow was this patch tested?
Added UT