Skip to content
Snippets Groups Projects
Commit bcdde331 authored by Mridul Muralidharan's avatar Mridul Muralidharan
Browse files

Move from master to driver

parent ad80f68e
No related branches found
No related tags found
No related merge requests found
......@@ -76,7 +76,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Start the user's JAR
userThread = startUserClass()
// This a bit hacky, but we need to wait until the spark.master.port property has
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
waitForSparkMaster()
......@@ -124,19 +124,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
private def waitForSparkMaster() {
logInfo("Waiting for spark master to be reachable.")
var masterUp = false
while(!masterUp) {
val masterHost = System.getProperty("spark.master.host")
val masterPort = System.getProperty("spark.master.port")
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
while(!driverUp) {
val driverHost = System.getProperty("spark.driver.host")
val driverPort = System.getProperty("spark.driver.port")
try {
val socket = new Socket(masterHost, masterPort.toInt)
val socket = new Socket(driverHost, driverPort.toInt)
socket.close()
logInfo("Master now available: " + masterHost + ":" + masterPort)
masterUp = true
logInfo("Master now available: " + driverHost + ":" + driverPort)
driverUp = true
} catch {
case e: Exception =>
logError("Failed to connect to master at " + masterHost + ":" + masterPort)
logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
Thread.sleep(100)
}
}
......
......@@ -191,8 +191,8 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
else {
// deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter)
val workerId = workerIdCounter.incrementAndGet().toString
val masterUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
val driverUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
logInfo("launching container on " + containerId + " host " + workerHostname)
......@@ -209,7 +209,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
}
new Thread(
new WorkerRunnable(container, conf, masterUrl, workerId,
new WorkerRunnable(container, conf, driverUrl, workerId,
workerHostname, workerMemory, workerCores)
).start()
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment