From f6f46455eb4f1199eb9a464b1a0b45d9926f7ff8 Mon Sep 17 00:00:00 2001
From: Benjamin Hindman <benjamin.hindman@gmail.com>
Date: Tue, 23 Jul 2013 09:33:13 -0400
Subject: [PATCH] Added property 'spark.executor.uri' for launching on Mesos
 without requiring Spark to be installed. Using 'make_distribution.sh' a user
 can put a Spark distribution at a URI supported by Mesos (e.g., 'hdfs://...')
 and then set that when launching their job. Also added SPARK_EXECUTOR_URI for
 the REPL.

---
 .../mesos/CoarseMesosSchedulerBackend.scala   | 27 ++++++++++++++-----
 .../mesos/MesosSchedulerBackend.scala         | 17 ++++++++----
 make-distribution.sh                          |  1 +
 project/SparkBuild.scala                      |  2 +-
 .../main/scala/spark/repl/SparkILoop.scala    |  2 ++
 5 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 7bc6040544..f75244a9ac 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -110,12 +110,6 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   def createCommand(offer: Offer, numCores: Int): CommandInfo = {
-    val runScript = new File(sparkHome, "run").getCanonicalPath
-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
-      System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
-      StandaloneSchedulerBackend.ACTOR_NAME)
-    val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
-      runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
     val environment = Environment.newBuilder()
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
@@ -123,7 +117,26 @@ private[spark] class CoarseMesosSchedulerBackend(
         .setValue(value)
         .build())
     }
-    return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+      System.getProperty("spark.driver.host"),
+      System.getProperty("spark.driver.port"),
+      StandaloneSchedulerBackend.ACTOR_NAME)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      val runScript = new File(sparkHome, "run").getCanonicalPath
+      command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+        runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = new File(uri).getName().split('.')(0)
+      command.setValue("cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+        basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
+    return command.build()
   }
 
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 75b8268b55..51b780ac72 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -89,7 +89,6 @@ private[spark] class MesosSchedulerBackend(
     val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
       "Spark home is not set; set it through the spark.home system " +
       "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-    val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
     val environment = Environment.newBuilder()
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
@@ -97,15 +96,23 @@ private[spark] class MesosSchedulerBackend(
         .setValue(value)
         .build())
     }
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = new File(uri).getName().split('.')(0)
+      command.setValue("cd %s*; ./spark-executor".format(basename))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
     val memory = Resource.newBuilder()
       .setName("mem")
       .setType(Value.Type.SCALAR)
       .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
       .build()
-    val command = CommandInfo.newBuilder()
-      .setValue(execScript)
-      .setEnvironment(environment)
-      .build()
     ExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
       .setCommand(command)
diff --git a/make-distribution.sh b/make-distribution.sh
index 4374e0e8c4..0a8941c1f8 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -66,6 +66,7 @@ cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
 cp -r "$FWDIR/bin" "$DISTDIR"
 cp -r "$FWDIR/conf" "$DISTDIR"
 cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
+cp "$FWDIR/spark-executor" "$DISTDIR"
 
 
 if [ "$1" == "tgz" ]; then
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 9920e00a67..a2ea9c9694 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -176,7 +176,7 @@ object SparkBuild extends Build {
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
       "net.liftweb" % "lift-json_2.9.2" % "2.5",
-      "org.apache.mesos" % "mesos" % "0.9.0-incubating",
+      "org.apache.mesos" % "mesos" % "0.12.0-incubating",
       "io.netty" % "netty-all" % "4.0.0.Beta2",
       "org.apache.derby" % "derby" % "10.4.2.0" % "test",
       "com.codahale.metrics" % "metrics-core" % "3.0.0",
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala
index 59f9d05683..0bfe7bb743 100644
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -831,6 +831,8 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
   var sparkContext: SparkContext = null
 
   def createSparkContext(): SparkContext = {
+    val uri = System.getenv("SPARK_EXECUTOR_URI")
+    if (uri != null) System.setProperty("spark.executor.uri", uri)
     val master = this.master match {
       case Some(m) => m
       case None => {
-- 
GitLab