diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 22e1d52f65d3fafa6504ce29d6e4ad82e94137bc..4a6abf20b0a0b52faa22fbad52d9305ca88dd6da 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -41,7 +41,7 @@ import spark.scheduler.ShuffleMapTask
 import spark.scheduler.DAGScheduler
 import spark.scheduler.TaskScheduler
 import spark.scheduler.local.LocalScheduler
-import spark.scheduler.cluster.{SchedulerBackend, ClusterScheduler}
+import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.BlockManagerMaster
 
@@ -57,7 +57,7 @@ class SparkContext(
 
   // Set Spark master host and port system properties
   if (System.getProperty("spark.master.host") == null) {
-    System.setProperty("spark.master.host", Utils.localIpAddress)
+    System.setProperty("spark.master.host", Utils.localIpAddress())
   }
   if (System.getProperty("spark.master.port") == null) {
     System.setProperty("spark.master.port", "0")
@@ -80,13 +80,25 @@ class SparkContext(
     val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
     // Regular expression for local[N, maxRetries], used in tests with failing tasks
     val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
+    // Regular expression for connecting to Spark deploy clusters
+    val SPARK_REGEX = """(spark://.*)""".r
+
     master match {
       case "local" => 
         new LocalScheduler(1, 0)
+
       case LOCAL_N_REGEX(threads) => 
         new LocalScheduler(threads.toInt, 0)
+
       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
         new LocalScheduler(threads.toInt, maxFailures.toInt)
+
+      case SPARK_REGEX(sparkUrl) =>
+        val scheduler = new ClusterScheduler(this)
+        val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
+        scheduler.initialize(backend)
+        scheduler
+
       case _ =>
         MesosNativeLibrary.load()
         val scheduler = new ClusterScheduler(this)
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index e8502f0b8f37bc329e459591926836793f7196d9..31d48b82b990aa05117ffd3d1e3900e646e1c155 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -8,8 +8,9 @@ import scala.collection.mutable
 class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
   var state = JobState.WAITING
   var executors = new mutable.HashMap[Int, ExecutorInfo]
+  var coresGranted = 0
 
-  var nextExecutorId = 0
+  private var nextExecutorId = 0
 
   def newExecutorId(): Int = {
     val id = nextExecutorId
@@ -17,9 +18,17 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
     id
   }
 
-  def newExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
+  def addExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
     val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
     executors(exec.id) = exec
+    coresGranted += cores
     exec
   }
+
+  def removeExecutor(exec: ExecutorInfo) {
+    executors -= exec.id
+    coresGranted -= exec.cores
+  }
+
+  def coresLeft: Int = desc.cores - coresGranted
 }
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 89de3b182718573615dbfa977d1adb591da0f658..d691613b0d855c3e7e420bbb5b0b3d3546ecd066 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -83,7 +83,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
           if (ExecutorState.isFinished(state)) {
             // Remove this executor from the worker and job
             logInfo("Removing executor " + exec.fullId + " because it is " + state)
-            idToJob(jobId).executors -= exec.id
+            idToJob(jobId).removeExecutor(exec)
             exec.worker.removeExecutor(exec)
             // TODO: the worker would probably want to restart the executor a few times
             schedule()
@@ -119,26 +119,19 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
    * every time a new job joins or resource availability changes.
    */
   def schedule() {
-    // Right now this is a very simple FIFO with backfilling. We keep looking through the jobs
-    // in order of submission time and launching the first one that fits in the cluster.
-    // It's also not very efficient in terms of algorithmic complexity.
-    for (job <- waitingJobs.clone()) {
-      logInfo("Trying to schedule job " + job.id)
-      // Figure out how many cores the job could use on the whole cluster
-      val jobMemory = job.desc.memoryPerSlave
-      val usableCores = workers.filter(_.memoryFree >= jobMemory).map(_.coresFree).sum
-      logInfo("jobMemory: " + jobMemory + ", usableCores: " + usableCores)
-      if (usableCores >= job.desc.cores) {
-        // We can launch it! Let's just partition the workers into executors for this job.
-        // TODO: Probably want to spread stuff out across nodes more.
-        var coresLeft = job.desc.cores
-        for (worker <- workers if worker.memoryFree >= jobMemory && coresLeft > 0) {
-          val coresToUse = math.min(worker.coresFree, coresLeft)
-          val exec = job.newExecutor(worker, coresToUse)
+    // Right now this is a very simple FIFO scheduler. We keep looking through the jobs
+    // in order of submission time and launching the first one that fits on each node.
+    for (worker <- workers if worker.coresFree > 0) {
+      for (job <- waitingJobs.clone()) {
+        val jobMemory = job.desc.memoryPerSlave
+        if (worker.memoryFree >= jobMemory) {
+          val coresToUse = math.min(worker.coresFree, job.coresLeft)
+          val exec = job.addExecutor(worker, coresToUse)
           launchExecutor(worker, exec)
-          coresLeft -= coresToUse
         }
-        waitingJobs -= job
+        if (job.coresLeft == 0) {
+          waitingJobs -= job
+        }
       }
     }
   }
@@ -188,6 +181,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
     actorToJob -= job.actor
     addressToWorker -= job.actor.path.address
     completedJobs += job   // Remember it in our history
+    waitingJobs -= job
     for (exec <- job.executors.values) {
       exec.worker.removeExecutor(exec)
       exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorManager.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
similarity index 84%
rename from core/src/main/scala/spark/deploy/worker/ExecutorManager.scala
rename to core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index ce177996486e7118c722339312454bc3b89d0d48..ecd558546b5db238bef42280133dd34386dd5426 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorManager.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -13,13 +13,15 @@ import spark.deploy.ExecutorStateChanged
 /**
  * Manages the execution of one executor process.
  */
-class ExecutorManager(
+class ExecutorRunner(
     jobId: String,
     execId: Int,
     jobDesc: JobDescription,
     cores: Int,
     memory: Int,
     worker: ActorRef,
+    workerId: String,
+    hostname: String,
     sparkHome: File,
     workDir: File)
   extends Logging {
@@ -29,17 +31,22 @@ class ExecutorManager(
   var process: Process = null
 
   def start() {
-    workerThread = new Thread("ExecutorManager for " + fullId) {
+    workerThread = new Thread("ExecutorRunner for " + fullId) {
       override def run() { fetchAndRunExecutor() }
     }
     workerThread.start()
   }
 
-  /** Stop this executor manager, including killing the process it launched */
+  /** Stop this executor runner, including killing the process it launched */
   def kill() {
     if (workerThread != null) {
       workerThread.interrupt()
       workerThread = null
+      if (process != null) {
+        logInfo("Killing process!")
+        process.destroy()
+      }
+      worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None)
     }
   }
 
@@ -75,10 +82,18 @@ class ExecutorManager(
     }
   }
 
+  /** Replace variables such as {{SLAVEID}} and {{CORES}} in a command argument passed to us */
+  def substituteVariables(argument: String): String = argument match {
+    case "{{SLAVEID}}" => workerId
+    case "{{HOSTNAME}}" => hostname
+    case "{{CORES}}" => cores.toString
+    case other => other
+  }
+
   def buildCommandSeq(): Seq[String] = {
     val command = jobDesc.command
     val runScript = new File(sparkHome, "run").getCanonicalPath
-    Seq(runScript, command.mainClass) ++ command.arguments
+    Seq(runScript, command.mainClass) ++ command.arguments.map(substituteVariables)
   }
 
   /** Spawn a thread that will redirect a given stream to a file */
@@ -130,11 +145,7 @@ class ExecutorManager(
       worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
     } catch {
       case interrupted: InterruptedException =>
-        logInfo("Runner thread interrupted -- killing executor " + fullId)
-        if (process != null) {
-          process.destroy()
-        }
-        worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None)
+        logInfo("Runner thread for executor " + fullId + " interrupted")
 
       case e: Exception => {
         logError("Error running executor", e)
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index fba44ca9b5b6e031d48e59ff6cf0ea431bab8abe..19ffc1e40132959ec69db58eba4c9fd030ff9314 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -26,7 +26,7 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
   val workerId = generateWorkerId()
   var sparkHome: File = null
   var workDir: File = null
-  val executors = new HashMap[String, ExecutorManager]
+  val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new ArrayBuffer[String]
 
   var coresUsed = 0
@@ -104,8 +104,8 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
 
     case LaunchExecutor(jobId, execId, jobDesc, cores_, memory_) =>
       logInfo("Asked to launch executor %s/%d for %s".format(jobId, execId, jobDesc.name))
-      val manager = new ExecutorManager(
-        jobId, execId, jobDesc, cores_, memory_, self, sparkHome, workDir)
+      val manager = new ExecutorRunner(
+        jobId, execId, jobDesc, cores_, memory_, self, workerId, ip, sparkHome, workDir)
       executors(jobId + "/" + execId) = manager
       manager.start()
       master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None)
@@ -118,6 +118,13 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
         finishedExecutors += jobId + "/" + execId
       }
 
+    case KillExecutor(jobId, execId) =>
+      val fullId = jobId + "/" + execId
+      logInfo("Asked to kill executor " + fullId)
+      executors(jobId + "/" + execId).kill()
+      executors -= fullId
+      finishedExecutors += fullId
+
     case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       masterDisconnected()
   }
@@ -126,6 +133,7 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
     // TODO: It would be nice to try to reconnect to the master, but just shut down for now.
     // (Note that if reconnecting we would also need to assign IDs differently.)
     logError("Connection to master failed! Shutting down.")
+    executors.values.foreach(_.kill())
     System.exit(1)
   }
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0bd2d154793ec145d836d29cea5e83a24a8bbce6
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -0,0 +1,83 @@
+package spark.scheduler.cluster
+
+import spark.{Utils, Logging, SparkContext}
+import spark.deploy.client.{Client, ClientListener}
+import spark.deploy.{Command, JobDescription}
+import scala.collection.mutable.HashMap
+
+class SparkDeploySchedulerBackend(
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    master: String,
+    jobName: String)
+  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+  with ClientListener
+  with Logging {
+
+  var client: Client = null
+  var stopping = false
+
+  val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+
+  // Environment variables to pass to our executors
+  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
+    "SPARK_MEM",
+    "SPARK_CLASSPATH",
+    "SPARK_LIBRARY_PATH",
+    "SPARK_JAVA_OPTS"
+  )
+
+  // Memory used by each executor (in megabytes)
+  val executorMemory = {
+    if (System.getenv("SPARK_MEM") != null) {
+      Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
+      // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+    } else {
+      512
+    }
+  }
+
+  override def start() {
+    super.start()
+
+    val environment = new HashMap[String, String]
+    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
+      if (System.getenv(key) != null) {
+        environment(key) = System.getenv(key)
+      }
+    }
+    val masterUrl = "akka://spark@%s:%s/user/%s".format(
+      System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
+      StandaloneSchedulerBackend.ACTOR_NAME)
+    val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
+    val command = Command("spark.executor.StandaloneExecutorBackend", args, environment)
+    val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command)
+
+    client = new Client(sc.env.actorSystem, master, jobDesc, this)
+    client.start()
+  }
+
+  override def stop() {
+    stopping = true;
+    super.stop()
+    client.stop()
+  }
+
+  def connected(jobId: String) {
+    logInfo("Connected to Spark cluster with job ID " + jobId)
+  }
+
+  def disconnected() {
+    if (!stopping) {
+      logError("Disconnected from Spark cluster!")
+      scheduler.error("Disconnected from Spark cluster")
+    }
+  }
+
+  def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
+    logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
+       id, host, cores, Utils.memoryMegabytesToString(memory)))
+  }
+
+  def executorRemoved(id: String, message: String) {}
+}
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 040cd6b33506f7ae3bd87bd4ff9b92c36eba3fd9..62a0c5589c5bf04b04c71b34a7f11d8a494935a4 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -42,7 +42,7 @@ class CoarseMesosSchedulerBackend(
   )
 
   // Memory used by each executor (in megabytes)
