[SPARK-36571][SQL] Add new SQLPathHadoopMapReduceCommitProtocol to resolve conflicts when writing into a partitioned table's different partitions #35319
Conversation
gentle ping @cloud-fan

@steveloughran Hi Steve, this PR's description may explain some of your confusion in #33828
steveloughran left a comment
code looks ok to me; would like to see the viewfs handling and validation of that -ext-1000 suffix, to make sure those bits continue to work in future, and that regressions are found in unit tests rather than support calls
import testImplicits._
val stagingParentDir = Utils.createTempDir()
it would be nice if you could target this against actual filesystem URIs rather than just file://; but that will take more changes in the base classes. It would help those of us trying to regression-test other committers through Spark SQL
    engineType: String,
    jobId: String): Path = {
  val extURI = path.toUri
  if (extURI.getScheme == "viewfs") {
there's no test for this in the tests that I can see... it'd be good to have that viewfs coverage tested too.
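To make the missing coverage concrete, here is a minimal sketch of the kind of scheme check the snippet above performs, using only `java.net.URI`. The viewfs mount path is hypothetical and this is not the PR's actual code:

```scala
import java.net.URI

object ViewFsSchemeCheck {
  // Returns true when a table location sits behind a viewfs mount,
  // i.e. the branch the review comment asks to see covered by tests.
  def isViewFs(location: String): Boolean =
    new URI(location).getScheme == "viewfs"

  def main(args: Array[String]): Unit = {
    assert(isViewFs("viewfs://cluster/warehouse/db.db/tbl"))
    assert(!isViewFs("hdfs://nn:8020/warehouse/db.db/tbl"))
    println("scheme check ok")
  }
}
```

A real unit test would additionally assert that the staging path resolved for a viewfs location lands on the expected mount.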
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
The current data source insert SQL commit protocol has the following problems:
Case a: when jobs A and B both write into partitioned table TBL with different static partitions, they conflict because both use the same temp location ${table_location}/_temporary/0/.... When job A finishes, it cleans up this temp location, deleting job B's temp data and causing job B's write to fail.
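The collision follows from the path computation alone. A sketch (the table location and partition values are illustrative, and this simplifies the default FileOutputCommitter layout):

```scala
object TempPathConflict {
  // Under the default layout, the temp dir is derived from the table
  // location only -- the target static partition plays no part in it.
  def defaultTempDir(tableLocation: String, staticPartition: String): String =
    s"$tableLocation/_temporary/0"

  def main(args: Array[String]): Unit = {
    val tbl = "/warehouse/db.db/tbl"
    val jobA = defaultTempDir(tbl, "dt=2022-01-01")
    val jobB = defaultTempDir(tbl, "dt=2022-01-02")
    // Both jobs share one temp dir, so job A's cleanup removes job B's data.
    assert(jobA == jobB)
    println(jobA)
  }
}
```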
Case b: with the current dynamic partition insert, if a job writing data is killed, its data remains in the staging dir under the table path.
Case c: if we use dynamic partition insert to write a new table with a huge number of partitions, we currently have to move partition paths one by one. Here we could instead rename the staging dir to the table path, which is much faster. But to do this, the staging dir must be customizable and must not live under the table location.
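The rename shortcut in case c can be sketched with plain java.nio.file operations; directory names here are illustrative, and a real implementation would rename via the Hadoop FileSystem API:

```scala
import java.nio.file.Files

object StagingRename {
  def main(args: Array[String]): Unit = {
    val base = Files.createTempDirectory("spark-demo")
    val staging = Files.createDirectory(base.resolve(".staging"))
    // Simulate several partition dirs written under the staging dir.
    (1 to 3).foreach { i =>
      Files.createDirectory(staging.resolve(s"dt=2022-01-0$i"))
    }
    val table = base.resolve("tbl")
    // One rename of the whole staging dir publishes every partition at
    // once, instead of moving each partition dir individually.
    Files.move(staging, table)
    assert(Files.isDirectory(table.resolve("dt=2022-01-01")))
    assert(Files.notExists(staging))
    println("published all partitions with one rename")
  }
}
```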
In this approach, we plan to do two things:
Make the staging dir customizable, like Hive's
When we terminate a job doing a dynamic partition insert, it leaves a staging dir under the table location. If the staging dir is customizable like Hive's (for example, defined as /tmp/spark/.stagingdir), we avoid leaving such staging dirs and data under the table path.
If the staging dir is a temp location not under the table location, a dynamic partition insert into a new table can simply rename the staging dir to the target table location instead of moving partition dirs one by one. That is much faster.
With a customizable staging dir, we can implement a new commit protocol in which writes to non-partitioned tables and static partition inserts also use the staging dir, without increasing the number of FS operations.
The new SQL commit protocol supports a staging dir and does not increase FS operations.
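A sketch of how a customizable staging dir might be laid out, modeled after Hive's `hive.exec.stagingdir` idea; the root path and layout below are assumptions for illustration, not the PR's exact code:

```scala
import java.util.UUID

object StagingDirLayout {
  // Hypothetical: place staging data under a configurable root
  // (e.g. /tmp/spark) instead of under the table location, keyed
  // by a unique per-job id.
  def stagingDir(stagingRoot: String, jobId: String): String =
    s"$stagingRoot/.stagingdir/$jobId"

  def main(args: Array[String]): Unit = {
    val jobA = stagingDir("/tmp/spark", UUID.randomUUID().toString)
    val jobB = stagingDir("/tmp/spark", UUID.randomUUID().toString)
    // Distinct jobs get distinct staging dirs, so concurrent writes
    // can no longer delete each other's temp data (case a above),
    // and nothing is left under the table path on failure (case b).
    assert(jobA != jobB)
    assert(!jobA.startsWith("/warehouse"))
    println(jobA)
  }
}
```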
The current static partition insert in v1 has these steps:
The current dynamic partition insert in v1 has these steps:
In the new SQL commit protocol, SQLPathHadoopMapReduceCommitProtocol:
for non-partitioned insert:
for all-static partition insert:
for dynamic partition insert, we implement it as:
The new SQL commit protocol's benefits are:
- Supports inserting into a non-partitioned table from itself
- Supports inserting into a partitioned table's static partition while reading data from the target partition
- Supports concurrent inserts into different static partitions
These are all common problems when inserting data through the data source API.
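If this protocol were merged, opting in would presumably look like the sketch below. `spark.sql.sources.commitProtocolClass` is Spark's existing conf for swapping the file commit protocol; the package path of the new class is an assumption here, not taken from the PR. This is a config fragment and requires a running SparkSession:

```scala
// Hypothetical usage sketch, not the PR's documented interface.
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "org.apache.spark.internal.io.SQLPathHadoopMapReduceCommitProtocol")
```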
Why are the changes needed?
Provide a more flexible commit protocol without impacting performance.
Does this PR introduce any user-facing change?
Users can set the SQL commit protocol to SQLPathHadoopMapReduceCommitProtocol to use a commit protocol with a staging dir.
How was this patch tested?
Added UT