
Conversation


@AngersZhuuuu AngersZhuuuu commented Jan 25, 2022

What changes were proposed in this pull request?

The current data source insert SQL commit protocol has the following problems:
case a: when job A and job B write data into different static partitions of the same partitioned table TBL, they conflict because both use the same temp location ${table_location}/_temporary/0/.... When job A finishes, it cleans up this temp location, which deletes job B's temp data and causes job B to fail to write its data.
case b: for the current dynamic partition insert, if we kill a job while it is writing data, the staging dir and its data remain under the table location.
case c: if we use dynamic partition insert to write a new table with a huge number of partitions, we have to move the partition paths one by one. In this case we could instead rename the staging dir to the table path, which is much faster. But to do this, the staging dir must be customizable and must not live under the table location.
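Case a can be illustrated with a small sketch (plain Python; the helper name is made up, and the path layout simply mirrors the description above):

```python
# Sketch of case a: two concurrent jobs writing different static partitions
# of the same table derive their temp dir from the table location alone
# (hypothetical layout mirroring the behaviour described above), so the
# paths collide.
def temp_location(table_location: str, app_attempt_id: int = 0) -> str:
    # The app attempt id is typically 0, so it does not disambiguate jobs.
    return f"{table_location}/_temporary/{app_attempt_id}"

table = "/warehouse/db/tbl"
job_a_tmp = temp_location(table)  # job A, writing partition p=1
job_b_tmp = temp_location(table)  # job B, writing partition p=2

# Both jobs share one temp dir: when job A commits and cleans it up,
# job B's in-flight task files are deleted and job B fails.
assert job_a_tmp == job_b_tmp
print(job_a_tmp)  # /warehouse/db/tbl/_temporary/0
```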

In this approach, we plan to do two things:

  1. Make the staging dir customizable, as in Hive

    • When we kill a job doing a dynamic partition insert, its staging dir remains under the table location. If the staging dir is customizable as in Hive (e.g. defined as /tmp/spark/.stagingdir), we can avoid leaving such staging dirs and data under the table path.

    • If we define the staging dir in a tmp location rather than under the table location, then when a dynamic partition insert writes to a new table we can simply rename the staging dir to the target table location instead of moving partition dirs one by one. This is much faster.

    • If the staging dir is customizable, we can implement a new commit protocol in which non-partitioned table writes and static partition inserts also use the staging dir without increasing the number of FS operations.

  2. Add a new SQL commit protocol that supports a staging dir without increasing the number of FS operations.

The current static partition insert in v1 has these steps:

1. Task attempts first write files under the intermediate path, e.g. /path/to/outputPath/_temporary/{appAttemptId}/_temporary/{taskAttemptId}/{part_spec_path}/xxx.parquet.

2. Task commit then moves the files to /path/to/outputPath/_temporary/{appId}/_temporary/{taskId}/{part_spec_path}/xxx.parquet.

3. Job commit moves the files to /path/to/outputPath/{part_spec_path}/xxx.parquet.

The current dynamic partition insert in v1 has these steps:

1. Task attempts first write files under the intermediate path, e.g. /path/to/outputPath/.spark-staging-{jobId}/_temporary/{appAttemptId}/_temporary/{taskAttemptId}/a=1/b=1/xxx.parquet.
2. Task commit then moves the files to /path/to/outputPath/.spark-staging-{jobId}/_temporary/{appId}/_temporary/{taskId}/a=1/b=1/xxx.parquet.
3. Job commit moves the files to /path/to/outputPath/a=1/b=1/xxx.parquet.
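The three stages above can be sketched as plain path transformations (Python; the id values are illustrative placeholders, not Spark's real attempt/task id formatting):

```python
# Path stages of the v1 dynamic partition insert, mirroring the steps above.
# Ids are illustrative placeholders, not Spark's real formatting.
def stage_paths(out, job_id, app_attempt_id, task_attempt_id,
                app_id, task_id, part, name):
    staging = f"{out}/.spark-staging-{job_id}"
    # 1. task attempt writes here:
    attempt = (f"{staging}/_temporary/{app_attempt_id}"
               f"/_temporary/{task_attempt_id}/{part}/{name}")
    # 2. task commit moves the file here:
    committed = f"{staging}/_temporary/{app_id}/_temporary/{task_id}/{part}/{name}"
    # 3. job commit moves it to the final location:
    final = f"{out}/{part}/{name}"
    return attempt, committed, final

attempt, committed, final = stage_paths(
    "/path/to/outputPath", "jid", 0, "ta_0", 0, "t_0", "a=1/b=1", "xxx.parquet")
print(final)  # /path/to/outputPath/a=1/b=1/xxx.parquet
```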

With the new SQL commit protocol SQLPathHadoopMapReduceCommitProtocol:

for a non-partitioned insert:

1. Task attempts first write files under the intermediate path, e.g. {staging_dir}/_temporary/{appAttemptId}/_temporary/{taskAttemptId}/xxx.parquet.
2. Task commit then moves the files to {staging_dir}/_temporary/{appId}/_temporary/{taskId}/xxx.parquet.
3. Job commit moves the files to /path/to/outputPath/xxx.parquet.

for all static partition inserts:

1. Task attempts first write files under the intermediate path, e.g. {staging_dir}/_temporary/{appAttemptId}/_temporary/{taskAttemptId}/{part_spec_path}/xxx.parquet.
2. Task commit then moves the files to {staging_dir}/_temporary/{appId}/_temporary/{taskId}/{part_spec_path}/xxx.parquet.
3. Job commit moves the files to /path/to/outputPath/{part_spec_path}/xxx.parquet.

for dynamic partition insert, we implement it as:

1. Task attempts first write files under the intermediate path, e.g. {staging_dir}/_temporary/{appAttemptId}/_temporary/{taskAttemptId}/{part_spec_path}/xxx.parquet.
2. Task commit then moves the files to {staging_dir}/_temporary/{appId}/_temporary/{taskId}/{part_spec_path}/xxx.parquet.
3. Job commit moves the files to {staging_dir}/{part_spec_path}/xxx.parquet. Then, if we are writing to an empty table path, we directly rename {staging_dir} to {table_path}; otherwise, we move the partition dirs one by one from {staging_dir}/{part_spec_path} to {table_location}/{part_spec_path}.
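The job-commit decision in step 3 can be sketched like this (plain Python with local directories standing in for HDFS; the helper and variable names are made up, the real logic would live in SQLPathHadoopMapReduceCommitProtocol and use Hadoop FileSystem calls, and the sketch assumes the staging partitions do not already exist under the table path):

```python
# Sketch of the job-commit step for the dynamic partition insert above.
# Names are illustrative; real code uses Hadoop FileSystem operations.
import os
import shutil
import tempfile

def commit_dynamic(staging_dir: str, table_path: str) -> str:
    if not os.path.exists(table_path) or not os.listdir(table_path):
        # New or empty table path: a single rename of the whole staging dir.
        if os.path.exists(table_path):
            os.rmdir(table_path)
        shutil.move(staging_dir, table_path)
        return "renamed"
    # Existing, non-empty table: move each partition dir individually.
    for part in os.listdir(staging_dir):
        shutil.move(os.path.join(staging_dir, part),
                    os.path.join(table_path, part))
    os.rmdir(staging_dir)  # staging dir is empty now
    return "moved-per-partition"

base = tempfile.mkdtemp()

# New table path: the whole staging dir is renamed in one FS operation.
staging = os.path.join(base, ".spark-staging-job1")
os.makedirs(os.path.join(staging, "a=1"))
open(os.path.join(staging, "a=1", "part-0.parquet"), "w").close()
table = os.path.join(base, "tbl")  # does not exist yet
mode = commit_dynamic(staging, table)
print(mode)  # renamed

# Existing table: partition dirs are moved one by one.
staging2 = os.path.join(base, ".spark-staging-job2")
os.makedirs(os.path.join(staging2, "a=2"))
open(os.path.join(staging2, "a=2", "part-0.parquet"), "w").close()
table2 = os.path.join(base, "tbl2")
os.makedirs(os.path.join(table2, "b=9"))  # pre-existing partition data
mode2 = commit_dynamic(staging2, table2)
print(mode2)  # moved-per-partition
```

The single-rename branch is what makes the new-table case fast: one directory rename regardless of how many partitions were written.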

The new SQL commit protocol's benefits are:
- Supports insert into a non-partitioned table from itself
- Supports insert into a partitioned table's static partition while reading data from the target partition
- Supports concurrent inserts into different static partitions

These are all common problems when we insert data through the data source API.

Why are the changes needed?

Provides a more flexible commit protocol without impacting performance.

Does this PR introduce any user-facing change?

Users can set the SQL commit protocol to SQLPathHadoopMapReduceCommitProtocol to use a commit protocol with a staging dir.
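For example (the commit-protocol conf key `spark.sql.sources.commitProtocolClass` exists in Spark; the class's package prefix and the staging-dir conf key below are placeholders, check the PR for the exact names):

```shell
# Sketch only: <package> and the staging-dir conf key are placeholders.
spark-submit \
  --conf spark.sql.sources.commitProtocolClass=<package>.SQLPathHadoopMapReduceCommitProtocol \
  --conf <staging.dir.conf.key>=/tmp/spark/.stagingdir \
  ...
```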

How was this patch tested?

Added UT

@AngersZhuuuu AngersZhuuuu marked this pull request as draft January 25, 2022 11:30
@AngersZhuuuu AngersZhuuuu changed the title [SPARK-36571][SQL] Add new NewSQLHadoopMapReduceCommitProtocol resolve conflict when write into partition table's different partition [SPARK-36571][SQL] Add new SQLPathHadoopMapReduceCommitProtocol resolve conflict when write into partition table's different partition Jan 25, 2022
@AngersZhuuuu AngersZhuuuu marked this pull request as ready for review January 26, 2022 08:49
@AngersZhuuuu
Contributor Author

gentle ping @cloud-fan

@AngersZhuuuu
Contributor Author

@steveloughran Hi Steve, this PR's description may clear up some of your confusion in #33828


@steveloughran steveloughran left a comment


Code looks ok to me; would like to see the viewfs and validation of that -ext-1000 suffix to make sure those bits continue to work in the future, and that regressions are found in unit tests rather than support calls.


import testImplicits._

val stagingParentDir = Utils.createTempDir()

be nice if you could target this against actual filesystem uris rather than just file://; but that will take more changes in the base classes. would help with those of us trying to regression test other committers through spark sql

engineType: String,
jobId: String): Path = {
val extURI = path.toUri
if (extURI.getScheme == "viewfs") {

There's no test for this in the tests that I can see... it'd be good to have that viewfs coverage tested too.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 28, 2022
@github-actions github-actions bot closed this Sep 29, 2022