Skip to content
Snippets Groups Projects
Commit fced7885 authored by Thomas Graves's avatar Thomas Graves
Browse files

fix yarn-client

parent c6de982b
No related branches found
No related tags found
No related merge requests found
...@@ -47,9 +47,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte ...@@ -47,9 +47,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
private var driverClosed:Boolean = false private var driverClosed:Boolean = false
private var amClient: AMRMClient[ContainerRequest] = _ private var amClient: AMRMClient[ContainerRequest] = _
private val sparkConf = new SparkConf
val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = new SparkConf)._1 conf = sparkConf)._1
var actor: ActorRef = _ var actor: ActorRef = _
// This actor just working as a monitor to watch on Driver Actor. // This actor just working as a monitor to watch on Driver Actor.
...@@ -137,8 +138,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte ...@@ -137,8 +138,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
Thread.sleep(100) Thread.sleep(100)
} }
} }
conf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.host", driverHost)
conf.set("spark.driver.port", driverPort.toString) sparkConf.set("spark.driver.port", driverPort.toString)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
...@@ -159,7 +160,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte ...@@ -159,7 +160,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
appAttemptId, appAttemptId,
args, args,
preferredNodeLocationData, preferredNodeLocationData,
new SparkConf) sparkConf)
logInfo("Allocating " + args.numWorkers + " workers.") logInfo("Allocating " + args.numWorkers + " workers.")
// Wait until all containers have finished // Wait until all containers have finished
......
...@@ -46,9 +46,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte ...@@ -46,9 +46,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
private var yarnAllocator: YarnAllocationHandler = null private var yarnAllocator: YarnAllocationHandler = null
private var driverClosed:Boolean = false private var driverClosed:Boolean = false
private val sparkConf = new SparkConf
val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = new SparkConf)._1 conf = sparkConf)._1
var actor: ActorRef = null var actor: ActorRef = null
// This actor just working as a monitor to watch on Driver Actor. // This actor just working as a monitor to watch on Driver Actor.
...@@ -163,8 +164,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte ...@@ -163,8 +164,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
Thread.sleep(100) Thread.sleep(100)
} }
} }
conf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.host", driverHost)
conf.set("spark.driver.port", driverPort.toString) sparkConf.set("spark.driver.port", driverPort.toString)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
...@@ -180,7 +181,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte ...@@ -180,7 +181,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
scala.collection.immutable.Map() scala.collection.immutable.Map()
yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
args, preferredNodeLocationData, new SparkConf) args, preferredNodeLocationData, sparkConf)
logInfo("Allocating " + args.numWorkers + " workers.") logInfo("Allocating " + args.numWorkers + " workers.")
// Wait until all containers have finished // Wait until all containers have finished
......
0% — Loading.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment