Closed · 57 commits
6331e56
[36562][SQL] Add new NewSQLHadoopMapReduceCommitProtocol resolve conf…
AngersZhuuuu Aug 24, 2021
4d80c24
Update PartitionedWriteSuite.scala
AngersZhuuuu Aug 24, 2021
b692e0f
Update PartitionedWriteSuite.scala
AngersZhuuuu Aug 24, 2021
e6dbae1
Update NewSQLHadoopMapReduceCommitProtocol.scala
AngersZhuuuu Aug 24, 2021
6419573
Update NewSQLHadoopMapReduceCommitProtocol.scala
AngersZhuuuu Aug 24, 2021
44d1d8f
Update PathOutputCommitProtocol.scala
AngersZhuuuu Aug 24, 2021
e2c5318
Update PathOutputCommitProtocol.scala
AngersZhuuuu Aug 24, 2021
9106d18
update
AngersZhuuuu Aug 24, 2021
2031f5b
[SPARK-36579][SQL] Make spark source stagingDir can use user defined
AngersZhuuuu Aug 25, 2021
c29f55e
Update SQLHadoopMapReduceCommitProtocol.scala
AngersZhuuuu Aug 25, 2021
2604c9f
Update
AngersZhuuuu Aug 25, 2021
71f6b17
fix ut
AngersZhuuuu Aug 25, 2021
1947cbf
Merge branch 'master' into SPARK-36579
AngersZhuuuu Oct 12, 2021
30113d2
Update SaveAsHiveFile.scala
AngersZhuuuu Oct 12, 2021
6f405dc
update
AngersZhuuuu Oct 12, 2021
361263b
update
AngersZhuuuu Oct 12, 2021
9ee6ee5
update
AngersZhuuuu Oct 12, 2021
7773fb2
update
AngersZhuuuu Oct 12, 2021
a3b3c51
Update PathOutputCommitProtocol.scala
AngersZhuuuu Oct 12, 2021
b4d60e4
Update PathOutputCommitProtocol.scala
AngersZhuuuu Oct 12, 2021
2c41808
Update CommitterBindingSuite.scala
AngersZhuuuu Oct 13, 2021
454118d
update
AngersZhuuuu Oct 13, 2021
11d6d15
Update PartitionedWriteSuite.scala
AngersZhuuuu Oct 13, 2021
9c9826c
update
AngersZhuuuu Oct 13, 2021
5926822
update
AngersZhuuuu Oct 13, 2021
6cdee58
update
AngersZhuuuu Oct 13, 2021
824ec04
Update FileCommitProtocol.scala
AngersZhuuuu Oct 13, 2021
8c8a174
update
AngersZhuuuu Oct 13, 2021
8d7ce6e
update
AngersZhuuuu Oct 13, 2021
da6a0b9
update
AngersZhuuuu Oct 13, 2021
63e466c
update
AngersZhuuuu Oct 13, 2021
fd2ac5f
update
AngersZhuuuu Oct 13, 2021
e2951f1
fix UT
AngersZhuuuu Oct 14, 2021
c8d0c33
complicated
AngersZhuuuu Oct 14, 2021
eafb8dd
update
AngersZhuuuu Oct 14, 2021
ff2bfb8
revert API change
AngersZhuuuu Oct 14, 2021
c5a1d16
update
AngersZhuuuu Oct 14, 2021
239b2a8
Merge branch 'master' into SPARK-36562
AngersZhuuuu Oct 14, 2021
6dde222
Merge branch 'SPARK-36579' into SPARK-36562
AngersZhuuuu Oct 14, 2021
ac468fa
update
AngersZhuuuu Oct 14, 2021
09f211b
update
AngersZhuuuu Oct 14, 2021
f1f12c3
update
AngersZhuuuu Oct 14, 2021
023787a
Update PathOutputCommitProtocol.scala
AngersZhuuuu Oct 14, 2021
5984fb2
Update PathOutputCommitProtocol.scala
AngersZhuuuu Oct 14, 2021
c2f606f
trigger GA
AngersZhuuuu Oct 15, 2021
a4468c2
Merge branch 'master' into SPARK-36562
AngersZhuuuu Jan 25, 2022
b90caeb
Update FileCommitProtocol.scala
AngersZhuuuu Jan 25, 2022
98f066c
Update FileCommitProtocol.scala
AngersZhuuuu Jan 25, 2022
2ea214c
update
AngersZhuuuu Jan 25, 2022
201b0b5
Update HadoopMapReduceCommitProtocol.scala
AngersZhuuuu Jan 25, 2022
772bf6b
Update InsertIntoHadoopFsRelationCommand.scala
AngersZhuuuu Jan 25, 2022
866681c
Update InsertIntoHadoopFsRelationCommand.scala
AngersZhuuuu Jan 25, 2022
aaae0f2
update
AngersZhuuuu Jan 25, 2022
c838dd3
Update SQLPathHadoopMapReduceCommitProtocol.scala
AngersZhuuuu Jan 25, 2022
1f2222c
Update StagingInsertSuite.scala
AngersZhuuuu Jan 25, 2022
c3472d9
Update SQLPathHadoopMapReduceCommitProtocol.scala
AngersZhuuuu Jan 25, 2022
6b44163
Update SQLPathHadoopMapReduceCommitProtocol.scala
AngersZhuuuu Jan 26, 2022
FileCommitProtocol.scala
@@ -17,6 +17,11 @@

package org.apache.spark.internal.io

import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapreduce._

@@ -50,6 +55,22 @@ import org.apache.spark.util.Utils
abstract class FileCommitProtocol extends Logging {
import FileCommitProtocol._

/**
* Get the final directory where the result data will be placed once the job
* is committed. This may be null, in which case there is no output path and
* no data will be written.
*/
def getOutputPath: Path = null

/**
* Get the directory that the task should write results into.
* Warning: there's no guarantee that this work path is on the same
* FS as the final output, or that it's visible across machines.
* May be null, in which case there is no output path and no data will be
* written.
*/
def getWorkPath: Path = null

/**
* Sets up a job. Must be called on the driver before any other methods can be invoked.
*/
@@ -230,6 +251,89 @@ object FileCommitProtocol extends Logging {
def getStagingDir(path: String, jobId: String): Path = {
new Path(path, ".spark-staging-" + jobId)
}

def externalTempPath(
path: Path,
hadoopConf: Configuration,
stagingDir: String,
engineType: String,
jobId: String): Path = {
val extURI = path.toUri
if (extURI.getScheme == "viewfs") {
Review comment (Contributor): "there's no test for this in the tests that i can see...it'd be good to have that viewfs coverage tested too."

getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir, engineType, jobId)
} else {
new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir, engineType, jobId),
"-ext-10000")
}
}

private def getExtTmpPathRelTo(
path: Path,
hadoopConf: Configuration,
stagingDir: String,
engineType: String,
jobId: String): Path = {
// Hive uses 10000
new Path(getStagingDir(path, hadoopConf, stagingDir, engineType, jobId), "-ext-10000")
}

private def getExternalScratchDir(
extURI: URI,
hadoopConf: Configuration,
stagingDir: String,
engineType: String,
jobId: String): Path = {
getStagingDir(
new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
hadoopConf,
stagingDir,
engineType,
jobId)
}

def getStagingDir(
inputPath: Path,
hadoopConf: Configuration,
stagingDir: String,
engineType: String,
jobId: String): Path = {
val inputPathName: String = inputPath.toString
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
var stagingPathName: String =
if (inputPathName.indexOf(stagingDir) == -1) {
new Path(inputPathName, stagingDir).toString
} else {
inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
}

// SPARK-20594: This is a workaround for a Hive bug. When users set
// hive.exec.stagingdir under the table directory, the staging directory must
// start with '.' so that it is not deleted together with the table data.
if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
!stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) {
logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
"with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
"directory.")
stagingPathName = new Path(inputPathName, ".hive-staging").toString
}

val dir = fs.makeQualified(
new Path(stagingPathName + "_" + executionId(engineType) + "-" + jobId))
logDebug("Created staging dir = " + dir + " for path = " + inputPath)
dir
}

private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = {
val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR
val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR
path1.startsWith(path2)
}

def executionId(engineType: String): String = {
val rand: Random = new Random
val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
s"${engineType}_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
}
}

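To make the new helpers concrete, here is a minimal sketch of the path `externalTempPath` resolves for a non-viewfs location; the table path and job id are illustrative, and the timestamp/random suffix comes from `executionId`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.io.FileCommitProtocol

// Illustrative values: a table location, the default staging dir name,
// engine type "spark", and a made-up job id.
val tmp = FileCommitProtocol.externalTempPath(
  new Path("/warehouse/tbl"),
  new Configuration(),
  ".spark-staging",   // value of spark.sql.exec.stagingDir
  "spark",
  "job-1234")
// tmp resolves to roughly:
//   /warehouse/tbl/.spark-staging_spark_<timestamp>_<rand>-job-1234/-ext-10000
// where the fixed "-ext-10000" leaf mirrors Hive's convention.
```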
HadoopMapReduceCommitProtocol.scala
@@ -73,7 +73,7 @@ class HadoopMapReduceCommitProtocol(
import FileCommitProtocol._

/** OutputCommitter from Hadoop is not serializable so marking it transient. */
@transient private var committer: OutputCommitter = _
@transient protected var committer: OutputCommitter = _

/**
* Checks whether there are files to be committed to a valid output location.
@@ -106,6 +106,16 @@
*/
protected def stagingDir = getStagingDir(path, jobId)

override def getOutputPath: Path = {
if (dynamicPartitionOverwrite) {
stagingDir
} else {
new Path(path)
}
}

override def getWorkPath: Path = getOutputPath

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.getConstructor().newInstance()
// If OutputFormat is Configurable, we should set conf to it.
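A sketch of the new accessors' contract, using the existing `FileCommitProtocol.instantiate` entry point with illustrative values:

```scala
import org.apache.spark.internal.io.FileCommitProtocol

// With dynamic partition overwrite on, the job writes under the staging
// directory and files are moved to the final location at commitJob time.
val protocol = FileCommitProtocol.instantiate(
  "org.apache.spark.internal.io.HadoopMapReduceCommitProtocol",
  "job-1234",   // jobId (illustrative)
  "/data/out",  // final output path (illustrative)
  true)         // dynamicPartitionOverwrite
protocol.getOutputPath  // /data/out/.spark-staging-job-1234
// With dynamicPartitionOverwrite = false it would be /data/out itself,
// and getWorkPath mirrors getOutputPath in this base implementation.
```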
PathOutputCommitProtocol.scala
@@ -60,9 +60,6 @@ class PathOutputCommitProtocol(
throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
}

/** The committer created. */
@transient private var committer: PathOutputCommitter = _

require(dest != null, "Null destination specified")

private[cloud] val destination: String = dest
@@ -115,7 +112,7 @@
logTrace(s"Committer $committer may not be tolerant of task commit failures")
}
}
committer
committer.asInstanceOf[PathOutputCommitter]
}

@@ -131,7 +128,7 @@
dir: Option[String],
spec: FileNameSpec): String = {

val workDir = committer.getWorkPath
val workDir = committer.asInstanceOf[PathOutputCommitter].getWorkPath
val parent = dir.map {
d => new Path(workDir, d)
}.getOrElse(workDir)
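Since `committer` is now inherited from `HadoopMapReduceCommitProtocol` as a plain `OutputCommitter`, each use site above downcasts it. A hypothetical tidy-up, not part of this PR, would cache the downcast once, much as the new `SQLPathHadoopMapReduceCommitProtocol` below does:

```scala
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical sketch only: a subclass that downcasts the inherited
// committer once instead of repeating asInstanceOf at each call site.
class CastOnceProtocol(jobId: String, dest: String)
  extends HadoopMapReduceCommitProtocol(jobId, dest) {

  @transient private lazy val pathCommitter: PathOutputCommitter =
    committer.asInstanceOf[PathOutputCommitter]
}
```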
SQLConf.scala
@@ -1282,6 +1282,17 @@ object SQLConf {
.createWithDefault(
"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")

val EXEC_STAGING_DIR = buildConf("spark.sql.exec.stagingDir")
.doc("The staging directory of a Spark job. Spark uses it when writing files with " +
"absolute output paths, or when writing data into a partitioned directory " +
"with dynamic partition overwrite mode on. " +
"The default value places the staging directory under the table path.")
.version("3.3.0")
.internal()
.stringConf
.checkValue(!_.isEmpty, "Should not pass an empty string as the staging directory.")
.createWithDefault(".spark-staging")

val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
.doc("The maximum number of paths allowed for listing files at driver side. If the number " +
@@ -3966,6 +3977,8 @@ class SQLConf extends Serializable with Logging {

def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS)

def stagingDir: String = getConf(SQLConf.EXEC_STAGING_DIR)

def parallelPartitionDiscoveryThreshold: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

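For anyone trying the new knob, a minimal usage sketch; the SparkSession `spark`, table names, and paths are illustrative:

```scala
// Redirect the SQL staging directory away from the table path.
// ".spark-staging" is the default; empty strings are rejected by checkValue.
spark.conf.set("spark.sql.exec.stagingDir", "/tmp/.spark-staging")

// Writes that need staging (e.g. dynamic partition overwrite) now stage
// under /tmp/.spark-staging_spark_<timestamp>_<rand>-<jobId> instead of
// under the table directory.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (p) SELECT * FROM src")
```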
InsertIntoHadoopFsRelationCommand.scala
@@ -162,23 +162,14 @@ case class InsertIntoHadoopFsRelationCommand(
}
}

// For dynamic partition overwrite, FileOutputCommitter's output path is staging path, files
// will be renamed from staging path to final output path during commit job
val committerOutputPath = if (dynamicPartitionOverwrite) {
FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
qualifiedOutputPath
}

val updatedPartitionPaths =
FileFormatWriter.write(
sparkSession = sparkSession,
plan = child,
fileFormat = fileFormat,
committer = committer,
outputSpec = FileFormatWriter.OutputSpec(
committerOutputPath.toString, customPartitionLocations, outputColumns),
committer.getOutputPath.toString, customPartitionLocations, outputColumns),
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
SQLHadoopMapReduceCommitProtocol.scala
@@ -21,8 +21,9 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
import org.apache.spark.sql.internal.SQLConf

@@ -36,6 +37,10 @@ class SQLHadoopMapReduceCommitProtocol(
extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
with Serializable with Logging {

override val stagingDir: Path =
FileCommitProtocol.externalTempPath(new Path(path), SparkHadoopUtil.get.conf,
SQLConf.get.stagingDir, "spark", jobId)

override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
var committer = super.setupCommitter(context)

SQLPathHadoopMapReduceCommitProtocol.scala (new file)
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}

import org.apache.spark.internal.io.FileNameSpec

/**
* A variant of [[SQLHadoopMapReduceCommitProtocol]] that always commits task
* output through a [[SQLPathOutputCommitter]] created against the staging
* directory and the final output path.
*/
class SQLPathHadoopMapReduceCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean = false)
extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {

// This variable can only be used after setupCommitter has been called.
private lazy val sqlPathOutputCommitter: SQLPathOutputCommitter =
committer.asInstanceOf[SQLPathOutputCommitter]

override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val committer = new SQLPathOutputCommitter(stagingDir, new Path(path), context)
logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
committer
}

override def newTaskTempFile(
taskContext: TaskAttemptContext,
dir: Option[String],
spec: FileNameSpec): String = {
val filename = getFilename(taskContext, spec)
dir.map { d =>
new Path(new Path(
sqlPathOutputCommitter.getTaskAttemptPath(taskContext), d), filename).toString
}.getOrElse {
new Path(sqlPathOutputCommitter.getTaskAttemptPath(taskContext), filename).toString
}
}
}
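The new class is not wired in as the default; opting in would go through the existing commit-protocol conf whose default appears in the SQLConf hunk above. A sketch, assuming a live SparkSession named `spark`:

```scala
// Select the path-based protocol for this session. The class name comes from
// this PR; the conf key is the existing spark.sql.sources.commitProtocolClass.
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "org.apache.spark.sql.execution.datasources.SQLPathHadoopMapReduceCommitProtocol")
```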