[SPARK-36579][CORE][SQL] Make spark source stagingDir can be customized #33828
Conversation
ping @dongjoon-hyun
Kubernetes integration test starting
Test build #142750 has finished for PR 33828 at commit
Kubernetes integration test status success
    .booleanConf
    .createWithDefault(true)

  val FILE_COMMIT_STAGING_DIR =
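For context, a rough sketch of what the new SQLConf entry might look like; only the name FILE_COMMIT_STAGING_DIR and the user-facing key spark.exec.stagingDir come from this PR, the doc text and default value are assumptions:

```scala
// Sketch only: would live inside object SQLConf; doc text and default are assumed,
// not taken from the PR diff.
val FILE_COMMIT_STAGING_DIR =
  buildConf("spark.exec.stagingDir")
    .doc("Base directory used to stage data files before they are committed " +
      "to the final output location.")
    .stringConf
    .createWithDefault(".spark-staging")
```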
Since it's always SQL related, this config is added in the SQL part, although it's used in the core part.
It's reasonable, but it's an assumption based on this PR AS-IS scope. Some other PRs may try to use it later.
> It's reasonable, but it's an assumption based on this PR AS-IS scope. Some other PRs may try to use it later.
I'm planning to optimize the commit protocol too and have discussed it with @cloud-fan: https://docs.google.com/document/d/13yzpIUAmgQaJ1Jnu0kqQ4DORDxQoZJmWaJMAVMajdi0/edit#
If we can do it like that, I think it will be more convenient for us to do further optimization on the SQL side.
> It's reasonable, but it's an assumption based on this PR AS-IS scope. Some other PRs may try to use it later.
@dongjoon-hyun I think the current code can be reviewed now; I have updated the description in detail.
Kubernetes integration test unable to build dist. exiting with code: 1
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #142754 has finished for PR 33828 at commit
Test build #142756 has finished for PR 33828 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #142764 has finished for PR 33828 at commit
Test build #144123 has finished for PR 33828 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Kubernetes integration test starting
Test build #146116 has finished for PR 33828 at commit
cc @steveloughran FYI
ooh, commit protocols. wonderful and scary. of course, the future is delta and iceberg, isn't it?
steveloughran left a comment
As I do the new committer for abfs and gcs (please review this week! at apache/hadoop#2971), I've been wondering how we could support staging dirs in filesystems where file renames are fast and correct (dir rename is non-atomic on gcs; file rename fails really dramatically on abfs when the caller exceeds allocated capacity, hence the throttling and recovery by etag checks in the new committer).
What if you defined a standard method name getStagingDir(): String which you look for through reflection and pick up? Alternatively, if you want the ability to probe a committer to see if it is on an fs where rename works, we could coordinate using StreamCapabilities to add a probe you can use. This is in hadoop-2, so you can use it in your code to compile everywhere; I can add it in the new committer.
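For illustration, a hedged sketch of that reflection-based probe; the method name getStagingDir() is only the suggestion made above, not an existing Spark or Hadoop API:

```scala
import java.lang.reflect.Method

import org.apache.hadoop.mapreduce.OutputCommitter

// Look for an optional getStagingDir() method on the committer via reflection
// (hypothetical method name, as proposed in this thread).
def stagingDirOf(committer: OutputCommitter): Option[String] = {
  try {
    val m: Method = committer.getClass.getMethod("getStagingDir")
    Option(m.invoke(committer)).map(_.toString)
  } catch {
    case _: NoSuchMethodException => None // committer does not expose a staging dir
  }
}
```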
Can I also note that it is time Spark moved to the v2 committer API and org.apache.hadoop.mapreduce.OutputCommitter over the mapred package? It will simplify that bridging I have to do.
    engineType: String,
    jobId: String): Path = {
  val extURI = path.toUri
  if (extURI.getScheme == "viewfs") {
always a bit brittle using the uri scheme. any way to avoid?
> always a bit brittle using the uri scheme. any way to avoid?
To be honest, I don't know why we need to handle viewfs separately. After searching for the original pull request of this part, I don't see any discussion, so I am not sure about it. This part is just moved here as-is.
probably there to ensure you don't create staging dirs in a different fs
> probably there to ensure you don't create staging dirs in a different fs
Yea, I remember that Hive checks the tempLocation's and targetOutputPath's FS and encryption zone when calling HiveMetastore loadTable/loadPartition. We should move that logic here.
> probably there to ensure you don't create staging dirs in a different fs
How about the current version?
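For context, a minimal sketch (names and fallback layout are assumed, not from this PR) of the same-filesystem intent discussed above: the staging dir is resolved against the output path's filesystem so that the final rename stays within one fs:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Resolve the staging dir on the same filesystem as the final output path.
def stagingPathFor(
    output: Path,
    configuredStagingDir: Option[String],
    jobId: String,
    hadoopConf: Configuration): Path = {
  val outputFs = output.getFileSystem(hadoopConf)
  val base = configuredStagingDir match {
    case Some(dir) => new Path(new Path(dir), s".spark-staging-$jobId")
    case None      => new Path(output, s".spark-staging-$jobId")
  }
  outputFs.makeQualified(base)
}
```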
class StagingInsertSuite extends QueryTest with SharedSparkSession {
  import testImplicits._

  val stagingDir = Utils.createTempDir()
be nice if this could be subclassed so we could add tests which tried this on, say abfs or gcs
> be nice if this could be subclassed so we could add tests which tried this on, say abfs or gcs
You mean, use abfs or gcs as the staging dir?
i mean write tests which can be subclassed and then retargeted at a remote store. a lot of the spark relations tests assume local fs everywhere, for example, and i had to copy and paste for cloud storage testing
> i mean write tests which can be subclassed and then retargeted at a remote store. a lot of the spark relations tests assume local fs everywhere, for example, and i had to copy and paste for cloud storage testing
Got it, let me try
@steveloughran How about the current version? Is it what you want?
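For illustration, a rough sketch of the subclassable-test idea: class and helper names here are adapted rather than taken from the PR diff, and only the config key spark.exec.stagingDir comes from this PR's description. A subclass could point the staging location at a remote store such as abfs or gcs:

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

class StagingInsertSuiteBase extends QueryTest with SharedSparkSession {
  // Subclasses targeting a remote store can override this with e.g.
  // "abfs://container@account.dfs.core.windows.net/staging" or "gs://bucket/staging".
  protected def stagingDirUri: String = Utils.createTempDir().toURI.toString

  test("insert writes through the configured staging dir") {
    withSQLConf("spark.exec.stagingDir" -> stagingDirUri) {
      withTable("t") {
        sql("CREATE TABLE t (i INT) USING parquet")
        sql("INSERT INTO t VALUES (1)")
        checkAnswer(sql("SELECT i FROM t"), Seq(Row(1)))
      }
    }
  }
}
```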
The reason I don't explicitly target HDFS is that it means I can cut out that testing/QE and focus on abfs and gcs, using benchmarks from there to tune the algorithm. For example, it turns out that mkdirs on gcs is slow, so you should check for existence first; that is now done in task commit, which adds duplicate probes there. Knowing abfs does async page prefetch on listStatusIterator(), numbers for HDFS would only distract me, but you will see much faster parallel job commits on "real world" partitioned trees.
oh, also, I'm thinking of making some gcs enhancements which turn off some checks under _temporary/ paths, breaking "strict" fs semantics but delivering performance through reduced io.
so you should always use _temporary as one path element in your staging dir to get any of those benefits
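A tiny sketch of that suggestion; the paths and job id below are only examples:

```scala
import org.apache.hadoop.fs.Path

// Keep "_temporary" as one element of the staging path so that store-side
// optimizations keyed on that element (as described above) can still apply.
val jobId = "spark-job-0001"                      // example job id
val stagingBase = new Path("gs://bucket/staging") // example staging location
val jobStagingDir = new Path(stagingBase, s"_temporary/$jobId")
```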
Now all places use Spark's job ID. I can do this after this PR since it's not the same thing.
If two jobs write to different partitions of the same table, they have the same output path ${table_location}/_temporary/0...
This is caused by Spark still using FileOutputCommitter. We still keep this; if we can rewrite the commit protocol, we can avoid it.
Yea, I will try this later. It's a very useful design and can reduce the pressure on HDFS a lot. I need to check this with our HDFS team too.
I have checked how to use the manifest commit protocol, but I found a problem: in the current Spark code, dynamicPartitionOverwrite can't support it, which means we can't use your feature in the case of dynamic partition overwrite. We need to make some changes to support this. WDYT cc @steveloughran @HyukjinKwon @cloud-fan @viirya
that dynamic partition work overlapped with the committer extension work and the s3a committer. It broke the merge; those lines you've found show the workaround. A key problem with the spark code is that it assumes file rename is a good way to commit work. AFAIK, it doesn't assume that directory renames are atomic, but unless file renames are fast then performance is going to be unsatisfactory. And on S3, file rename is O(data), so applications which use it to promote work (hello hive!) really suffer. Is there anything written up on this commit protocol I could look at to see what could be done? At the very least we could have the known committer implementations support StreamCapabilities.hasCapability() with some capabilities related to rename we could indirectly ask the FS for (fast file rename, fast dir rename, atomic dir rename), which would let spark know what was actually viable at all. but those are really fs capabilities; you can't really expect the committer itself to know what the fs does, except in the case of the s3a committer, which is hard coded to one fs whose semantics are known (though amplidata and netapp s3 devices do have fast file copy/rename even there...)
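A hedged sketch of that capability probe; the capability string below is a made-up placeholder, not an existing Hadoop constant:

```scala
import org.apache.hadoop.fs.StreamCapabilities
import org.apache.hadoop.mapreduce.OutputCommitter

// Ask a committer whether the underlying fs offers atomic directory rename.
// The capability name is illustrative only.
def supportsAtomicDirRename(committer: OutputCommitter): Boolean = committer match {
  case c: StreamCapabilities => c.hasCapability("fs.capability.rename.atomic.dir")
  case _ => false
}
```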
Here https://github.com/apache/spark/pull/35319/files is the whole plan.
@steveloughran I have seen your comment on #35319. I think we should do the change (#35319 (comment) and #35319 (comment) are done) in this PR as the first step. Ok?
Ok
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
Hi, sadly this PR got closed (automatically due to inactivity). We'd be interested in this feature & config option since the ".spark-staging-*" folders are causing trouble, e.g. when using hive-partitioned tables in BigQuery (via external table) or BigTable (same issue). AFAIS there is no setting in BigQuery & BigTable to ignore folders starting with "." or some other pattern. We have dynamic partitioning on an in-between level, which generates the .spark-staging folders in the hive-partition path, e.g. where partitioning would regularly look like ... Obviously this could also be seen as a problem of BigQuery & BigTable, but having the staging dir configurable in Spark and moving it outside of the table would definitely help here. Any chance this PR could be revived?
What changes were proposed in this pull request?
Consider such cases:
Why are the changes needed?
Make the staging dir for Spark data source inserts customizable, so that we can do further optimization based on it.
Does this PR introduce any user-facing change?
Users can define the staging dir via spark.exec.stagingDir.
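A minimal usage sketch; only the config key comes from this PR, the paths and table names below are just examples:

```scala
import org.apache.spark.sql.SparkSession

// Point the staging dir at a location outside the table path (example HDFS path).
val spark = SparkSession.builder()
  .appName("staging-dir-example")
  .config("spark.exec.stagingDir", "hdfs://nn:8020/tmp/spark-staging")
  .getOrCreate()

// Example insert; assumes db.target and db.source already exist.
spark.sql("INSERT OVERWRITE TABLE db.target PARTITION (dt='2021-08-20') SELECT * FROM db.source")
```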
How was this patch tested?
Added UT