diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 1ef88075ad1e27f5b3ef6ae0e8ab0165c81d524e..818d6d1dda7f9774c60abfbf6ee0dff7730983a2 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -32,7 +32,12 @@ private[spark] class MesosExecutorBackend(executor: Executor)
     logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
     this.driver = driver
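+    // The data blob carries the spark.* properties serialized by MesosSchedulerBackend.createExecArg()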
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
-    executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties)
+    executor.initialize(
+      executorInfo.getExecutorId.getValue,
+      slaveInfo.getHostname,
+      properties
+    )
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index f3467db86b6eeeaca53dc8481618958cb17ebcda..eab1c60e0b2f98c37f40af584218ab81c4c35f2d 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -51,7 +51,7 @@ private[spark] class MesosSchedulerBackend(
   val taskIdToSlaveId = new HashMap[Long, String]
 
-  // An ExecutorInfo for our tasks
-  var executorInfo: ExecutorInfo = null
+  // The serialized spark.* properties sent to each executor, built lazily in createExecArg()
+  var execArgs: Array[Byte] = null
 
   override def start() {
     synchronized {
@@ -70,12 +70,11 @@ private[spark] class MesosSchedulerBackend(
         }
       }.start()
 
-      executorInfo = createExecutorInfo()
       waitForRegister()
     }
   }
 
-  def createExecutorInfo(): ExecutorInfo = {
+  def createExecutorInfo(execId: String): ExecutorInfo = {
     val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
       "Spark home is not set; set it through the spark.home system " +
       "property, the SPARK_HOME environment variable or the SparkContext constructor"))
@@ -97,7 +96,7 @@ private[spark] class MesosSchedulerBackend(
       .setEnvironment(environment)
       .build()
     ExecutorInfo.newBuilder()
-      .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
+      .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
       .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))
       .addResources(memory)
@@ -109,17 +108,21 @@ private[spark] class MesosSchedulerBackend(
    * containing all the spark.* system properties in the form of (String, String) pairs.
    */
   private def createExecArg(): Array[Byte] = {
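+    // Build the serialized spark.* properties only once, since createExecutorInfo() now calls this for each task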
-    val props = new HashMap[String, String]
-    val iterator = System.getProperties.entrySet.iterator
-    while (iterator.hasNext) {
-      val entry = iterator.next
-      val (key, value) = (entry.getKey.toString, entry.getValue.toString)
-      if (key.startsWith("spark.")) {
-        props(key) = value
+    if (execArgs == null) {
+      val props = new HashMap[String, String]
+      val iterator = System.getProperties.entrySet.iterator
+      while (iterator.hasNext) {
+        val entry = iterator.next
+        val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+        if (key.startsWith("spark.")) {
+          props(key) = value
+        }
       }
+      // Serialize the map as an array of (String, String) pairs
+      execArgs = Utils.serialize(props.toArray)
     }
-    // Serialize the map as an array of (String, String) pairs
-    return Utils.serialize(props.toArray)
+    return execArgs
   }
 
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
@@ -216,7 +218,8 @@ private[spark] class MesosSchedulerBackend(
     return MesosTaskInfo.newBuilder()
       .setTaskId(taskId)
       .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
-      .setExecutor(executorInfo)
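+      // Use the slave ID as the executor ID so that each slave gets its own ExecutorInfo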
+      .setExecutor(createExecutorInfo(slaveId))
       .setName(task.name)
       .addResources(cpuResource)
       .setData(ByteString.copyFrom(task.serializedTask))