diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 7bc60405440544c2da1350f1bfb814107f44daa6..f75244a9ac8cf0553ad2aa361c924ff40e39c225 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -110,12 +110,6 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   def createCommand(offer: Offer, numCores: Int): CommandInfo = {
-    val runScript = new File(sparkHome, "run").getCanonicalPath
-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
-      System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
-      StandaloneSchedulerBackend.ACTOR_NAME)
-    val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
-      runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
     val environment = Environment.newBuilder()
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
@@ -123,7 +117,26 @@ private[spark] class CoarseMesosSchedulerBackend(
         .setValue(value)
         .build())
     }
-    return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+      System.getProperty("spark.driver.host"),
+      System.getProperty("spark.driver.port"),
+      StandaloneSchedulerBackend.ACTOR_NAME)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      val runScript = new File(sparkHome, "run").getCanonicalPath
+      command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+        runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = new File(uri).getName().split('.')(0)
+      command.setValue("cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+        basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
+    return command.build()
   }
 
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
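Note on the basename/glob trick in createCommand above: Mesos fetches the URI into the executor's sandbox and extracts the archive there, and the generated command then enters the extracted directory via a shell glob. A minimal sketch of what the derivation produces (the URI value here is hypothetical):

    // Hypothetical distribution URI.
    val uri = "hdfs://nn:9000/dist/spark-0.8.0-incubating.tar.gz"
    val basename = new java.io.File(uri).getName().split('.')(0)
    // getName()     -> "spark-0.8.0-incubating.tar.gz"
    // split('.')(0) -> "spark-0" (everything before the first '.')
    // The executor command becomes "cd spark-0*; ./run ...", so the glob
    // must match the directory the Mesos fetcher extracted; hence the
    // scare quotes around "correctly" in the comment.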
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 75b8268b55baa8dc7d545d21be9b1ff0d5085614..51b780ac72b05546714b6028d6ecd02c26079608 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -89,7 +89,6 @@ private[spark] class MesosSchedulerBackend(
     val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
       "Spark home is not set; set it through the spark.home system " +
       "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-    val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
     val environment = Environment.newBuilder()
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
@@ -97,15 +96,23 @@ private[spark] class MesosSchedulerBackend(
         .setValue(value)
         .build())
     }
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = new File(uri).getName().split('.')(0)
+      command.setValue("cd %s*; ./spark-executor".format(basename))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
     val memory = Resource.newBuilder()
       .setName("mem")
       .setType(Value.Type.SCALAR)
       .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
       .build()
-    val command = CommandInfo.newBuilder()
-      .setValue(execScript)
-      .setEnvironment(environment)
-      .build()
     ExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
       .setCommand(command)
diff --git a/make-distribution.sh b/make-distribution.sh
index 4374e0e8c4e06d0f64364a341de06f08047b356b..0a8941c1f8515be8754576119cb465213d95319f 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -66,6 +66,7 @@ cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
 cp -r "$FWDIR/bin" "$DISTDIR"
 cp -r "$FWDIR/conf" "$DISTDIR"
 cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
+cp "$FWDIR/spark-executor" "$DISTDIR"
 
 
 if [ "$1" == "tgz" ]; then
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 9920e00a674fec3666d7ee6c276dd8b7a36c4e80..a2ea9c9694c3d7e9bed8f8d6acd4470e286e2124 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -176,7 +176,7 @@ object SparkBuild extends Build {
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
       "net.liftweb" % "lift-json_2.9.2" % "2.5",
-      "org.apache.mesos" % "mesos" % "0.9.0-incubating",
+      "org.apache.mesos" % "mesos" % "0.12.0-incubating",
       "io.netty" % "netty-all" % "4.0.0.Beta2",
       "org.apache.derby" % "derby" % "10.4.2.0" % "test",
       "com.codahale.metrics" % "metrics-core" % "3.0.0",
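For context, a minimal sketch of how a driver would opt into the new code path; the master URL and URI here are placeholders, not part of this change:

    import spark.SparkContext

    // With spark.executor.uri set, both Mesos backends above skip the
    // sparkHome-based "run"/"spark-executor" scripts and instead ask Mesos
    // to fetch and extract the packaged distribution on each slave.
    System.setProperty("spark.executor.uri",
      "hdfs://nn:9000/dist/spark-0.8.0-incubating.tar.gz")
    val sc = new SparkContext("mesos://master:5050", "MyJob")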
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala
index 59f9d05683dd04601d6b1bcbb1722967ddefd3d0..0bfe7bb7437b61d1905208f97cb8c7a77a3f41f5 100644
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -831,6 +831,8 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
   var sparkContext: SparkContext = null
 
   def createSparkContext(): SparkContext = {
+    val uri = System.getenv("SPARK_EXECUTOR_URI")
+    if (uri != null) System.setProperty("spark.executor.uri", uri)
     val master = this.master match {
       case Some(m) => m
       case None => {
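Finally, the SparkILoop hook bridges SPARK_EXECUTOR_URI from the environment into the spark.executor.uri property before the REPL constructs its SparkContext, so the shell can target a Mesos cluster whose slaves have no local Spark install. A sketch of the intended flow (the URI is illustrative):

    // Shell side (illustrative):
    //   SPARK_EXECUTOR_URI=hdfs://nn:9000/dist/spark.tar.gz ./spark-shell
    // REPL side: the property must be set before the SparkContext exists,
    // which is exactly what the createSparkContext change above does.
    val uri = System.getenv("SPARK_EXECUTOR_URI")
    if (uri != null) System.setProperty("spark.executor.uri", uri)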