-  val EXECUTOR_MEMORY = {
+  val executorMemory = {
     if (System.getenv("SPARK_MEM") != null) {
       Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
       // TODO: Might need to add some extra memory for the non-heap parts of the JVM
@@ -160,7 +160,7 @@ class CoarseMesosSchedulerBackend(
         val slaveId = offer.getSlaveId.toString
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        if (totalCoresAcquired < maxCores && mem >= EXECUTOR_MEMORY && cpus >= 1 &&
+        if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
             !slaveIdsWithExecutors.contains(slaveId)) {
           // Launch an executor on the slave
           val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
@@ -171,7 +171,7 @@ class CoarseMesosSchedulerBackend(
             .setCommand(createCommand(offer, cpusToUse))
             .setName("Task " + taskId)
             .addResources(createResource("cpus", cpusToUse))
-            .addResources(createResource("mem", EXECUTOR_MEMORY))
+            .addResources(createResource("mem", executorMemory))
             .build()
           d.launchTasks(offer.getId, Collections.singletonList(task), filters)
         } else {
diff --git a/run b/run
index 5ba94b32434faeba2bd3d6d800ace4792cbde4c8..d386892b95cfd7790d792bad15c4f9ec65d45eb7 100755
--- a/run
+++ b/run
@@ -13,15 +13,21 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then
   . $FWDIR/conf/spark-env.sh
 fi
 
+# Check that SCALA_HOME has been specified
+if [ -z "$SCALA_HOME" ]; then
+  echo "SCALA_HOME is not set" >&2
+  exit 1
+fi
+
 # If the user specifies a Mesos JAR, put it before our included one on the classpath
 MESOS_CLASSPATH=""
-if [ "x$MESOS_JAR" != "x" ] ; then
+if [ -z "$MESOS_JAR" ] ; then
   MESOS_CLASSPATH="$MESOS_JAR"
 fi
 
 # Figure out how much memory to use per executor and set it as an environment
 # variable so that our process sees it and can report it to Mesos
-if [ "x$SPARK_MEM" == "x" ] ; then
+if [ -z "$SPARK_MEM" ] ; then
   SPARK_MEM="512m"
 fi
 export SPARK_MEM
@@ -61,13 +67,26 @@ done
 for jar in `find $REPL_DIR/lib -name '*jar'`; do
   CLASSPATH+=":$jar"
 done
-CLASSPATH+=:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes
+CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
 export CLASSPATH # Needed for spark-shell
 
-if [ -n "$SCALA_HOME" ]; then
-  SCALA="${SCALA_HOME}/bin/scala"
+# Figure out whether to run our class with java or with the scala launcher.
+# In most cases, we'd prefer to execute our process with java because scala
+# creates a shell script as the parent of its Java process, which makes it
+# hard to kill the child with stuff like Process.destroy(). However, for
+# the Spark shell, the wrapper is necessary to properly reset the terminal
+# when we exit, so we allow it to set a variable to launch with scala.
+if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
+  RUNNER="${SCALA_HOME}/bin/scala"
 else
-  SCALA=scala
+  CLASSPATH+=":$SCALA_HOME/lib/scala-library.jar"
+  CLASSPATH+=":$SCALA_HOME/lib/scala-compiler.jar"
+  CLASSPATH+=":$SCALA_HOME/lib/jline.jar"
+  if [ -n "$JAVA_HOME" ]; then
+    RUNNER="${JAVA_HOME}/bin/java"
+  else
+    RUNNER=java
+  fi
 fi
 
-exec "$SCALA" -cp "$CLASSPATH" "$@"
+exec "$RUNNER" -cp "$CLASSPATH" "$@"
diff --git a/spark-shell b/spark-shell
index 29e5e65da2521dec9d034f5cf0c447a24dfc4c06..574ae2104d8bc65fb80c07792b39b7e98aca4868 100755
--- a/spark-shell
+++ b/spark-shell
@@ -1,3 +1,4 @@
 #!/bin/sh
 FWDIR="`dirname $0`"
+export SPARK_LAUNCH_WITH_SCALA=1
 exec $FWDIR/run spark.repl.Main "$@"