
Conversation

@AngersZhuuuu (Contributor) commented Aug 25, 2021

What changes were proposed in this pull request?

Consider such cases:

  1. If we close (kill) a job while it is doing a dynamic partition insert, it leaves such a staging dir under the table's path. Making the staging dir customizable, as Hive does, lets us avoid leaving staging dirs under the table path.
  2. In Hive's API, if we specify a staging dir instead of using the default one (under the table path), it can rename directly to the target path and avoid many HDFS file operations. In Spark, currently only dynamic partition insert supports a staging dir; we can do this like [SPARK-36563][SQL] dynamicPartitionOverwrite can direct rename to targetPath instead of partition path one by one when targetPath is empty #33811
  3. We can add a file commit protocol that supports a staging dir for all types of insert; then, when we use that commit protocol, we can:
    • Insert into a non-partitioned table from itself
    • Insert into a partitioned table's static partition while reading data from the target partition
    • Insert into different partitions using static partitions together

Why are the changes needed?

Make the staging dir of Spark data source inserts customizable, so that we can do more optimizations based on it.

Does this PR introduce any user-facing change?

Users can define the staging dir via spark.exec.stagingDir.
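
For illustration, a minimal usage sketch (assuming the spark.exec.stagingDir key quoted above and placeholder table names; the exact key and defaults may differ in the final patch):

// Hypothetical usage: point the insert staging dir outside the table location.
spark.conf.set("spark.exec.stagingDir", "/tmp/spark-insert-staging")

// With dynamic partition overwrite enabled, data is then staged under the
// configured dir instead of a .spark-staging-* dir under the table path.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.sql(
  """INSERT OVERWRITE TABLE target_table PARTITION (dt)
    |SELECT value, dt FROM source_table""".stripMargin)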

How was this patch tested?

Added UT

@AngersZhuuuu (Contributor Author)

ping @dongjoon-hyun

@github-actions github-actions bot added the SQL label Aug 25, 2021
@SparkQA commented Aug 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47250/

@SparkQA commented Aug 25, 2021

Test build #142750 has finished for PR 33828 at commit 2031f5b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 25, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47250/

    .booleanConf
    .createWithDefault(true)

  val FILE_COMMIT_STAGING_DIR =
Contributor Author

Since this is always SQL related, I added this config in the SQL part, although it is used in the core part.

Member

It's reasonable, but it's an assumption based on this PR AS-IS scope. Some other PRs may try to use it later.

Contributor Author

> It's reasonable, but it's an assumption based on this PR AS-IS scope. Some other PRs may try to use it later.

I'm planning to optimize the commit protocol too and have discussed it with @cloud-fan: https://docs.google.com/document/d/13yzpIUAmgQaJ1Jnu0kqQ4DORDxQoZJmWaJMAVMajdi0/edit#
If we can do it like that, I think it will be more convenient for us to do more optimization on the SQL side.

Contributor Author

> It's reasonable, but it's an assumption based on this PR AS-IS scope. Some other PRs may try to use it later.

@dongjoon-hyun I think the current code is ready for review now, and I have updated the description in detail.

@SparkQA commented Aug 25, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47254/

@SparkQA commented Aug 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47256/

@SparkQA commented Aug 25, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47256/

@SparkQA commented Aug 25, 2021

Test build #142754 has finished for PR 33828 at commit c29f55e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 25, 2021

Test build #142756 has finished for PR 33828 at commit 2604c9f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47264/

@SparkQA commented Aug 25, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47264/

@SparkQA commented Aug 25, 2021

Test build #142764 has finished for PR 33828 at commit 71f6b17.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) commented Aug 27, 2021

cc @mridulm, @Ngone51 for the core part addition.

@SparkQA commented Oct 12, 2021

Test build #144123 has finished for PR 33828 at commit 1947cbf.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 12, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48600/

@SparkQA commented Oct 12, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48604/

@SparkQA commented Dec 13, 2021

Test build #146116 has finished for PR 33828 at commit 632d725.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

cc @steveloughran FYI

@steveloughran (Contributor)

ooh, commit protocols. wonderful and scary. of course, the future is delta and iceberg, isn't it?

@steveloughran (Contributor) left a comment

As I do the new committer for abfs and gcs (please review this week! at apache/hadoop#2971), I've been wondering how we could support staging dirs in filesystems where file renames are fast and correct (dir rename is non-atomic on gcs, file rename fails really dramatically on abfs when caller exceeds allocated capacity, hence throttling and recovery by etag checks in the new committer).
what if you defined a standard method name getStagingDir(): String which you look for through reflection and pick up? alternatively, if you want the ability to probe a committer to see if it is on an fs where rename works, we could coordinate using StreamCapabilities to add a probe you can use. this is in hadoop-2 so you can use it in your code and compile everywhere; i can add it in the new committer.

Can I also note that it is time Spark moved to the v2 committer API and org.apache.hadoop.mapreduce.OutputCommitter over the mapred package? it will simplify the bridging i have to do.
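
A rough Scala sketch of the reflection idea above; the getStagingDir method name is only the suggestion from this comment, not an existing Hadoop or Spark API:

import java.lang.reflect.Method

import org.apache.hadoop.mapreduce.OutputCommitter

// Hypothetical probe: look for a zero-arg `getStagingDir(): String` method on
// the committer via reflection and pick it up if present.
def stagingDirOf(committer: OutputCommitter): Option[String] = {
  try {
    val m: Method = committer.getClass.getMethod("getStagingDir")
    Option(m.invoke(committer)).map(_.toString)
  } catch {
    case _: NoSuchMethodException => None
  }
}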

    engineType: String,
    jobId: String): Path = {
  val extURI = path.toUri
  if (extURI.getScheme == "viewfs") {
Contributor

always a bit brittle using the uri scheme. any way to avoid?

Contributor Author

> always a bit brittle using the uri scheme. any way to avoid?

To be honest, I don't know why we need to handle viewfs separately. After searching for the original pull request that introduced this part, I don't see any discussion, so I am not sure about it. This part is just moved here.

Contributor

probably there to ensure you don't create staging dirs in a different fs

Contributor Author

> probably there to ensure you don't create staging dirs in a different fs

Yea, I remember that Hive checks the tempLocation's and targetOutputPath's FS and encryption zone when calling HiveMetastore loadTable/loadPartition. We should move that logic here.

Contributor Author

> probably there to ensure you don't create staging dirs in a different fs

How about the current version?
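
A sketch of that kind of guard, assuming we only verify that both paths resolve to the same FileSystem (the encryption-zone check Hive also performs is omitted here):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: reject a staging dir that lives on a different
// filesystem than the target table path, which is roughly what the viewfs
// special case and Hive's loadTable/loadPartition checks guard against.
def validateSameFileSystem(stagingDir: Path, targetPath: Path, conf: Configuration): Unit = {
  val stagingFs = stagingDir.getFileSystem(conf)
  val targetFs = targetPath.getFileSystem(conf)
  if (stagingFs.getUri != targetFs.getUri) {
    throw new IllegalArgumentException(
      s"Staging dir $stagingDir (${stagingFs.getUri}) and target path " +
        s"$targetPath (${targetFs.getUri}) must be on the same filesystem")
  }
}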

class StagingInsertSuite extends QueryTest with SharedSparkSession {
  import testImplicits._

  val stagingDir = Utils.createTempDir()
Contributor

be nice if this could be subclassed so we could add tests which tried this on, say abfs or gcs

Contributor Author

> be nice if this could be subclassed so we could add tests which tried this on, say abfs or gcs

You mean, use abfs or gcs as the staging dir?

Contributor

i mean write tests which can be subclassed and then retargeted at a remote store. a lot of the spark relations tests assume local fs everywhere, for example, and i had to copy and paste them for cloud storage testing

Contributor Author

> i mean write tests which can be subclassed and then retargeted at a remote store. a lot of the spark relations tests assume local fs everywhere, for example, and i had to copy and paste them for cloud storage testing

Got it, let me try
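
A sketch of what such a subclassable suite could look like; the class name, the stagingRoot hook and the spark.exec.stagingDir key are illustrative assumptions, not code from this PR:

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

// Hypothetical base suite: the staging root is an overridable method, so a
// cloud-storage subclass can point it at an abfs:// or gs:// location
// instead of the local-filesystem default.
abstract class StagingInsertSuiteBase extends QueryTest with SharedSparkSession {

  // Subclasses override this with a remote-store path.
  protected def stagingRoot: String = Utils.createTempDir().getCanonicalPath

  test("insert with customized staging dir") {
    withSQLConf("spark.exec.stagingDir" -> stagingRoot) {
      withTable("t") {
        sql("CREATE TABLE t (id INT, p INT) USING parquet PARTITIONED BY (p)")
        sql("INSERT OVERWRITE TABLE t PARTITION (p = 1) SELECT 1")
        checkAnswer(sql("SELECT id, p FROM t"), Row(1, 1))
      }
    }
  }
}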

Contributor Author

@steveloughran How about the current version? Is it what you want?

@steveloughran (Contributor)

  • propose using spark.sql.sources.writeJobUUID as the job id when set; more uniqueness, and it should be set everywhere.
  • core design looks ok. but i don't see why you couldn't support concurrent jobs just by having different subdirs of __temporary for different job IDs/UUIDs, and an option to disable cleanup. (and instructions to do it later, which you'd need to do anyway).
  • because that use of _temporary/0 in the file output committer is only there so that on a restart the MR AM lets the committer use _temporary/1 (using the app attempt number for the subdir) and then move the committed task data from job attempt 0 into its own dir, recovering all existing work. spark doesn't need that.
  • it'd be good for you to try out my manifest committer against hdfs with your workloads. it is designed to be a lot faster in job commit because all listing of task output directory trees is done in task commit, and job commit does everything in parallel (listing of manifests, loading of manifests, creating dest dirs, file rename). some of the options you don't need for hdfs (parallel delete of task attempt temp dirs), but I still expect a massive speedup of job commit, though not as much as for stores where listing and rename are slower.

The reason i don't explicitly target HDFS is that it means I can cut out that testing/QE and focus on abfs and gcs, using benchmarks from there to tune the algorithm. For example, it turns out that mkdirs on gcs is slow, so you should check for existence first; that is now done in task commit, which adds duplicate probes there, but knowing that abfs does async page prefetch on a `listStatusIterator()` call, i can do the `getFileStatus(destDir)` call after making the list call and have it complete while the first page of list results is coming in.
https://github.com/steveloughran/hadoop/blob/mr/MAPREDUCE-7341-manifest-committer/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/TaskAttemptScanDirectoryStage.java#L150

numbers for HDFS would only distract me, but you will see much faster parallel job commits on "real world" partitioned trees

@steveloughran (Contributor)

oh, also, I'm thinking of making some gcs enhancements which turn off some checks under __temporary/ paths, breaking "strict" fs semantics but delivering performance through reduced io

  • skipping all overwrite/parent is dir/dest is not a directory checks when creating a file
  • not worrying about recreating parent dir markers after renaming or deleting files
    ... etc. S3A will do the same under paths with __magic an element above it, saves a HEAD and a LIST for every parquet file written (it sets overwrite=false when creating files, for no reason at all)

so you should always use _temporary as one path element in your staging dir to get any of those benefits
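
A small sketch of a staging-path layout that follows that advice, keeping _temporary as a path element and using the write job UUID for uniqueness (the layout itself is an assumption for illustration):

import org.apache.hadoop.fs.Path

// Hypothetical layout: <staging root>/_temporary/<job UUID>, so concurrent
// jobs get distinct dirs while object-store optimizations keyed off the
// `_temporary` path element still apply.
def jobStagingPath(stagingRoot: String, jobUUID: String): Path =
  new Path(new Path(stagingRoot, "_temporary"), jobUUID)

// e.g. jobStagingPath("/warehouse/.staging", "<job-uuid>") ->
//   /warehouse/.staging/_temporary/<job-uuid>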

@AngersZhuuuu (Contributor Author)

>   • propose using spark.sql.sources.writeJobUUID as the job id when set; more uniqueness, and it should be set everywhere.

Right now all places use Spark's job id. I can do this after this PR since it's a separate change.

>   • core design looks ok. but i don't see why you couldn't support concurrent jobs just by having different subdirs of __temporary for different job IDs/UUIDs, and an option to disable cleanup. (and instructions to do it later, which you'd need to do anyway).

Because if two jobs write to different partitions of the same table, they have the same output path ${table_location}/_temporary/0.
If one job succeeds, it deletes that path, and then the other job's data is lost, as sketched below.
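
A minimal sketch of that collision with the default committer layout (the table location is a made-up example):

// Two concurrent inserts into different partitions of the same table stage
// under the same directory with the default FileOutputCommitter:
val tableLocation = "/warehouse/tbl"               // made-up location
val jobAStaging = s"$tableLocation/_temporary/0"   // INSERT ... PARTITION (p=1)
val jobBStaging = s"$tableLocation/_temporary/0"   // INSERT ... PARTITION (p=2)
assert(jobAStaging == jobBStaging)
// When job A commits, it deletes ${tableLocation}/_temporary and wipes job B's
// in-flight task output; a per-job staging dir avoids the collision.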

>   • because that use of _temporary/0 in the file output committer is only there so that on a restart the MR AM lets the committer use _temporary/1 (using the app attempt number for the subdir) and then move the committed task data from job attempt 0 into its own dir, recovering all existing work. spark doesn't need that.

This is because Spark still uses FileOutputCommitter and keeps this behavior. If we can rewrite the commit protocol, we can avoid it.

>   • it'd be good for you to try out my manifest committer against hdfs with your workloads. it is designed to be a lot faster in job commit because all listing of task output directory trees is done in task commit, and job commit does everything in parallel (listing of manifests, loading of manifests, creating dest dirs, file rename). some of the options you don't need for hdfs (parallel delete of task attempt temp dirs), but I still expect a massive speedup of job commit, though not as much as for stores where listing and rename are slower.

Yea, I will try this later. It's a very useful design and can reduce the pressure on HDFS a lot. I need to check this with our HDFS team too.

@AngersZhuuuu (Contributor Author)

> oh, also, I'm thinking of making some gcs enhancements which turn off some checks under __temporary/ paths, breaking "strict" fs semantics but delivering performance through reduced io
>
>   • skipping all overwrite/parent is dir/dest is not a directory checks when creating a file
>   • not worrying about recreating parent dir markers after renaming or deleting files
>     ... etc. S3A will do the same under paths with __magic an element above it, saves a HEAD and a LIST for every parquet file written (it sets overwrite=false when creating files, for no reason at all)
>
> so you should always use _temporary as one path element in your staging dir to get any of those benefits

I have checked how to use the manifest commit protocol, but I found a problem:

class PathOutputCommitProtocol(
    jobId: String,
    dest: String,
    dynamicPartitionOverwrite: Boolean = false)
  extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable {

  if (dynamicPartitionOverwrite) {
    // until there's explicit extensions to the PathOutputCommitProtocols
    // to support the spark mechanism, it's left to the individual committer
    // choice to handle partitioning.
    throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
  }

In current Spark code, PathOutputCommitProtocol doesn't support dynamicPartitionOverwrite, which means we can't use your feature in the dynamic partition overwrite case.

We need to make some changes to support this. WDYT? cc @steveloughran @HyukjinKwon @cloud-fan @viirya
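
For reference, a sketch of the combination that currently fails (assuming the spark-hadoop-cloud module is on the classpath; the table names are placeholders):

// Ask Spark to use the cloud committer binding...
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
// ...and enable dynamic partition overwrite.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Expected to fail at write time with IOException(PathOutputCommitProtocol.UNSUPPORTED),
// per the constructor check quoted above.
spark.sql(
  """INSERT OVERWRITE TABLE target_table PARTITION (dt)
    |SELECT value, dt FROM source_table""".stripMargin)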

@steveloughran (Contributor) commented Mar 17, 2022

that dynamic partition work overlapped with the committer extension work and the s3a committer.

It broke the merge; those lines you've found show the workaround.

A key problem with the spark code is that it assumes file rename is a good way to commit work. AFAIK, it doesn't assume that directory renames are atomic, but unless file renames are fast, performance is going to be unsatisfactory.

And on S3, file rename is O(data), so applications which use it to promote work (hello hive!) really suffer.
I think that's why I have never looked for a good solution here.
Things are different on azure and google cloud, where file rename usually(*) works. This means that we could look at what needs to be done.

Is there anything written up on this commit protocol I could look at to see what could be done?

At the very least we could have the known committer implementations support StreamCapabilities.hasCapability() with some rename-related capabilities of the FS we could indirectly ask for (fast file rename, fast dir rename, atomic dir rename), which would let spark know what was actually viable at all. but those are really fs capabilities; you can't really expect the committer itself to know what the fs does, except in the case of the s3a committer, which is hard-coded to one fs whose semantics are known (though amplidata and netapp s3 devices do have fast file copy/rename even there...)

  • look at all the code related to etags, rename recovery and preemptive rate limiting....
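
A sketch of how such a probe could look from the Spark side; the capability string is an invented placeholder, since no such constants exist yet:

import org.apache.hadoop.fs.StreamCapabilities
import org.apache.hadoop.mapreduce.OutputCommitter

// Hypothetical capability name; a real version would use constants agreed
// with the Hadoop side rather than this invented string.
val FastFileRename = "fs.capability.rename.file.fast"

// Probe the committer (if it opts in to StreamCapabilities) before deciding
// whether a rename-based staging-dir strategy is viable on this filesystem.
def supportsFastRename(committer: OutputCommitter): Boolean = committer match {
  case c: StreamCapabilities => c.hasCapability(FastFileRename)
  case _ => false // unknown committer: assume renames may be slow
}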

@AngersZhuuuu (Contributor Author)

> Is there anything written up on this commit protocol I could look at to see what could be done?

The whole plan is here: https://github.com/apache/spark/pull/35319/files
You could help check the newly added committer's (SQLPathOutputCommitter) logic. This committer is adapted from FileOutputCommitter: it writes files to the staging path and then, when committing data, commits them to the work path.
So the number of file operations is the same, and it avoids the conflicts I mentioned in the PR description.
Also, you could help check whether the committer (SQLPathOutputCommitter) can be optimized with some of your ideas about avoiding unnecessary operations.

@AngersZhuuuu (Contributor Author)

@steveloughran I have seen your comments on #35319. I think we should make the changes (the ones from #35319 (comment) and #35319 (comment) have been done) in this PR as the first step. OK?

@steveloughran (Contributor)

Ok

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 30, 2022
@github-actions github-actions bot closed this Oct 1, 2022
@beatbull

Hi, sadly this PR got closed (automatically due to inactivity). We'd be interested in this feature & config option since the ".spark-staging-*" folders are causing trouble, e.g. when using hive-partitioned tables in BigQuery (via external table) or BigTable (same issue). AFAIS there is no setting in BigQuery & BigTable to ignore folders starting with "." or some other pattern.

We have dynamic partitioning on an in-between level, which generates the .spark-staging folders inside the Hive partition path. E.g. partitioning would regularly look like a=foo/b=42/c=2022-11-11. If c is dynamically partitioned, we get a=foo/b=42/.spark-staging-3f91233b-4992-4e05-baec-3b4533535b9d/c=2022-11-11. Even if the Spark job succeeds, these .spark-staging folders can still cause queries to fail in e.g. BigQuery/BigTable while they exist temporarily. The error in BigQuery is something like:

error message: Incompatible partition schemas.
Expected schema ([a:TYPE_STRING, b:TYPE_INT64, c:TYPE_DATE]) has 3 columns. Observed schema ([a, b]) has 2 columns.
File: a=foo/b=42/.spark-staging-3f91233b-4992-4e05-baec-3b4533535b9d/c=2022-11-11

Obviously this could also be seen as a problem of BigQuery & BigTable, but having the staging dir configurable in Spark and moving it outside of the table path would definitely help here. Any chance this PR could be revived?
