Skip to content
Snippets Groups Projects
Commit 612a9fee authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #428 from woggling/mesos-exec-id

Make ExecutorIDs include SlaveIDs when running Mesos
parents dfb721b9 252845d3
No related branches found
No related tags found
No related merge requests found
...@@ -32,7 +32,11 @@ private[spark] class MesosExecutorBackend(executor: Executor) ...@@ -32,7 +32,11 @@ private[spark] class MesosExecutorBackend(executor: Executor)
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue) logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties) executor.initialize(
executorInfo.getExecutorId.getValue,
slaveInfo.getHostname,
properties
)
} }
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
......
...@@ -51,7 +51,7 @@ private[spark] class MesosSchedulerBackend( ...@@ -51,7 +51,7 @@ private[spark] class MesosSchedulerBackend(
val taskIdToSlaveId = new HashMap[Long, String] val taskIdToSlaveId = new HashMap[Long, String]
// An ExecutorInfo for our tasks // An ExecutorInfo for our tasks
var executorInfo: ExecutorInfo = null var execArgs: Array[Byte] = null
override def start() { override def start() {
synchronized { synchronized {
...@@ -70,12 +70,11 @@ private[spark] class MesosSchedulerBackend( ...@@ -70,12 +70,11 @@ private[spark] class MesosSchedulerBackend(
} }
}.start() }.start()
executorInfo = createExecutorInfo()
waitForRegister() waitForRegister()
} }
} }
def createExecutorInfo(): ExecutorInfo = { def createExecutorInfo(execId: String): ExecutorInfo = {
val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
"Spark home is not set; set it through the spark.home system " + "Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor")) "property, the SPARK_HOME environment variable or the SparkContext constructor"))
...@@ -97,7 +96,7 @@ private[spark] class MesosSchedulerBackend( ...@@ -97,7 +96,7 @@ private[spark] class MesosSchedulerBackend(
.setEnvironment(environment) .setEnvironment(environment)
.build() .build()
ExecutorInfo.newBuilder() ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue("default").build()) .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command) .setCommand(command)
.setData(ByteString.copyFrom(createExecArg())) .setData(ByteString.copyFrom(createExecArg()))
.addResources(memory) .addResources(memory)
...@@ -109,17 +108,20 @@ private[spark] class MesosSchedulerBackend( ...@@ -109,17 +108,20 @@ private[spark] class MesosSchedulerBackend(
* containing all the spark.* system properties in the form of (String, String) pairs. * containing all the spark.* system properties in the form of (String, String) pairs.
*/ */
private def createExecArg(): Array[Byte] = { private def createExecArg(): Array[Byte] = {
val props = new HashMap[String, String] if (execArgs == null) {
val iterator = System.getProperties.entrySet.iterator val props = new HashMap[String, String]
while (iterator.hasNext) { val iterator = System.getProperties.entrySet.iterator
val entry = iterator.next while (iterator.hasNext) {
val (key, value) = (entry.getKey.toString, entry.getValue.toString) val entry = iterator.next
if (key.startsWith("spark.")) { val (key, value) = (entry.getKey.toString, entry.getValue.toString)
props(key) = value if (key.startsWith("spark.")) {
props(key) = value
}
} }
// Serialize the map as an array of (String, String) pairs
execArgs = Utils.serialize(props.toArray)
} }
// Serialize the map as an array of (String, String) pairs return execArgs
return Utils.serialize(props.toArray)
} }
override def offerRescinded(d: SchedulerDriver, o: OfferID) {} override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
...@@ -216,7 +218,7 @@ private[spark] class MesosSchedulerBackend( ...@@ -216,7 +218,7 @@ private[spark] class MesosSchedulerBackend(
return MesosTaskInfo.newBuilder() return MesosTaskInfo.newBuilder()
.setTaskId(taskId) .setTaskId(taskId)
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
.setExecutor(executorInfo) .setExecutor(createExecutorInfo(slaveId))
.setName(task.name) .setName(task.name)
.addResources(cpuResource) .addResources(cpuResource)
.setData(ByteString.copyFrom(task.serializedTask)) .setData(ByteString.copyFrom(task.serializedTask))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment