Skip to content
Snippets Groups Projects
Commit 2b3c4614 authored by CodingCat's avatar CodingCat
Browse files

refactor sparkHome to val

clean code
parent 29f4b6a2
No related branches found
No related tags found
No related merge requests found
...@@ -76,7 +76,7 @@ private[spark] class Worker( ...@@ -76,7 +76,7 @@ private[spark] class Worker(
@volatile var registered = false @volatile var registered = false
@volatile var connected = false @volatile var connected = false
val workerId = generateWorkerId() val workerId = generateWorkerId()
var sparkHome: File = null val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
var workDir: File = null var workDir: File = null
val executors = new HashMap[String, ExecutorRunner] val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner] val finishedExecutors = new HashMap[String, ExecutorRunner]
...@@ -120,7 +120,6 @@ private[spark] class Worker( ...@@ -120,7 +120,6 @@ private[spark] class Worker(
assert(!registered) assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory))) host, port, cores, Utils.megabytesToString(memory)))
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
logInfo("Spark home: " + sparkHome) logInfo("Spark home: " + sparkHome)
createWorkDir() createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
...@@ -211,7 +210,8 @@ private[spark] class Worker( ...@@ -211,7 +210,8 @@ private[spark] class Worker(
try { try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
self, workerId, host, new File(appDesc.sparkHome.getOrElse(sparkHome.getAbsolutePath)), self, workerId, host,
appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
workDir, akkaUrl, ExecutorState.RUNNING) workDir, akkaUrl, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager executors(appId + "/" + execId) = manager
manager.start() manager.start()
...@@ -225,6 +225,7 @@ private[spark] class Worker( ...@@ -225,6 +225,7 @@ private[spark] class Worker(
logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name)) logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
if (executors.contains(appId + "/" + execId)) { if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill() executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
} }
masterLock.synchronized { masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None) master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
......
0% — Loading, or an error occurred.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment