Commit 77be7e8

address mateiz's comment about the temp folder name problem. The implementation followed mateiz's advice.
1 parent 1dcadf9 commit 77be7e8

File tree: 7 files changed (+24 lines, -29 lines)


core/src/main/scala/org/apache/spark/SparkContext.scala (7 additions, 5 deletions)

@@ -21,20 +21,17 @@ import java.io._
 import java.net.URI
 import java.util.{Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
-
 import scala.collection.{Map, Set}
 import scala.collection.generic.Growable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.reflect.{ClassTag, classTag}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
-
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -45,6 +42,7 @@ import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import java.util.Random

 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -127,6 +125,11 @@ class SparkContext(

   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
+
+  // Generate the random name for a temp folder in Tachyon
+  // Add a timestamp as the suffix here to make it more safe
+  val tachyonFolderName = new Random().nextInt() + "_" + System.currentTimeMillis()
+  conf.set("spark.tachyonstore.foldername", tachyonFolderName)

   val isLocal = (master == "local" || master.startsWith("local["))

@@ -139,8 +142,7 @@
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port").toInt,
       isDriver = true,
-      isLocal = isLocal,
-      "<driver>" + appName)
+      isLocal = isLocal)
     SparkEnv.set(env)

     // Used to store a URL for each static file/jar together with the file's local timestamp
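Note on the naming scheme: the block added to SparkContext above combines a random integer with the current timestamp, so two applications that happen to draw the same random value are still very unlikely to end up in the same Tachyon folder. A minimal standalone sketch of that idea (the object name is invented for illustration and is not part of the commit):

import java.util.Random

object TachyonFolderNameSketch {
  def main(args: Array[String]): Unit = {
    // Random int plus millisecond timestamp, matching the scheme added to SparkContext.
    val tachyonFolderName = new Random().nextInt() + "_" + System.currentTimeMillis()
    // SparkContext then publishes the name through the SparkConf so that the driver
    // and every executor resolve the same per-application folder:
    //   conf.set("spark.tachyonstore.foldername", tachyonFolderName)
    println(tachyonFolderName)  // e.g. "-1170105035_1396312345678"
  }
}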

core/src/main/scala/org/apache/spark/SparkEnv.scala (2 additions, 5 deletions)

@@ -40,7 +40,6 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 class SparkEnv private[spark] (
     val executorId: String,
-    val appId: String,
     val actorSystem: ActorSystem,
     val serializerManager: SerializerManager,
     val serializer: Serializer,
@@ -122,8 +121,7 @@ object SparkEnv extends Logging {
       hostname: String,
       port: Int,
       isDriver: Boolean,
-      isLocal: Boolean,
-      appId: String = null): SparkEnv = {
+      isLocal: Boolean): SparkEnv = {

     val securityManager = new SecurityManager(conf)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
@@ -171,7 +169,7 @@
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf)), conf)
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, securityManager, appId)
+      serializer, conf, securityManager)

     val connectionManager = blockManager.connectionManager

@@ -221,7 +219,6 @@

     new SparkEnv(
       executorId,
-      appId,
       actorSystem,
       serializerManager,
       serializer,

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala (0 additions, 1 deletion)

@@ -92,7 +92,6 @@ private[spark] class ExecutorRunner(
   def substituteVariables(argument: String): String = argument match {
     case "{{WORKER_URL}}" => workerUrl
     case "{{EXECUTOR_ID}}" => execId.toString
-    case "{{APP_ID}}" => appId.toString
     case "{{HOSTNAME}}" => host
     case "{{CORES}}" => cores.toString
     case other => other

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala (6 additions, 7 deletions)

@@ -31,7 +31,6 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
-    appId: String,
     hostPort: String,
     cores: Int)
   extends Actor
@@ -55,7 +54,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       logInfo("Successfully registered with driver")
       // Make this host instead of hostPort ?
       executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
-        false, appId)
+        false)

     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -94,7 +93,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 }

 private[spark] object CoarseGrainedExecutorBackend {
-  def run(driverUrl: String, appId: String, executorId: String, hostname: String, cores: Int,
+  def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
     workerUrl: Option[String]) {
     // Debug code
     Utils.checkHost(hostname)
@@ -107,7 +106,7 @@ private[spark] object CoarseGrainedExecutorBackend {
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, appId, executorId,
+      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
         sparkHostPort, cores),
       name = "Executor")
     workerUrl.foreach{ url =>
@@ -121,13 +120,13 @@ private[spark] object CoarseGrainedExecutorBackend {
       case x if x < 4 =>
         System.err.println(
           // Worker url is used in spark standalone mode to enforce fate-sharing with worker
-          "Usage: CoarseGrainedExecutorBackend <driverUrl> <appId> <executorId> <hostname> " +
+          "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
           "<cores> [<workerUrl>]")
         System.exit(1)
       case 4 =>
-        run(args(0), args(1), args(2), args(3), args(4).toInt, None)
+        run(args(0), args(1), args(2), args(3).toInt, None)
       case x if x > 4 =>
-        run(args(0), args(1), args(2), args(3), args(4).toInt, Some(args(5)))
+        run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
     }
   }
 }

core/src/main/scala/org/apache/spark/executor/Executor.scala (2 additions, 3 deletions)

@@ -38,8 +38,7 @@ private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     properties: Seq[(String, String)],
-    isLocal: Boolean = false,
-    appId: String = null)
+    isLocal: Boolean = false)
   extends Logging
 {
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
@@ -104,7 +103,7 @@ private[spark] class Executor(
   private val env = {
     if (!isLocal) {
       val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
-        isDriver = false, isLocal = false, appId)
+        isDriver = false, isLocal = false)
       SparkEnv.set(_env)
       _env.metricsSystem.registerSource(executorSource)
       _env

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala (1 addition, 2 deletions)

@@ -45,8 +45,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       conf.get("spark.driver.host"), conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{APP_ID}}", "{{HOSTNAME}}",
-      "{{CORES}}", "{{WORKER_URL}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome()

core/src/main/scala/org/apache/spark/storage/BlockManager.scala (6 additions, 6 deletions)

@@ -48,8 +48,7 @@ private[spark] class BlockManager(
     val defaultSerializer: Serializer,
     maxMemory: Long,
     val conf: SparkConf,
-    securityManager: SecurityManager,
-    appId: String = "test")
+    securityManager: SecurityManager)
   extends Logging {

   val shuffleBlockManager = new ShuffleBlockManager(this)
@@ -63,8 +62,9 @@ private[spark] class BlockManager(
   var tachyonInitialized = false
   private[storage] lazy val tachyonStore : TachyonStore = {
     val storeDir = conf.get("spark.tachyonstore.dir", System.getProperty("java.io.tmpdir"))
-    val tachyonStorePath = s"${storeDir}/${appId}/${this.executorId}"
-    val tachyonMaster = conf.get("spark.tachyonmaster.address", "localhost:19998")
+    val appFolderName = conf.get("spark.tachyonstore.foldername")
+    val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
+    val tachyonMaster = conf.get("spark.tachyonmaster.address", "tachyon://localhost:19998")
     val tachyonBlockManager = new TachyonBlockManager(
       shuffleBlockManager, tachyonStorePath, tachyonMaster)
     tachyonInitialized = true
@@ -134,9 +134,9 @@ private[spark] class BlockManager(
    * Construct a BlockManager with a memory limit set based on system properties.
    */
   def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
-      serializer: Serializer, conf: SparkConf, securityManager: SecurityManager, appId: String) = {
+      serializer: Serializer, conf: SparkConf, securityManager: SecurityManager) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf,
-      securityManager, appId)
+      securityManager)
   }

   /**
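Putting the configuration pieces together, an executor now derives its Tachyon store path purely from the SparkConf. Below is a hedged sketch of that composition; the folder name and executor id are hypothetical placeholders, and the object exists only for illustration, not as part of the commit:

import org.apache.spark.SparkConf

object TachyonStorePathSketch {
  def main(args: Array[String]): Unit = {
    // Assume SparkContext already wrote the per-application folder name into the conf.
    val conf = new SparkConf(false)
      .set("spark.tachyonstore.foldername", "-1170105035_1396312345678")  // hypothetical value
    val executorId = "0"  // hypothetical executor id

    // Same composition as the tachyonStore initialization in BlockManager above.
    val storeDir = conf.get("spark.tachyonstore.dir", System.getProperty("java.io.tmpdir"))
    val appFolderName = conf.get("spark.tachyonstore.foldername")
    val tachyonStorePath = s"${storeDir}/${appFolderName}/${executorId}"
    val tachyonMaster = conf.get("spark.tachyonmaster.address", "tachyon://localhost:19998")

    println(s"store path: ${tachyonStorePath}, master: ${tachyonMaster}")
  }
}

Because the folder name travels in the SparkConf rather than as an appId constructor argument, the appId plumbing through SparkEnv, Executor, and the executor backends could be removed, which is what the remaining hunks in this commit do.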
