diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 4d4848b1bd8f8de3df10c6f978f93d87ecdf8acf..0000000000000000000000000000000000000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.IOException
-import java.net.Socket
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.{SignalLogger, Utils}
-
-/**
- * An application master that runs the users driver program and allocates executors.
- */
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
-                        sparkConf: SparkConf) extends Logging {
-
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
-    this(args, new Configuration(), sparkConf)
-
-  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
-  private val rpc: YarnRPC = YarnRPC.create(conf)
-  private var resourceManager: AMRMProtocol = _
-  private var appAttemptId: ApplicationAttemptId = _
-  private var userThread: Thread = _
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  private val fs = FileSystem.get(yarnConf)
-
-  private var yarnAllocator: YarnAllocationHandler = _
-  private var isFinished: Boolean = false
-  private var uiAddress: String = _
-  private var uiHistoryAddress: String = _
-  private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
-    YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
-  private var isLastAMRetry: Boolean = true
-
-  // Default to numExecutors * 2, with minimum of 3
-  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
-    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
-  private var registered = false
-
-  def run() {
-    // set the web ui port to be ephemeral for yarn so we don't conflict with
-    // other spark processes running on the same box
-    System.setProperty("spark.ui.port", "0")
-
-    // when running the AM, the Spark master is always "yarn-cluster"
-    System.setProperty("spark.master", "yarn-cluster")
-
-    // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
-    ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
-
-    appAttemptId = getApplicationAttemptId()
-    isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
-    resourceManager = registerWithResourceManager()
-
-    // setup AmIpFilter for the SparkUI - do this before we start the UI
-    addAmIpFilter()
-
-    ApplicationMaster.register(this)
-
-    // Call this to force generation of secret so it gets populated into the
-    // hadoop UGI. This has to happen before the startUserClass which does a
-    // doAs in order for the credentials to be passed on to the executor containers.
-    val securityMgr = new SecurityManager(sparkConf)
-
-    // Start the user's JAR
-    userThread = startUserClass()
-
-    // This a bit hacky, but we need to wait until the spark.driver.port property has
-    // been set by the Thread executing the user class.
-    waitForSparkContextInitialized()
-
-    // Do this after spark master is up and SparkContext is created so that we can register UI Url
-    synchronized {
-      if (!isFinished) {
-        registerApplicationMaster()
-        registered = true
-      }
-    }
-
-    // Allocate all containers
-    allocateExecutors()
-
-    // Wait for the user class to Finish
-    userThread.join()
-
-    System.exit(0)
-  }
-
-  // add the yarn amIpFilter that Yarn requires for properly securing the UI
-  private def addAmIpFilter() {
-    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
-    System.setProperty("spark.ui.filters", amFilter)
-    val proxy = YarnConfiguration.getProxyHostAndPort(conf)
-    val parts : Array[String] = proxy.split(":")
-    val uriBase = "http://" + proxy +
-      System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-
-    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-    System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params",
-      params)
-  }
-
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    appAttemptId
-  }
-
-  private def registerWithResourceManager(): AMRMProtocol = {
-    val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
-      YarnConfiguration.RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
-    logInfo("Connecting to ResourceManager at " + rmAddress)
-    rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
-  }
-
-  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
-      .asInstanceOf[RegisterApplicationMasterRequest]
-    appMasterRequest.setApplicationAttemptId(appAttemptId)
-    // Setting this to master host,port - so that the ApplicationReport at client has some
-    // sensible info.
-    // Users can then monitor stderr/stdout on that node if required.
-    appMasterRequest.setHost(Utils.localHostName())
-    appMasterRequest.setRpcPort(0)
-    appMasterRequest.setTrackingUrl(uiAddress)
-    resourceManager.registerApplicationMaster(appMasterRequest)
-  }
-
-  private def startUserClass(): Thread = {
-    logInfo("Starting the user JAR in a separate Thread")
-    System.setProperty("spark.executor.instances", args.numExecutors.toString)
-    val mainMethod = Class.forName(
-      args.userClass,
-      false /* initialize */ ,
-      Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
-    val t = new Thread {
-      override def run() {
-
-        var successed = false
-        try {
-          // Copy
-          var mainArgs: Array[String] = new Array[String](args.userArgs.size)
-          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
-          mainMethod.invoke(null, mainArgs)
-          // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
-          // userThread will stop here unless it has uncaught exception thrown out
-          // It need shutdown hook to set SUCCEEDED
-          successed = true
-        } finally {
-          logDebug("finishing main")
-          isLastAMRetry = true
-          if (successed) {
-            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-          } else {
-            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
-          }
-        }
-      }
-    }
-    t.start()
-    t
-  }
-
-  // this need to happen before allocateExecutors
-  private def waitForSparkContextInitialized() {
-    logInfo("Waiting for spark context initialization")
-    try {
-      var sparkContext: SparkContext = null
-      ApplicationMaster.sparkContextRef.synchronized {
-        var count = 0
-        val waitTime = 10000L
-        val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
-        while (ApplicationMaster.sparkContextRef.get() == null && count < numTries
-            && !isFinished) {
-          logInfo("Waiting for spark context initialization ... " + count)
-          count = count + 1
-          ApplicationMaster.sparkContextRef.wait(waitTime)
-        }
-        sparkContext = ApplicationMaster.sparkContextRef.get()
-        assert(sparkContext != null || count >= numTries)
-
-        if (null != sparkContext) {
-          uiAddress = sparkContext.ui.appUIHostPort
-          uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(
-            yarnConf,
-            resourceManager,
-            appAttemptId,
-            args,
-            sparkContext.preferredNodeLocationData,
-            sparkContext.getConf)
-        } else {
-          logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
-            format(count * waitTime, numTries))
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(
-            yarnConf,
-            resourceManager,
-            appAttemptId,
-            args,
-            sparkContext.getConf)
-        }
-      }
-    }
-  }
-
-  private def allocateExecutors() {
-    try {
-      logInfo("Allocating " + args.numExecutors + " executors.")
-      // Wait until all containers have finished
-      // TODO: This is a bit ugly. Can we make it nicer?
-      // TODO: Handle container failure
-
-      // Exits the loop if the user thread exits.
-      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
-          && !isFinished) {
-        checkNumExecutorsFailed()
-        yarnAllocator.allocateContainers(
-          math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
-        Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
-      }
-    }
-    logInfo("All executors have launched.")
-
-    // Launch a progress reporter thread, else the app will get killed after expiration
-    // (def: 10mins) timeout.
-    // TODO(harvey): Verify the timeout
-    if (userThread.isAlive) {
-      // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
-      val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
-      // we want to be reasonably responsive without causing too many requests to RM.
-      val schedulerInterval =
-        sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
-      // must be <= timeoutInterval / 2.
-      val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
-      launchReporterThread(interval)
-    }
-  }
-
-  private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
-
-    val t = new Thread {
-      override def run() {
-        while (userThread.isAlive && !isFinished) {
-          checkNumExecutorsFailed()
-          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
-          if (missingExecutorCount > 0) {
-            logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingExecutorCount))
-            yarnAllocator.allocateContainers(missingExecutorCount)
-          } else {
-            sendProgress()
-          }
-          Thread.sleep(sleepTime)
-        }
-      }
-    }
-    // Setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.start()
-    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
-    t
-  }
-
-  private def checkNumExecutorsFailed() {
-    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-      logInfo("max number of executor failures reached")
-      finishApplicationMaster(FinalApplicationStatus.FAILED,
-        "max number of executor failures reached")
-      // make sure to stop the user thread
-      val sparkContext = ApplicationMaster.sparkContextRef.get()
-      if (sparkContext != null) {
-        logInfo("Invoking sc stop from checkNumExecutorsFailed")
-        sparkContext.stop()
-      } else {
-        logError("sparkContext is null when should shutdown")
-      }
-    }
-  }
-
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // Simulated with an allocate request with no nodes requested ...
-    yarnAllocator.allocateContainers(0)
-  }
-
-  /*
-  def printContainers(containers: List[Container]) = {
-    for (container <- containers) {
-      logInfo("Launching shell command on a new container."
-        + ", containerId=" + container.getId()
-        + ", containerNode=" + container.getNodeId().getHost()
-        + ":" + container.getNodeId().getPort()
-        + ", containerNodeURI=" + container.getNodeHttpAddress()
-        + ", containerState" + container.getState()
-        + ", containerResourceMemory"
-        + container.getResource().getMemory())
-    }
-  }
-  */
-
-  def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
-    synchronized {
-      if (isFinished) {
-        return
-      }
-      isFinished = true
-
-      logInfo("finishApplicationMaster with " + status)
-      if (registered) {
-        val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-          .asInstanceOf[FinishApplicationMasterRequest]
-        finishReq.setAppAttemptId(appAttemptId)
-        finishReq.setFinishApplicationStatus(status)
-        finishReq.setDiagnostics(diagnostics)
-        finishReq.setTrackingUrl(uiHistoryAddress)
-        resourceManager.finishApplicationMaster(finishReq)
-      }
-    }
-  }
-
-  /**
-   * Clean up the staging directory.
-   */
-  private def cleanupStagingDir() {
-    var stagingDirPath: Path = null
-    try {
-      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
-      if (!preserveFiles) {
-        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
-        if (stagingDirPath == null) {
-          logError("Staging directory is null")
-          return
-        }
-        logInfo("Deleting staging directory " + stagingDirPath)
-        fs.delete(stagingDirPath, true)
-      }
-    } catch {
-      case ioe: IOException =>
-        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
-    }
-  }
-
-  // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
-  class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
-
-    def run() {
-      logInfo("AppMaster received a signal.")
-      // we need to clean up staging dir before HDFS is shut down
-      // make sure we don't delete it until this is the last AM
-      if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
-    }
-  }
-
-}
-
-object ApplicationMaster extends Logging {
-  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
-  // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
-
-  private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
-
-  def register(master: ApplicationMaster) {
-    applicationMasters.add(master)
-  }
-
-  val sparkContextRef: AtomicReference[SparkContext] =
-    new AtomicReference[SparkContext](null /* initialValue */)
-
-  def sparkContextInitialized(sc: SparkContext): Boolean = {
-    var modified = false
-    sparkContextRef.synchronized {
-      modified = sparkContextRef.compareAndSet(null, sc)
-      sparkContextRef.notifyAll()
-    }
-
-    // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
-    // System.exit.
-    // Should not really have to do this, but it helps YARN to evict resources earlier.
-    // Not to mention, prevent the Client from declaring failure even though we exited properly.
-    // Note that this will unfortunately not properly clean up the staging files because it gets
-    // called too late, after the filesystem is already shutdown.
-    if (modified) {
-      Runtime.getRuntime().addShutdownHook(new Thread with Logging {
-        // This is not only logs, but also ensures that log system is initialized for this instance
-        // when we are actually 'run'-ing.
-        logInfo("Adding shutdown hook for context " + sc)
-
-        override def run() {
-          logInfo("Invoking sc stop from shutdown hook")
-          sc.stop()
-          // Best case ...
-          for (master <- applicationMasters) {
-            master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-          }
-        }
-      })
-    }
-
-    modified
-  }
-
-  def main(argStrings: Array[String]) {
-    SignalLogger.register(log)
-    val args = new ApplicationMasterArguments(argStrings)
-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      new ApplicationMaster(args).run()
-    }
-  }
-}
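The file removed above implemented the ApplicationMaster against the YARN alpha API: it registered with the ResourceManager over AMRMProtocol, ran the user class on a separate thread, and exited once that thread finished. The piece worth calling out is the SparkContext handshake: the user-class thread publishes the context into a shared AtomicReference and notifies, while the AM thread waits on that same monitor for a bounded number of tries. Below is a minimal, self-contained sketch of that handshake; `UserContext`, `ContextHandshake`, and the method names are illustrative stand-ins, not Spark's actual API.

```scala
import java.util.concurrent.atomic.AtomicReference

// Stand-in for SparkContext, just to keep the sketch self-contained.
class UserContext

object ContextHandshake {
  // Slot the user-class thread fills; the AM thread waits on its monitor.
  private val contextRef = new AtomicReference[UserContext](null)

  // Called from the thread running the user class once the context exists.
  def contextInitialized(ctx: UserContext): Boolean = contextRef.synchronized {
    val claimed = contextRef.compareAndSet(null, ctx)
    contextRef.notifyAll() // wake up any thread blocked in waitForContext
    claimed
  }

  // Called from the AM thread; returns null if the context never shows up.
  def waitForContext(maxTries: Int = 10, waitMs: Long = 10000L): UserContext =
    contextRef.synchronized {
      var tries = 0
      while (contextRef.get() == null && tries < maxTries) {
        tries += 1
        contextRef.wait(waitMs)
      }
      contextRef.get()
    }
}
```

The common ApplicationMaster added later in this patch keeps essentially the same pattern (see its `sparkContextRef`); only the surrounding class structure changes.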
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
deleted file mode 100644
index 155dd88aa2b81b0bdb78bbd2c5635309d6080062..0000000000000000000000000000000000000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import akka.actor._
-import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.deploy.SparkHadoopUtil
-
-/**
- * An application master that allocates executors on behalf of a driver that is running outside
- * the cluster.
- *
- * This is used only in yarn-client mode.
- */
-class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
-  extends Logging {
-
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
-    this(args, new Configuration(), sparkConf)
-
-  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
-  private val rpc: YarnRPC = YarnRPC.create(conf)
-  private var resourceManager: AMRMProtocol = _
-  private var appAttemptId: ApplicationAttemptId = _
-  private var reporterThread: Thread = _
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  private var yarnAllocator: YarnAllocationHandler = _
-
-  private var driverClosed: Boolean = false
-  private var isFinished: Boolean = false
-  private var registered: Boolean = false
-
-  // Default to numExecutors * 2, with minimum of 3
-  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
-    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
-  val securityManager = new SecurityManager(sparkConf)
-  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
-    conf = sparkConf, securityManager = securityManager)._1
-  var actor: ActorRef = _
-
-  // This actor just working as a monitor to watch on Driver Actor.
-  class MonitorActor(driverUrl: String) extends Actor {
-
-    var driver: ActorSelection = _
-
-    override def preStart() {
-      logInfo("Listen to driver: " + driverUrl)
-      driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established, thus we can
-      // monitor Lifecycle Events.
-      driver ! "Hello"
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    }
-
-    override def receive = {
-      case x: DisassociatedEvent =>
-        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        driverClosed = true
-      case x: AddWebUIFilter =>
-        logInfo(s"Add WebUI Filter. $x")
-        driver ! x
-    }
-  }
-
-  def run() {
-    appAttemptId = getApplicationAttemptId()
-    resourceManager = registerWithResourceManager()
-
-    synchronized {
-      if (!isFinished) {
-        val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-        // Compute number of threads for akka
-        val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
-        if (minimumMemory > 0) {
-          val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-            YarnAllocationHandler.MEMORY_OVERHEAD)
-          val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
-          if (numCore > 0) {
-            // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
-            // TODO: Uncomment when hadoop is on a version which has this fixed.
-            // args.workerCores = numCore
-          }
-        }
-        registered = true
-      }
-    }
-    waitForSparkMaster()
-    addAmIpFilter()
-    // Allocate all containers
-    allocateExecutors()
-
-    // Launch a progress reporter thread, else app will get killed after expiration
-    // (def: 10mins) timeout ensure that progress is sent before
-    // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
-    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // we want to be reasonably responsive without causing too many requests to RM.
-    val schedulerInterval =
-      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
-
-    // must be <= timeoutInterval / 2.
-    val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
-    reporterThread = launchReporterThread(interval)
-
-    // Wait for the reporter thread to Finish.
-    reporterThread.join()
-
-    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-    actorSystem.shutdown()
-
-    logInfo("Exited")
-    System.exit(0)
-  }
-
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    appAttemptId
-  }
-
-  private def registerWithResourceManager(): AMRMProtocol = {
-    val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
-      YarnConfiguration.RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
-    logInfo("Connecting to ResourceManager at " + rmAddress)
-    rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
-  }
-
-  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
-    logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
-    val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
-      .asInstanceOf[RegisterApplicationMasterRequest]
-    appMasterRequest.setApplicationAttemptId(appAttemptId)
-    // Setting this to master host,port - so that the ApplicationReport at client has
-    // some sensible info. Users can then monitor stderr/stdout on that node if required.
-    appMasterRequest.setHost(Utils.localHostName())
-    appMasterRequest.setRpcPort(0)
-    // What do we provide here ? Might make sense to expose something sensible later ?
-    appMasterRequest.setTrackingUrl(appUIAddress)
-    resourceManager.registerApplicationMaster(appMasterRequest)
-  }
-
-  // add the yarn amIpFilter that Yarn requires for properly securing the UI
-  private def addAmIpFilter() {
-    val proxy = YarnConfiguration.getProxyHostAndPort(conf)
-    val parts = proxy.split(":")
-    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-    val uriBase = "http://" + proxy + proxyBase
-    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
-    actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
-  }
-
-  private def waitForSparkMaster() {
-    logInfo("Waiting for spark driver to be reachable.")
-    var driverUp = false
-    val hostport = args.userArgs(0)
-    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-    while(!driverUp) {
-      try {
-        val socket = new Socket(driverHost, driverPort)
-        socket.close()
-        logInfo("Master now available: " + driverHost + ":" + driverPort)
-        driverUp = true
-      } catch {
-        case e: Exception =>
-          logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
-        Thread.sleep(100)
-      }
-    }
-    sparkConf.set("spark.driver.host",  driverHost)
-    sparkConf.set("spark.driver.port",  driverPort.toString)
-
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
-      SparkEnv.driverActorSystemName,
-      driverHost,
-      driverPort.toString,
-      CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
-  }
-
-
-  private def allocateExecutors() {
-
-    // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
-      scala.collection.immutable.Map()
-
-    yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
-      args, preferredNodeLocationData, sparkConf)
-
-    logInfo("Allocating " + args.numExecutors + " executors.")
-    // Wait until all containers have finished
-    // TODO: This is a bit ugly. Can we make it nicer?
-    // TODO: Handle container failure
-    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
-        !isFinished) {
-      yarnAllocator.allocateContainers(
-        math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
-      checkNumExecutorsFailed()
-      Thread.sleep(100)
-    }
-
-    logInfo("All executors have launched.")
-  }
-  private def checkNumExecutorsFailed() {
-    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-      finishApplicationMaster(FinalApplicationStatus.FAILED,
-        "max number of executor failures reached")
-    }
-  }
-
-  // TODO: We might want to extend this to allocate more containers in case they die !
-  private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
-
-    val t = new Thread {
-      override def run() {
-        while (!driverClosed && !isFinished) {
-          checkNumExecutorsFailed()
-          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
-          if (missingExecutorCount > 0) {
-            logInfo("Allocating " + missingExecutorCount +
-              " containers to make up for (potentially ?) lost containers")
-            yarnAllocator.allocateContainers(missingExecutorCount)
-          } else {
-            sendProgress()
-          }
-          Thread.sleep(sleepTime)
-        }
-      }
-    }
-    // setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.start()
-    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
-    t
-  }
-
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // simulated with an allocate request with no nodes requested ...
-    yarnAllocator.allocateContainers(0)
-  }
-
-  def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
-    synchronized {
-      if (isFinished) {
-        return
-      }
-      logInfo("Unregistering ApplicationMaster with " + status)
-      if (registered) {
-        val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-          .asInstanceOf[FinishApplicationMasterRequest]
-        finishReq.setAppAttemptId(appAttemptId)
-        finishReq.setFinishApplicationStatus(status)
-        finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
-        finishReq.setDiagnostics(appMessage)
-        resourceManager.finishApplicationMaster(finishReq)
-      }
-      isFinished = true
-    }
-  }
-
-}
-
-
-object ExecutorLauncher {
-  def main(argStrings: Array[String]) {
-    val args = new ApplicationMasterArguments(argStrings)
-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      new ExecutorLauncher(args).run()
-    }
-  }
-}
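ExecutorLauncher, removed above, was the yarn-client counterpart: it had no user class to run, so before registering it polled the driver's host:port (passed as the first user argument) until a TCP connect succeeded, then set spark.driver.host/port and started the monitoring actor. A sketch of that polling loop follows; the object and method names are hypothetical, not part of the patch.

```scala
import java.net.Socket

object DriverProbe {
  // Block until a TCP connection to host:port succeeds, retrying on failure.
  // Mirrors the waitForSparkMaster loop in the deleted file above.
  def waitForPort(host: String, port: Int, retryMs: Long = 100L): Unit = {
    var up = false
    while (!up) {
      try {
        val socket = new Socket(host, port)
        socket.close()
        up = true
      } catch {
        case _: Exception =>
          Thread.sleep(retryMs) // driver not reachable yet; back off and retry
      }
    }
  }
}
```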
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 568a6ef932bbd7fb6fb6d70f35da9c1ec53afe52..629cd13f671455d2115617554063ab3066bfe924 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,33 +17,24 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.lang.{Boolean => JBoolean}
-import java.util.{Collections, Set => JSet}
 import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
 import org.apache.spark.{Logging, SparkConf, SparkEnv}
-import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
+import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.AMRMProtocol
 import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
 import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
 import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
-import org.apache.hadoop.yarn.util.{RackResolver, Records}
-
-
-object AllocationType extends Enumeration {
-  type AllocationType = Value
-  val HOST, RACK, ANY = Value
-}
+import org.apache.hadoop.yarn.util.Records
 
 // TODO:
 // Too many params.
@@ -59,16 +50,14 @@ object AllocationType extends Enumeration {
  * Acquires resources for executors from a ResourceManager and launches executors in new containers.
  */
 private[yarn] class YarnAllocationHandler(
-    val conf: Configuration,
-    val resourceManager: AMRMProtocol,
-    val appAttemptId: ApplicationAttemptId,
-    val maxExecutors: Int,
-    val executorMemory: Int,
-    val executorCores: Int,
-    val preferredHostToCount: Map[String, Int],
-    val preferredRackToCount: Map[String, Int],
-    val sparkConf: SparkConf)
-  extends Logging {
+    conf: Configuration,
+    sparkConf: SparkConf,
+    resourceManager: AMRMProtocol,
+    appAttemptId: ApplicationAttemptId,
+    args: ApplicationMasterArguments,
+    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+  extends YarnAllocator with Logging {
+
   // These three are locked on allocatedHostToContainersMap. Complementary data structures
   // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
   // allocatedContainerToHostMap: container to host mapping.
@@ -90,7 +79,7 @@ private[yarn] class YarnAllocationHandler(
 
   // Additional memory overhead - in mb.
   private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-    YarnAllocationHandler.MEMORY_OVERHEAD)
+    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
 
   private val numExecutorsRunning = new AtomicInteger()
   // Used to generate a unique id per executor
@@ -98,6 +87,12 @@ private[yarn] class YarnAllocationHandler(
   private val lastResponseId = new AtomicInteger()
   private val numExecutorsFailed = new AtomicInteger()
 
+  private val maxExecutors = args.numExecutors
+  private val executorMemory = args.executorMemory
+  private val executorCores = args.executorCores
+  private val (preferredHostToCount, preferredRackToCount) =
+    generateNodeToWeight(conf, preferredNodes)
+
   def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
 
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
@@ -106,9 +101,10 @@ private[yarn] class YarnAllocationHandler(
     container.getResource.getMemory >= (executorMemory + memoryOverhead)
   }
 
-  def allocateContainers(executorsToRequest: Int) {
+  override def allocateResources() = {
     // We need to send the request only once from what I understand ... but for now, not modifying
     // this much.
+    val executorsToRequest = Math.max(maxExecutors - numExecutorsRunning.get(), 0)
 
     // Keep polling the Resource Manager for containers
     val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
@@ -182,7 +178,7 @@ private[yarn] class YarnAllocationHandler(
 
         // Now rack local
         if (remainingContainers != null){
-          val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
 
           if (rack != null){
             val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
@@ -256,7 +252,7 @@ private[yarn] class YarnAllocationHandler(
           // Should not be there, but ..
           pendingReleaseContainers.remove(containerId)
 
-          val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
+          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
           allocatedHostToContainersMap.synchronized {
             val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
               new HashSet[ContainerId]())
@@ -331,7 +327,7 @@ private[yarn] class YarnAllocationHandler(
             allocatedContainerToHostMap -= containerId
 
             // Doing this within locked context, sigh ... move to outside ?
-            val rack = YarnAllocationHandler.lookupRack(conf, host)
+            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
             if (rack != null) {
               val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
               if (rackCount > 0) {
@@ -364,9 +360,9 @@ private[yarn] class YarnAllocationHandler(
     for (container <- hostContainers) {
       val candidateHost = container.getHostName
       val candidateNumContainers = container.getNumContainers
-      assert(YarnAllocationHandler.ANY_HOST != candidateHost)
+      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
 
-      val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
       if (rack != null) {
         var count = rackToCounts.getOrElse(rack, 0)
         count += candidateNumContainers
@@ -378,7 +374,8 @@ private[yarn] class YarnAllocationHandler(
       new ArrayBuffer[ResourceRequest](rackToCounts.size)
     for ((rack, count) <- rackToCounts){
       requestedContainers +=
-        createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY)
+        createResourceRequest(AllocationType.RACK, rack, count,
+          YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
     }
 
     requestedContainers.toList
@@ -409,7 +406,7 @@ private[yarn] class YarnAllocationHandler(
       logDebug("numExecutors: " + numExecutors + ", host preferences: " +
         preferredHostToCount.isEmpty)
       resourceRequests = List(createResourceRequest(
-        AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
+        AllocationType.ANY, null, numExecutors, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
     } else {
       // request for all hosts in preferred nodes and for numExecutors -
       // candidates.size, request by default allocation policy.
@@ -423,7 +420,7 @@ private[yarn] class YarnAllocationHandler(
             AllocationType.HOST,
             candidateHost,
             requiredCount,
-            YarnAllocationHandler.PRIORITY)
+            YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
         }
       }
       val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(
@@ -433,7 +430,7 @@ private[yarn] class YarnAllocationHandler(
         AllocationType.ANY,
         resource = null,
         numExecutors,
-        YarnAllocationHandler.PRIORITY)
+        YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
 
       val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
         hostContainerRequests.size + rackContainerRequests.size + 1)
@@ -483,12 +480,12 @@ private[yarn] class YarnAllocationHandler(
     // There must be a third request - which is ANY : that will be specially handled.
     requestType match {
       case AllocationType.HOST => {
-        assert(YarnAllocationHandler.ANY_HOST != resource)
+        assert(YarnSparkHadoopUtil.ANY_HOST != resource)
         val hostname = resource
         val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority)
 
         // Add to host->rack mapping
-        YarnAllocationHandler.populateRackInfo(conf, hostname)
+        YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
 
         nodeLocal
       }
@@ -497,7 +494,7 @@ private[yarn] class YarnAllocationHandler(
         createResourceRequestImpl(rack, numExecutors, priority)
       }
       case AllocationType.ANY => createResourceRequestImpl(
-        YarnAllocationHandler.ANY_HOST, numExecutors, priority)
+        YarnSparkHadoopUtil.ANY_HOST, numExecutors, priority)
       case _ => throw new IllegalArgumentException(
         "Unexpected/unsupported request type: " + requestType)
     }
@@ -541,90 +538,6 @@ private[yarn] class YarnAllocationHandler(
 
     retval
   }
-}
-
-object YarnAllocationHandler {
-
-  val ANY_HOST = "*"
-  // All requests are issued with same priority : we do not (yet) have any distinction between
-  // request types (like map/reduce in hadoop for example)
-  val PRIORITY = 1
-
-  // Additional memory overhead - in mb
-  val MEMORY_OVERHEAD = 384
-
-  // Host to rack map - saved from allocation requests
-  // We are expecting this not to change.
-  // Note that it is possible for this to change : and RM will indicate that to us via update
-  // response to allocate. But we are punting on handling that for now.
-  private val hostToRack = new ConcurrentHashMap[String, String]()
-  private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
-
-
-  def newAllocator(
-    conf: Configuration,
-    resourceManager: AMRMProtocol,
-    appAttemptId: ApplicationAttemptId,
-    args: ApplicationMasterArguments,
-    sparkConf: SparkConf): YarnAllocationHandler = {
-
-    new YarnAllocationHandler(
-      conf,
-      resourceManager,
-      appAttemptId,
-      args.numExecutors,
-      args.executorMemory,
-      args.executorCores,
-      Map[String, Int](),
-      Map[String, Int](),
-      sparkConf)
-  }
-
-  def newAllocator(
-    conf: Configuration,
-    resourceManager: AMRMProtocol,
-    appAttemptId: ApplicationAttemptId,
-    args: ApplicationMasterArguments,
-    map: collection.Map[String,
-    collection.Set[SplitInfo]],
-    sparkConf: SparkConf): YarnAllocationHandler = {
-
-    val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-    new YarnAllocationHandler(
-      conf,
-      resourceManager,
-      appAttemptId,
-      args.numExecutors,
-      args.executorMemory,
-      args.executorCores,
-      hostToCount,
-      rackToCount,
-      sparkConf)
-  }
-
-  def newAllocator(
-    conf: Configuration,
-    resourceManager: AMRMProtocol,
-    appAttemptId: ApplicationAttemptId,
-    maxExecutors: Int,
-    executorMemory: Int,
-    executorCores: Int,
-    map: collection.Map[String, collection.Set[SplitInfo]],
-    sparkConf: SparkConf): YarnAllocationHandler = {
-
-    val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
-    new YarnAllocationHandler(
-      conf,
-      resourceManager,
-      appAttemptId,
-      maxExecutors,
-      executorMemory,
-      executorCores,
-      hostToCount,
-      rackToCount,
-      sparkConf)
-  }
 
   // A simple method to copy the split info map.
   private def generateNodeToWeight(
@@ -642,7 +555,7 @@ object YarnAllocationHandler {
       val hostCount = hostToCount.getOrElse(host, 0)
       hostToCount.put(host, hostCount + splits.size)
 
-      val rack = lookupRack(conf, host)
+      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
       if (rack != null){
         val rackCount = rackToCount.getOrElse(host, 0)
         rackToCount.put(host, rackCount + splits.size)
@@ -652,41 +565,4 @@ object YarnAllocationHandler {
     (hostToCount.toMap, rackToCount.toMap)
   }
 
-  def lookupRack(conf: Configuration, host: String): String = {
-    if (!hostToRack.contains(host)) populateRackInfo(conf, host)
-    hostToRack.get(host)
-  }
-
-  def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
-    val set = rackToHostSet.get(rack)
-    if (set == null) return None
-
-    // No better way to get a Set[String] from JSet ?
-    val convertedSet: collection.mutable.Set[String] = set
-    Some(convertedSet.toSet)
-  }
-
-  def populateRackInfo(conf: Configuration, hostname: String) {
-    Utils.checkHost(hostname)
-
-    if (!hostToRack.containsKey(hostname)) {
-      // If there are repeated failures to resolve, all to an ignore list ?
-      val rackInfo = RackResolver.resolve(conf, hostname)
-      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
-        val rack = rackInfo.getNetworkLocation
-        hostToRack.put(hostname, rack)
-        if (! rackToHostSet.containsKey(rack)) {
-          rackToHostSet.putIfAbsent(rack,
-            Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
-        }
-        rackToHostSet.get(rack).add(hostname)
-
-        // TODO(harvey): Figure out this comment...
-        // Since RackResolver caches, we are disabling this for now ...
-      } /* else {
-        // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
-        hostToRack.put(hostname, null)
-      } */
-    }
-  }
 }
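The hunks above fold the three `newAllocator` factories and the rack bookkeeping into YarnSparkHadoopUtil and change the allocator's contract: instead of the caller passing an explicit `executorsToRequest` into `allocateContainers`, the handler now exposes `allocateResources()` and derives the request count from its own state. A minimal sketch of that inversion, with illustrative names only:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Sketch of the "allocator decides how much to ask for" change: the caller
// just heartbeats allocateResources(); the shortfall is computed internally.
class SimpleAllocator(maxExecutors: Int) {
  private val numRunning = new AtomicInteger(0)

  def getNumRunning: Int = numRunning.get()

  // One heartbeat: request only the shortfall. A zero-container request still
  // doubles as the progress ping that keeps the RM from expiring the AM.
  def allocateResources(): Int = {
    val toRequest = math.max(maxExecutors - numRunning.get(), 0)
    // ... an AllocateRequest for `toRequest` containers would be issued here ...
    toRequest
  }

  def containerStarted(): Unit = numRunning.incrementAndGet()
}
```

This is what lets the reporter thread in the new common ApplicationMaster call a single parameterless method on every heartbeat, whether or not any executors are missing.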
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
new file mode 100644
index 0000000000000000000000000000000000000000..cc5392192ec519fbdd8aa79489ff4030e67e60af
--- /dev/null
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.Utils
+
+/**
+ * YarnRMClient implementation for the Yarn alpha API.
+ */
+private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
+
+  private var rpc: YarnRPC = null
+  private var resourceManager: AMRMProtocol = _
+  private var uiHistoryAddress: String = _
+
+  override def register(
+      conf: YarnConfiguration,
+      sparkConf: SparkConf,
+      preferredNodeLocations: Map[String, Set[SplitInfo]],
+      uiAddress: String,
+      uiHistoryAddress: String) = {
+    this.rpc = YarnRPC.create(conf)
+    this.uiHistoryAddress = uiHistoryAddress
+
+    resourceManager = registerWithResourceManager(conf)
+    registerApplicationMaster(uiAddress)
+
+    new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
+      preferredNodeLocations)
+  }
+
+  override def getAttemptId() = {
+    val envs = System.getenv()
+    val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+    val containerId = ConverterUtils.toContainerId(containerIdString)
+    val appAttemptId = containerId.getApplicationAttemptId()
+    appAttemptId
+  }
+
+  override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
+    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+      .asInstanceOf[FinishApplicationMasterRequest]
+    finishReq.setAppAttemptId(getAttemptId())
+    finishReq.setFinishApplicationStatus(status)
+    finishReq.setDiagnostics(diagnostics)
+    finishReq.setTrackingUrl(uiHistoryAddress)
+    resourceManager.finishApplicationMaster(finishReq)
+  }
+
+  override def getProxyHostAndPort(conf: YarnConfiguration) =
+    YarnConfiguration.getProxyHostAndPort(conf)
+
+  override def getMaxRegAttempts(conf: YarnConfiguration) =
+    conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
+
+  private def registerWithResourceManager(conf: YarnConfiguration): AMRMProtocol = {
+    val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+    logInfo("Connecting to ResourceManager at " + rmAddress)
+    rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+  }
+
+  private def registerApplicationMaster(uiAddress: String): RegisterApplicationMasterResponse = {
+    val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+      .asInstanceOf[RegisterApplicationMasterRequest]
+    appMasterRequest.setApplicationAttemptId(getAttemptId())
+    // Setting this to master host,port - so that the ApplicationReport at client has some
+    // sensible info.
+    // Users can then monitor stderr/stdout on that node if required.
+    appMasterRequest.setHost(Utils.localHostName())
+    appMasterRequest.setRpcPort(0)
+    appMasterRequest.setTrackingUrl(uiAddress)
+    resourceManager.registerApplicationMaster(appMasterRequest)
+  }
+
+}
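YarnRMClientImpl is the alpha-API half of a new seam: the version-specific ResourceManager plumbing (register, unregister, attempt id, proxy address, max attempts) sits behind a YarnRMClient interface so that the ApplicationMaster added next can be shared between the alpha and stable YARN modules. The actual trait is defined elsewhere in this patch; the sketch below only illustrates the shape of such a boundary, using made-up names rather than the real signatures.

```scala
// Illustrative traits only; the real YarnRMClient/YarnAllocator signatures
// live in the common yarn module and differ in detail.
trait AllocatorSketch {
  def allocateResources(): Unit
  def getNumRunning: Int
  def getNumFailed: Int
}

trait RMClientSketch {
  // Register the AM with the ResourceManager and hand back an allocator.
  def register(uiAddress: String, uiHistoryAddress: String): AllocatorSketch
  // Unregister with a final status and optional diagnostics.
  def shutdown(succeeded: Boolean, diagnostics: String = ""): Unit
  // How many AM attempts the cluster allows before giving up on the app.
  def getMaxRegAttempts: Int
}
```

Version differences (AMRMProtocol vs. the later AMRMClient, how the attempt id is read from the container environment) stay inside the implementations, and the shared AM code below only ever talks to the abstraction.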
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f15c813b83973c98503dba022da2df34a872f386
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.Socket
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import akka.actor._
+import akka.remote._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.util.ShutdownHookManager
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+
+/**
+ * Common application master functionality for Spark on Yarn.
+ */
+private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
+  client: YarnRMClient) extends Logging {
+  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
+  // optimal as more containers are available. Might need to handle this better.
+  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
+
+  private val sparkConf = new SparkConf()
+  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
+  private val isDriver = args.userClass != null
+
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+
+  @volatile private var finished = false
+  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+
+  private var reporterThread: Thread = _
+  private var allocator: YarnAllocator = _
+
+  // Fields used in client mode.
+  private var actorSystem: ActorSystem = null
+  private var actor: ActorRef = _
+
+  // Fields used in cluster mode.
+  private val sparkContextRef = new AtomicReference[SparkContext](null)
+
+  final def run(): Int = {
+    if (isDriver) {
+      // Set the web ui port to be ephemeral for yarn so we don't conflict with
+      // other spark processes running on the same box
+      System.setProperty("spark.ui.port", "0")
+
+      // Set the master property to match the requested mode.
+      System.setProperty("spark.master", "yarn-cluster")
+    }
+
+    logInfo("ApplicationAttemptId: " + client.getAttemptId())
+
+    val cleanupHook = new Runnable {
+      override def run() {
+        // If the SparkContext is still registered, shut it down as a best case effort in case
+        // users do not call sc.stop or do System.exit().
+        val sc = sparkContextRef.get()
+        if (sc != null) {
+          logInfo("Invoking sc stop from shutdown hook")
+          sc.stop()
+          finish(FinalApplicationStatus.SUCCEEDED)
+        }
+
+        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
+        // running the AM.
+        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+        if (finished || isLastAttempt) {
+          cleanupStagingDir()
+        }
+      }
+    }
+    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
+    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
+
+    // Call this to force generation of secret so it gets populated into the
+    // Hadoop UGI. This has to happen before the startUserClass which does a
+    // doAs in order for the credentials to be passed on to the executor containers.
+    val securityMgr = new SecurityManager(sparkConf)
+
+    if (isDriver) {
+      runDriver()
+    } else {
+      runExecutorLauncher(securityMgr)
+    }
+
+    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
+      finish(finalStatus)
+      0
+    } else {
+      1
+    }
+  }
+
+  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
+    if (!finished) {
+      logInfo(s"Finishing ApplicationMaster with $status"  +
+        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
+      finished = true
+      finalStatus = status
+      try {
+        if (Thread.currentThread() != reporterThread) {
+          reporterThread.interrupt()
+          reporterThread.join()
+        }
+      } finally {
+        client.shutdown(status, Option(diagnostics).getOrElse(""))
+      }
+    }
+  }
+
+  private def sparkContextInitialized(sc: SparkContext) = {
+    sparkContextRef.synchronized {
+      sparkContextRef.compareAndSet(null, sc)
+      sparkContextRef.notifyAll()
+    }
+  }
+
+  private def sparkContextStopped(sc: SparkContext) = {
+    sparkContextRef.compareAndSet(sc, null)
+  }
+
+  private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
+    val sc = sparkContextRef.get()
+    allocator = client.register(yarnConf,
+      if (sc != null) sc.getConf else sparkConf,
+      if (sc != null) sc.preferredNodeLocationData else Map(),
+      uiAddress,
+      uiHistoryAddress)
+
+    allocator.allocateResources()
+    reporterThread = launchReporterThread()
+  }
+
+  private def runDriver(): Unit = {
+    addAmIpFilter()
+    val userThread = startUserClass()
+
+    // This is a bit hacky, but we need to wait until the spark.driver.port property has
+    // been set by the Thread executing the user class.
+    val sc = waitForSparkContextInitialized()
+
+    // If there is no SparkContext at this point, just fail the app.
+    if (sc == null) {
+      finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
+    } else {
+      registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
+      try {
+        userThread.join()
+      } finally {
+        // In cluster mode, ask the reporter thread to stop since the user app is finished.
+        reporterThread.interrupt()
+      }
+    }
+  }
+
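+  /**
+   * Client mode: waits for the remote driver to be reachable, registers the AM, and blocks until
+   * the monitor actor stops the reporter thread.
+   */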
+  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
+    actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+      conf = sparkConf, securityManager = securityMgr)._1
+    actor = waitForSparkDriver()
+    addAmIpFilter()
+    registerAM(sparkConf.get("spark.driver.appUIAddress", ""),
+      sparkConf.get("spark.driver.appUIHistoryAddress", ""))
+
+    // In client mode the actor will stop the reporter thread.
+    reporterThread.join()
+    finalStatus = FinalApplicationStatus.SUCCEEDED
+  }
+
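+  /** Launches a daemon thread that heartbeats to the RM and checks for excessive executor failures. */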
+  private def launchReporterThread(): Thread = {
+    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+
+    // We want to be reasonably responsive without causing too many requests to the RM.
+    val schedulerInterval =
+      sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+
+    // must be <= expiryInterval / 2.
+    val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
+
+    val t = new Thread {
+      override def run() {
+        while (!finished) {
+          checkNumExecutorsFailed()
+          if (!finished) {
+            logDebug("Sending progress")
+            allocator.allocateResources()
+            try {
+              Thread.sleep(interval)
+            } catch {
+              case e: InterruptedException =>
+            }
+          }
+        }
+      }
+    }
+    // Set to daemon status so the reporter thread does not prevent JVM shutdown.
+    t.setDaemon(true)
+    t.setName("Reporter")
+    t.start()
+    logInfo("Started progress reporter thread - sleep time: " + interval)
+    t
+  }
+
+  /**
+   * Clean up the staging directory.
+   */
+  private def cleanupStagingDir() {
+    val fs = FileSystem.get(yarnConf)
+    var stagingDirPath: Path = null
+    try {
+      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
+      if (!preserveFiles) {
+        val stagingDirEnv = System.getenv("SPARK_YARN_STAGING_DIR")
+        if (stagingDirEnv == null) {
+          logError("Staging directory is null")
+          return
+        }
+        stagingDirPath = new Path(stagingDirEnv)
+        logInfo("Deleting staging directory " + stagingDirPath)
+        fs.delete(stagingDirPath, true)
+      }
+    } catch {
+      case ioe: IOException =>
+        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+  }
+
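+  /**
+   * Waits for the user class to create a SparkContext, up to
+   * spark.yarn.ApplicationMaster.waitTries rounds of 10 seconds each. May return null on timeout.
+   */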
+  private def waitForSparkContextInitialized(): SparkContext = {
+    logInfo("Waiting for spark context initialization")
+    try {
+      sparkContextRef.synchronized {
+        var count = 0
+        val waitTime = 10000L
+        val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
+        while (sparkContextRef.get() == null && count < numTries && !finished) {
+          logInfo("Waiting for spark context initialization ... " + count)
+          count = count + 1
+          sparkContextRef.wait(waitTime)
+        }
+
+        val sparkContext = sparkContextRef.get()
+        assert(sparkContext != null || count >= numTries)
+        if (sparkContext == null) {
+          logError(
+            "Unable to retrieve SparkContext in spite of waiting for %d ms, numTries = %d".format(
+              count * waitTime, numTries))
+        }
+        sparkContext
+      }
+    }
+  }
+
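+  /**
+   * Blocks until the driver (host:port passed as the first user arg) accepts a connection, then
+   * creates an actor that monitors the driver's lifecycle events.
+   */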
+  private def waitForSparkDriver(): ActorRef = {
+    logInfo("Waiting for Spark driver to be reachable.")
+    var driverUp = false
+    val hostport = args.userArgs(0)
+    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+    while (!driverUp) {
+      try {
+        val socket = new Socket(driverHost, driverPort)
+        socket.close()
+        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
+        driverUp = true
+      } catch {
+        case e: Exception =>
+          logError("Failed to connect to driver at %s:%s, retrying ...".
+            format(driverHost, driverPort))
+          Thread.sleep(100)
+      }
+    }
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
+
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      driverHost,
+      driverPort.toString,
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
+    actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+  }
+
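+  /** Fails the application, and stops any live SparkContext, once too many executors have failed. */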
+  private def checkNumExecutorsFailed() = {
+    if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.")
+
+      val sc = sparkContextRef.get()
+      if (sc != null) {
+        logInfo("Invoking sc stop from checkNumExecutorsFailed")
+        sc.stop()
+      }
+    }
+  }
+
+  /** Add the Yarn IP filter that is required for properly securing the UI. */
+  private def addAmIpFilter() = {
+    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    val proxy = client.getProxyHostAndPort(yarnConf)
+    val parts = proxy.split(":")
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val uriBase = "http://" + proxy + proxyBase
+    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+
+    if (isDriver) {
+      System.setProperty("spark.ui.filters", amFilter)
+      System.setProperty(s"spark.$amFilter.params", params)
+    } else {
+      actor ! AddWebUIFilter(amFilter, params, proxyBase)
+    }
+  }
+
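+  /**
+   * Runs the user class's main() in a separate "Driver" thread; finalStatus is set to SUCCEEDED
+   * only if main() returns normally.
+   */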
+  private def startUserClass(): Thread = {
+    logInfo("Starting the user JAR in a separate Thread")
+    System.setProperty("spark.executor.instances", args.numExecutors.toString)
+    val mainMethod = Class.forName(args.userClass, false,
+      Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
+
+    val t = new Thread {
+      override def run() {
+        var status = FinalApplicationStatus.FAILED
+        try {
+          // Copy the user args into a new array before invoking the user's main().
+          val mainArgs = new Array[String](args.userArgs.size)
+          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
+          mainMethod.invoke(null, mainArgs)
+          // Some apps have "System.exit(0)" at the end. The user thread will stop here unless
+          // it throws an uncaught exception. It needs a shutdown hook to set SUCCEEDED.
+          status = FinalApplicationStatus.SUCCEEDED
+        } finally {
+          logDebug("Finishing main")
+        }
+        finalStatus = status
+      }
+    }
+    t.setName("Driver")
+    t.start()
+    t
+  }
+
+  // Actor used to monitor the driver when running in client deploy mode.
+  private class MonitorActor(driverUrl: String) extends Actor {
+
+    var driver: ActorSelection = _
+
+    override def preStart() = {
+      logInfo("Listening to driver: " + driverUrl)
+      driver = context.actorSelection(driverUrl)
+      // Send a hello message to establish the connection, after which
+      // we can monitor Lifecycle Events.
+      driver ! "Hello"
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    }
+
+    override def receive = {
+      case x: DisassociatedEvent =>
+        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
+        finish(FinalApplicationStatus.SUCCEEDED)
+      case x: AddWebUIFilter =>
+        logInfo(s"Add WebUI Filter. $x")
+        driver ! x
+    }
+
+  }
+
+}
+
+object ApplicationMaster extends Logging {
+
+  private var master: ApplicationMaster = _
+
+  def main(args: Array[String]) = {
+    SignalLogger.register(log)
+    val amArgs = new ApplicationMasterArguments(args)
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
+      System.exit(master.run())
+    }
+  }
+
+  private[spark] def sparkContextInitialized(sc: SparkContext) = {
+    master.sparkContextInitialized(sc)
+  }
+
+  private[spark] def sparkContextStopped(sc: SparkContext) = {
+    master.sparkContextStopped(sc)
+  }
+
+}
+
+/**
+ * This object does not provide any special functionality. It exists so that it's easy to tell
+ * the client-mode AM apart from the cluster-mode AM when using tools such as ps or jps.
+ */
+object ExecutorLauncher {
+
+  def main(args: Array[String]) = {
+    ApplicationMaster.main(args)
+  }
+
+}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 424b0fb0936f2d57b7dff7380169f7b5cb54bdc3..3e6b96fb63cea763c7db75d445659850269d3aaf 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -63,11 +63,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
           executorCores = value
           args = tail
 
-        case Nil =>
-          if (userJar == null || userClass == null) {
-            printUsageAndExit(1)
-          }
-
         case _ =>
           printUsageAndExit(1, args)
       }
@@ -80,16 +75,17 @@ class ApplicationMasterArguments(val args: Array[String]) {
     if (unknownParam != null) {
       System.err.println("Unknown/unsupported param " + unknownParam)
     }
-    System.err.println(
-      "Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" +
-      "Options:\n" +
-      "  --jar JAR_PATH       Path to your application's JAR file (required)\n" +
-      "  --class CLASS_NAME   Name of your application's main class (required)\n" +
-      "  --args ARGS          Arguments to be passed to your application's main class.\n" +
-      "                       Mutliple invocations are possible, each will be passed in order.\n" +
-      "  --num-executors NUM    Number of executors to start (Default: 2)\n" +
-      "  --executor-cores NUM   Number of cores for the executors (Default: 1)\n" +
-      "  --executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n")
+    System.err.println("""
+      |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
+      |Options:
+      |  --jar JAR_PATH       Path to your application's JAR file
+      |  --class CLASS_NAME   Name of your application's main class
+      |  --args ARGS          Arguments to be passed to your application's main class.
+      |                       Multiple invocations are possible; each will be passed in order.
+      |  --num-executors NUM    Number of executors to start (Default: 2)
+      |  --executor-cores NUM   Number of cores for the executors (Default: 1)
+      |  --executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+      """.stripMargin)
     System.exit(exitCode)
   }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index afa4fd4c6959e21385ff35a5e501d880ab984981..40d8d6d6e6961e9f36a5491377f3f7b2ff290d47 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -37,7 +37,6 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var numExecutors = 2
   var amQueue = sparkConf.get("QUEUE", "default")
   var amMemory: Int = 512 // MB
-  var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"
   var priority = 0
 
@@ -78,10 +77,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
           args = tail
 
         case ("--master-class" | "--am-class") :: value :: tail =>
-          if (args(0) == "--master-class") {
-            println("--master-class is deprecated. Use --am-class instead.")
-          }
-          amClass = value
+          println(s"${args(0)} is deprecated and is not used anymore.")
           args = tail
 
         case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
@@ -133,9 +129,6 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
           args = tail
 
         case Nil =>
-          if (userClass == null) {
-            throw new IllegalArgumentException(getUsageMessage())
-          }
 
         case _ =>
           throw new IllegalArgumentException(getUsageMessage(args))
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 3897b3a373a8cf29058c374b51daef929ffac594..6cf300c3986ad4dbf0c77d8e300a16d48c2987f0 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -42,12 +42,6 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
  * Client submits an application to the YARN ResourceManager.
- *
- * Depending on the deployment mode this will launch one of two application master classes:
- * 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
- *      which launches a driver program inside of the cluster.
- * 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
- *      request executors on behalf of a driver running outside of the cluster.
  */
 trait ClientBase extends Logging {
   val args: ClientArguments
@@ -67,14 +61,11 @@ trait ClientBase extends Logging {
 
   // Additional memory overhead - in mb.
   protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
-    YarnAllocationHandler.MEMORY_OVERHEAD)
+    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
 
   // TODO(harvey): This could just go in ClientArguments.
   def validateArgs() = {
     Map(
-      ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
-          "Error: You must specify a user jar when running in standalone mode!"),
-      (args.userClass == null) -> "Error: You must specify a user class!",
       (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
       (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
         "greater than: " + memoryOverhead),
@@ -321,6 +312,8 @@ trait ClientBase extends Logging {
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
     amContainer.setLocalResources(localResources)
 
+    val isLaunchingDriver = args.userClass != null
+
     // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
     // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
     // SparkContext will not let that set spark* system properties, which is expected behavior for
@@ -329,7 +322,7 @@ trait ClientBase extends Logging {
     // Note that to warn the user about the deprecation in cluster mode, some code from
     // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
     // described above).
-    if (args.amClass == classOf[ApplicationMaster].getName) {
+    if (isLaunchingDriver) {
       sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
         val warning =
           s"""
@@ -389,7 +382,7 @@ trait ClientBase extends Logging {
       javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
     }
 
-    if (args.amClass == classOf[ApplicationMaster].getName) {
+    if (isLaunchingDriver) {
       sparkConf.getOption("spark.driver.extraJavaOptions")
         .orElse(sys.env.get("SPARK_JAVA_OPTS"))
         .foreach(opts => javaOpts += opts)
@@ -397,22 +390,37 @@ trait ClientBase extends Logging {
         .foreach(p => javaOpts += s"-Djava.library.path=$p")
     }
 
-    // Command for the ApplicationMaster
-    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
-      javaOpts ++
-      Seq(args.amClass, "--class", YarnSparkHadoopUtil.escapeForShell(args.userClass),
-        "--jar ", YarnSparkHadoopUtil.escapeForShell(args.userJar),
-        userArgsToString(args),
-        "--executor-memory", args.executorMemory.toString,
+    val userClass =
+      if (args.userClass != null) {
+        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
+      } else {
+        Nil
+      }
+    val amClass =
+      if (isLaunchingDriver) {
+        classOf[ApplicationMaster].getName()
+      } else {
+        classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher")
+      }
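+    // ExecutorLauncher runs the same AM code under a different name, so that the client-mode AM
+    // is easy to tell apart from the cluster-mode AM in tools such as ps or jps.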
+    val amArgs =
+      Seq(amClass) ++ userClass ++
+      (if (args.userJar != null) Seq("--jar", args.userJar) else Nil) ++
+      Seq("--executor-memory", args.executorMemory.toString,
         "--executor-cores", args.executorCores.toString,
         "--num-executors ", args.numExecutors.toString,
+        userArgsToString(args))
+
+    // Command for the ApplicationMaster
+    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
+      javaOpts ++ amArgs ++
+      Seq(
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
         "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
 
     logInfo("Yarn AM launch context:")
-    logInfo(s"  class:   ${args.amClass}")
-    logInfo(s"  env:     $env")
-    logInfo(s"  command: ${commands.mkString(" ")}")
+    logInfo(s"  user class: ${args.userClass}")
+    logInfo(s"  env:        $env")
+    logInfo(s"  command:    ${commands.mkString(" ")}")
 
     // TODO: it would be nicer to just make sure there are no null commands here
     val printableCommands = commands.map(s => if (s == null) "null" else s).toList
@@ -623,7 +631,7 @@ object ClientBase extends Logging {
     YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
             File.pathSeparator)
 
-  /** 
+  /**
    * Get the list of namenodes the user may access.
    */
   private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
new file mode 100644
index 0000000000000000000000000000000000000000..cad94e5e19e1f28befe34295096ab06c3dc42c84
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+object AllocationType extends Enumeration {
+  type AllocationType = Value
+  val HOST, RACK, ANY = Value
+}
+
+/**
+ * Interface that defines a Yarn allocator.
+ */
+trait YarnAllocator {
+
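+  /**
+   * Requests any outstanding resources from the RM and processes allocated or completed
+   * containers. Also serves as the AM's heartbeat when called from the reporter thread.
+   */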
+  def allocateResources(): Unit
+  def getNumExecutorsFailed: Int
+  def getNumExecutorsRunning: Int
+
+}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
new file mode 100644
index 0000000000000000000000000000000000000000..922d7d1a854a5c8f2cedc9d6581a78b3c80666c3
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.records._
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.scheduler.SplitInfo
+
+/**
+ * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that
+ * is used by Spark's AM.
+ */
+trait YarnRMClient {
+
+  /**
+   * Registers the application master with the RM.
+   *
+   * @param conf The Yarn configuration.
+   * @param sparkConf The Spark configuration.
+   * @param preferredNodeLocations Map with hints about where to allocate containers.
+   * @param uiAddress Address of the SparkUI.
+   * @param uiHistoryAddress Address of the application on the History Server.
+   */
+  def register(
+      conf: YarnConfiguration,
+      sparkConf: SparkConf,
+      preferredNodeLocations: Map[String, Set[SplitInfo]],
+      uiAddress: String,
+      uiHistoryAddress: String): YarnAllocator
+
+  /**
+   * Shuts down the AM. Guaranteed to only be called once.
+   *
+   * @param status The final status of the AM.
+   * @param diagnostics Diagnostics message to include in the final status.
+   */
+  def shutdown(status: FinalApplicationStatus, diagnostics: String = ""): Unit
+
+  /** Returns the attempt ID. */
+  def getAttemptId(): ApplicationAttemptId
+
+  /** Returns the RM's proxy host and port. */
+  def getProxyHostAndPort(conf: YarnConfiguration): String
+
+  /** Returns the maximum number of attempts to register the AM. */
+  def getMaxRegAttempts(conf: YarnConfiguration): Int
+
+}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 10aef5eb2486ff68556ba37f5c45038128740707..2aa27a19085828e14389c2d8ed382606104c71b4 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.lang.{Boolean => JBoolean}
+import java.util.{Collections, Set => JSet}
 import java.util.regex.Matcher
 import java.util.regex.Pattern
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable.HashMap
 
@@ -29,11 +32,13 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.StringInterner
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
 
 /**
  * Contains util methods to interact with Hadoop from spark.
@@ -79,6 +84,21 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 }
 
 object YarnSparkHadoopUtil {
+  // Additional memory overhead - in mb.
+  val DEFAULT_MEMORY_OVERHEAD = 384
+
+  val ANY_HOST = "*"
+
+  // All RM requests are issued with the same priority: we do not (yet) have any distinction
+  // between request types (like map/reduce in Hadoop, for example).
+  val RM_REQUEST_PRIORITY = 1
+
+  // Host to rack map - saved from allocation requests. We expect this not to change.
+  // Note that it is possible for it to change: the ResourceManager will indicate that to us via
+  // an update in the allocate response. But we are punting on handling that for now.
+  private val hostToRack = new ConcurrentHashMap[String, String]()
+  private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+
   def addToEnvironment(
       env: HashMap[String, String],
       variable: String,
@@ -173,4 +193,35 @@ object YarnSparkHadoopUtil {
     }
   }
 
+  private[spark] def lookupRack(conf: Configuration, host: String): String = {
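+    // Resolve and cache the rack for this host on first lookup; may return null if unresolved.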
+    if (!hostToRack.containsKey(host)) {
+      populateRackInfo(conf, host)
+    }
+    hostToRack.get(host)
+  }
+
+  private[spark] def populateRackInfo(conf: Configuration, hostname: String) {
+    Utils.checkHost(hostname)
+
+    if (!hostToRack.containsKey(hostname)) {
+      // If there are repeated failures to resolve, add the host to an ignore list.
+      val rackInfo = RackResolver.resolve(conf, hostname)
+      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
+        val rack = rackInfo.getNetworkLocation
+        hostToRack.put(hostname, rack)
+        if (!rackToHostSet.containsKey(rack)) {
+          rackToHostSet.putIfAbsent(rack,
+            Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
+        }
+        rackToHostSet.get(rack).add(hostname)
+
+        // TODO(harvey): Figure out what this comment means...
+        // Since RackResolver caches, we are disabling this for now ...
+      } /* else {
+        // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
+        hostToRack.put(hostname, null)
+      } */
+    }
+  }
+
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 3474112ded5d71343bd6a55ff42d00bbfa7e76e6..d162b4c433f46a8529d021965fcd5f1cf576ad3f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -19,22 +19,21 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.spark._
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.deploy.yarn.YarnAllocationHandler
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
 /**
- *
- * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM.
+ * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
  */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration)
+  extends TaskSchedulerImpl(sc) {
 
   def this(sc: SparkContext) = this(sc, new Configuration())
 
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    val retval = YarnAllocationHandler.lookupRack(conf, host)
-    if (retval != null) Some(retval) else None
+    Option(YarnSparkHadoopUtil.lookupRack(conf, host))
   }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 833e249f9f612645400966abf9d5a98e4e50a1c9..a5f537dd9de307a690c3db679ef3074542513093 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 import scala.collection.mutable.ArrayBuffer
@@ -60,10 +60,7 @@ private[spark] class YarnClientSchedulerBackend(
 
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
-      "--class", "notused",
-      "--jar", null, // The primary jar will be added dynamically in SparkContext.
-      "--args", hostport,
-      "--am-class", classOf[ExecutorLauncher].getName
+      "--args", hostport
     )
 
     // process any optional arguments, given either as environment variables
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 9aeca4a637d383296a1c84b0001bd212174aeca0..69f40225a21f544b2651f742e68d1f8eb6d7b404 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -18,16 +18,17 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
+import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 import org.apache.hadoop.conf.Configuration
 
 /**
- *
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done
+ * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
+ * the ApplicationMaster, etc., is done.
  */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
+  extends TaskSchedulerImpl(sc) {
 
   logInfo("Created YarnClusterScheduler")
 
@@ -42,7 +43,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    val retval = YarnAllocationHandler.lookupRack(conf, host)
+    val retval = YarnSparkHadoopUtil.lookupRack(conf, host)
     if (retval != null) Some(retval) else None
   }
 
@@ -51,4 +52,10 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
     super.postStartHook()
     logInfo("YarnClusterScheduler.postStartHook done")
   }
+
+  override def stop() {
+    super.stop()
+    ApplicationMaster.sparkContextStopped(sc)
+  }
+
 }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 1c4005fd8e78e712a557e5a7320e98e8b7cdbcf1..0000000000000000000000000000000000000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.IOException
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.AtomicReference
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.{SignalLogger, Utils}
-
-
-/**
- * An application master that runs the user's driver program and allocates executors.
- */
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
-                        sparkConf: SparkConf) extends Logging {
-
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
-    this(args, new Configuration(), sparkConf)
-
-  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  private var appAttemptId: ApplicationAttemptId = _
-  private var userThread: Thread = _
-  private val fs = FileSystem.get(yarnConf)
-
-  private var yarnAllocator: YarnAllocationHandler = _
-  private var isFinished: Boolean = false
-  private var uiAddress: String = _
-  private var uiHistoryAddress: String = _
-  private val maxAppAttempts: Int = conf.getInt(
-    YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
-  private var isLastAMRetry: Boolean = true
-  private var amClient: AMRMClient[ContainerRequest] = _
-
-  // Default to numExecutors * 2, with minimum of 3
-  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
-    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
-  private var registered = false
-
-  def run() {
-    // Set the web ui port to be ephemeral for yarn so we don't conflict with
-    // other spark processes running on the same box
-    System.setProperty("spark.ui.port", "0")
-
-    // When running the AM, the Spark master is always "yarn-cluster"
-    System.setProperty("spark.master", "yarn-cluster")
-
-    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
-    ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
-
-    appAttemptId = ApplicationMaster.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
-    amClient = AMRMClient.createAMRMClient()
-    amClient.init(yarnConf)
-    amClient.start()
-
-    // setup AmIpFilter for the SparkUI - do this before we start the UI
-    addAmIpFilter()
-
-    ApplicationMaster.register(this)
-
-    // Call this to force generation of secret so it gets populated into the
-    // Hadoop UGI. This has to happen before the startUserClass which does a
-    // doAs in order for the credentials to be passed on to the executor containers.
-    val securityMgr = new SecurityManager(sparkConf)
-
-    // Start the user's JAR
-    userThread = startUserClass()
-
-    // This a bit hacky, but we need to wait until the spark.driver.port property has
-    // been set by the Thread executing the user class.
-    waitForSparkContextInitialized()
-
-    // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
-    synchronized {
-      if (!isFinished) {
-        registerApplicationMaster()
-        registered = true
-      }
-    }
-
-    // Allocate all containers
-    allocateExecutors()
-
-    // Launch thread that will heartbeat to the RM so it won't think the app has died.
-    launchReporterThread()
-
-    // Wait for the user class to finish
-    userThread.join()
-
-    System.exit(0)
-  }
-
-  // add the yarn amIpFilter that Yarn requires for properly securing the UI
-  private def addAmIpFilter() {
-    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
-    System.setProperty("spark.ui.filters", amFilter)
-    val proxy = WebAppUtils.getProxyHostAndPort(conf)
-    val parts : Array[String] = proxy.split(":")
-    val uriBase = "http://" + proxy +
-      System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-
-    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-    System.setProperty(
-      "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
-  }
-
-  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
-  }
-
-  private def startUserClass(): Thread = {
-    logInfo("Starting the user JAR in a separate Thread")
-    System.setProperty("spark.executor.instances", args.numExecutors.toString)
-    val mainMethod = Class.forName(
-      args.userClass,
-      false,
-      Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
-    val t = new Thread {
-      override def run() {
-        var succeeded = false
-        try {
-          // Copy
-          val mainArgs = new Array[String](args.userArgs.size)
-          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
-          mainMethod.invoke(null, mainArgs)
-          // Some apps have "System.exit(0)" at the end.  The user thread will stop here unless
-          // it has an uncaught exception thrown out.  It needs a shutdown hook to set SUCCEEDED.
-          succeeded = true
-        } finally {
-          logDebug("Finishing main")
-          isLastAMRetry = true
-          if (succeeded) {
-            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-          } else {
-            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
-          }
-        }
-      }
-    }
-    t.setName("Driver")
-    t.start()
-    t
-  }
-
-  // This needs to happen before allocateExecutors()
-  private def waitForSparkContextInitialized() {
-    logInfo("Waiting for Spark context initialization")
-    try {
-      var sparkContext: SparkContext = null
-      ApplicationMaster.sparkContextRef.synchronized {
-        var numTries = 0
-        val waitTime = 10000L
-        val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
-        while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries
-            && !isFinished) {
-          logInfo("Waiting for Spark context initialization ... " + numTries)
-          numTries = numTries + 1
-          ApplicationMaster.sparkContextRef.wait(waitTime)
-        }
-        sparkContext = ApplicationMaster.sparkContextRef.get()
-        assert(sparkContext != null || numTries >= maxNumTries)
-
-        if (sparkContext != null) {
-          uiAddress = sparkContext.ui.appUIHostPort
-          uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(
-            yarnConf,
-            amClient,
-            appAttemptId,
-            args,
-            sparkContext.preferredNodeLocationData,
-            sparkContext.getConf)
-        } else {
-          logWarning("Unable to retrieve SparkContext in spite of waiting for %d, maxNumTries = %d".
-            format(numTries * waitTime, maxNumTries))
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(
-            yarnConf,
-            amClient,
-            appAttemptId,
-            args,
-            sparkContext.getConf)
-        }
-      }
-    }
-  }
-
-  private def allocateExecutors() {
-    try {
-      logInfo("Requesting" + args.numExecutors + " executors.")
-      // Wait until all containers have launched
-      yarnAllocator.addResourceRequests(args.numExecutors)
-      yarnAllocator.allocateResources()
-      // Exits the loop if the user thread exits.
-
-      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
-          && !isFinished) {
-        checkNumExecutorsFailed()
-        allocateMissingExecutor()
-        yarnAllocator.allocateResources()
-        Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
-      }
-    }
-    logInfo("All executors have launched.")
-  }
-
-  private def allocateMissingExecutor() {
-    val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
-      yarnAllocator.getNumPendingAllocate
-    if (missingExecutorCount > 0) {
-      logInfo("Allocating %d containers to make up for (potentially) lost containers".
-        format(missingExecutorCount))
-      yarnAllocator.addResourceRequests(missingExecutorCount)
-    }
-  }
-
-  private def checkNumExecutorsFailed() {
-    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-      logInfo("max number of executor failures reached")
-      finishApplicationMaster(FinalApplicationStatus.FAILED,
-        "max number of executor failures reached")
-      // make sure to stop the user thread
-      val sparkContext = ApplicationMaster.sparkContextRef.get()
-      if (sparkContext != null) {
-        logInfo("Invoking sc stop from checkNumExecutorsFailed")
-        sparkContext.stop()
-      } else {
-        logError("sparkContext is null when should shutdown")
-      }
-    }
-  }
-
-  private def launchReporterThread(): Thread = {
-    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
-    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
-    // we want to be reasonably responsive without causing too many requests to RM.
-    val schedulerInterval =
-      sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
-    // must be <= timeoutInterval / 2.
-    val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
-
-    val t = new Thread {
-      override def run() {
-        while (userThread.isAlive && !isFinished) {
-          checkNumExecutorsFailed()
-          allocateMissingExecutor()
-          logDebug("Sending progress")
-          yarnAllocator.allocateResources()
-          Thread.sleep(interval)
-        }
-      }
-    }
-    // Setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.start()
-    logInfo("Started progress reporter thread - heartbeat interval : " + interval)
-    t
-  }
-
-  def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
-    synchronized {
-      if (isFinished) {
-        return
-      }
-      isFinished = true
-
-      logInfo("Unregistering ApplicationMaster with " + status)
-      if (registered) {
-        amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
-      }
-    }
-  }
-
-  /**
-   * Clean up the staging directory.
-   */
-  private def cleanupStagingDir() {
-    var stagingDirPath: Path = null
-    try {
-      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
-      if (!preserveFiles) {
-        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
-        if (stagingDirPath == null) {
-          logError("Staging directory is null")
-          return
-        }
-        logInfo("Deleting staging directory " + stagingDirPath)
-        fs.delete(stagingDirPath, true)
-      }
-    } catch {
-      case ioe: IOException =>
-        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
-    }
-  }
-
-  // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
-  class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
-
-    def run() {
-      logInfo("AppMaster received a signal.")
-      // We need to clean up staging dir before HDFS is shut down
-      // make sure we don't delete it until this is the last AM
-      if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
-    }
-  }
-
-}
-
-object ApplicationMaster extends Logging {
-  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
-  // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
-
-  private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
-
-  val sparkContextRef: AtomicReference[SparkContext] =
-    new AtomicReference[SparkContext](null)
-
-  def register(master: ApplicationMaster) {
-    applicationMasters.add(master)
-  }
-
-  /**
-   * Called from YarnClusterScheduler to notify the AM code that a SparkContext has been
-   * initialized in the user code.
-   */
-  def sparkContextInitialized(sc: SparkContext): Boolean = {
-    var modified = false
-    sparkContextRef.synchronized {
-      modified = sparkContextRef.compareAndSet(null, sc)
-      sparkContextRef.notifyAll()
-    }
-
-    // Add a shutdown hook - as a best effort in case users do not call sc.stop or do
-    // System.exit.
-    // Should not really have to do this, but it helps YARN to evict resources earlier.
-    // Not to mention, prevent the Client from declaring failure even though we exited properly.
-    // Note that this will unfortunately not properly clean up the staging files because it gets
-    // called too late, after the filesystem is already shutdown.
-    if (modified) {
-      Runtime.getRuntime().addShutdownHook(new Thread with Logging {
-        // This is not only logs, but also ensures that log system is initialized for this instance
-        // when we are actually 'run'-ing.
-        logInfo("Adding shutdown hook for context " + sc)
-
-        override def run() {
-          logInfo("Invoking sc stop from shutdown hook")
-          sc.stop()
-          // Best case ...
-          for (master <- applicationMasters) {
-            master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-          }
-        }
-      })
-    }
-
-    // Wait for initialization to complete and at least 'some' nodes to get allocated.
-    modified
-  }
-
-  def getApplicationAttemptId(): ApplicationAttemptId = {
-    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    appAttemptId
-  }
-
-  def main(argStrings: Array[String]) {
-    SignalLogger.register(log)
-    val args = new ApplicationMasterArguments(argStrings)
-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      new ApplicationMaster(args).run()
-    }
-  }
-}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
deleted file mode 100644
index e093fe4ae6ff8e17e4acce0f6d146ed58b9e4a6b..0000000000000000000000000000000000000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.Socket
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import akka.actor._
-import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-
-/**
- * An application master that allocates executors on behalf of a driver that is running outside
- * the cluster.
- *
- * This is used only in yarn-client mode.
- */
-class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
-  extends Logging {
-
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
-    this(args, new Configuration(), sparkConf)
-
-  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
-
-  private var appAttemptId: ApplicationAttemptId = _
-  private var reporterThread: Thread = _
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed: Boolean = false
-  private var isFinished: Boolean = false
-  private var registered: Boolean = false
-
-  private var amClient: AMRMClient[ContainerRequest] = _
-
-  // Default to numExecutors * 2, with minimum of 3
-  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
-    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
-  val securityManager = new SecurityManager(sparkConf)
-  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
-    conf = sparkConf, securityManager = securityManager)._1
-  var actor: ActorRef = _
-
-  // This actor just working as a monitor to watch on Driver Actor.
-  class MonitorActor(driverUrl: String) extends Actor {
-
-    var driver: ActorSelection = _
-
-    override def preStart() {
-      logInfo("Listen to driver: " + driverUrl)
-      driver = context.actorSelection(driverUrl)
-      // Send a hello message to establish the connection, after which
-      // we can monitor Lifecycle Events.
-      driver ! "Hello"
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    }
-
-    override def receive = {
-      case x: DisassociatedEvent =>
-        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        driverClosed = true
-      case x: AddWebUIFilter =>
-        logInfo(s"Add WebUI Filter. $x")
-        driver ! x
-    }
-  }
-
-  def run() {
-    amClient = AMRMClient.createAMRMClient()
-    amClient.init(yarnConf)
-    amClient.start()
-
-    appAttemptId = ApplicationMaster.getApplicationAttemptId()
-    synchronized {
-      if (!isFinished) {
-        registerApplicationMaster()
-        registered = true
-      }
-    }
-
-    waitForSparkMaster()
-    addAmIpFilter()
-
-    // Allocate all containers
-    allocateExecutors()
-
-    // Launch a progress reporter thread, else app will get killed after expiration
-    // (def: 10mins) timeout ensure that progress is sent before
-    // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
-    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // we want to be reasonably responsive without causing too many requests to RM.
-    val schedulerInterval =
-      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
-    // must be <= timeoutInterval / 2.
-    val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
-    reporterThread = launchReporterThread(interval)
-
-
-    // Wait for the reporter thread to Finish.
-    reporterThread.join()
-
-    finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
-    actorSystem.shutdown()
-
-    logInfo("Exited")
-    System.exit(0)
-  }
-
-  private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
-    logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
-    amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
-  }
-
-  // add the yarn amIpFilter that Yarn requires for properly securing the UI
-  private def addAmIpFilter() {
-    val proxy = WebAppUtils.getProxyHostAndPort(conf)
-    val parts = proxy.split(":")
-    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-    val uriBase = "http://" + proxy + proxyBase
-    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
-    actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
-  }
-
-  private def waitForSparkMaster() {
-    logInfo("Waiting for Spark driver to be reachable.")
-    var driverUp = false
-    val hostport = args.userArgs(0)
-    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-    while(!driverUp) {
-      try {
-        val socket = new Socket(driverHost, driverPort)
-        socket.close()
-        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
-        driverUp = true
-      } catch {
-        case e: Exception =>
-          logError("Failed to connect to driver at %s:%s, retrying ...".
-            format(driverHost, driverPort))
-        Thread.sleep(100)
-      }
-    }
-    sparkConf.set("spark.driver.host", driverHost)
-    sparkConf.set("spark.driver.port", driverPort.toString)
-
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
-      SparkEnv.driverActorSystemName,
-      driverHost,
-      driverPort.toString,
-      CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
-  }
-
-
-  private def allocateExecutors() {
-    // TODO: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
-      scala.collection.immutable.Map()
-
-    yarnAllocator = YarnAllocationHandler.newAllocator(
-      yarnConf,
-      amClient,
-      appAttemptId,
-      args,
-      preferredNodeLocationData,
-      sparkConf)
-
-    logInfo("Requesting " + args.numExecutors + " executors.")
-    // Wait until all containers have launched
-    yarnAllocator.addResourceRequests(args.numExecutors)
-    yarnAllocator.allocateResources()
-    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
-        !isFinished) {
-      checkNumExecutorsFailed()
-      allocateMissingExecutor()
-      yarnAllocator.allocateResources()
-      Thread.sleep(100)
-    }
-
-    logInfo("All executors have launched.")
-  }
-
-  private def allocateMissingExecutor() {
-    val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
-      yarnAllocator.getNumPendingAllocate
-    if (missingExecutorCount > 0) {
-      logInfo("Allocating %d containers to make up for (potentially) lost containers".
-        format(missingExecutorCount))
-      yarnAllocator.addResourceRequests(missingExecutorCount)
-    }
-  }
-
-  private def checkNumExecutorsFailed() {
-    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-      finishApplicationMaster(FinalApplicationStatus.FAILED,
-        "max number of executor failures reached")
-    }
-  }
-
-  private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
-
-    val t = new Thread {
-      override def run() {
-        while (!driverClosed && !isFinished) {
-          checkNumExecutorsFailed()
-          allocateMissingExecutor()
-          logDebug("Sending progress")
-          yarnAllocator.allocateResources()
-          Thread.sleep(sleepTime)
-        }
-      }
-    }
-    // setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.start()
-    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
-    t
-  }
-
-  def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
-    synchronized {
-      if (isFinished) {
-        return
-      }
-      logInfo("Unregistering ApplicationMaster with " + status)
-      if (registered) {
-        val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
-        amClient.unregisterApplicationMaster(status, appMessage, trackingUrl)
-      }
-      isFinished = true
-    }
-  }
-
-}
-
-object ExecutorLauncher {
-  def main(argStrings: Array[String]) {
-    val args = new ApplicationMasterArguments(argStrings)
-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      new ExecutorLauncher(args).run()
-    }
-  }
-}
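
The deleted launcher's liveness hinged on a daemon "reporter" thread that re-drove allocation and heartbeated the ResourceManager on a fixed interval until the driver closed or the application finished. A self-contained sketch of that pattern, with a placeholder heartbeat body and stop flag standing in for the Spark/YARN calls:

import java.util.concurrent.atomic.AtomicBoolean

object ReporterThreadSketch {
  private val stopped = new AtomicBoolean(false)

  // Placeholder for the real work: in the deleted launcher this was
  // checkNumExecutorsFailed() + allocateMissingExecutor() + allocateResources().
  private def heartbeat(): Unit = println("heartbeat")

  def launchReporterThread(intervalMs: Long): Thread = {
    val sleepTime = math.max(intervalMs, 0L)
    val t = new Thread {
      override def run(): Unit = {
        while (!stopped.get()) {
          heartbeat()
          Thread.sleep(sleepTime)
        }
      }
    }
    // Daemon so a stuck reporter cannot keep the JVM alive after the driver exits.
    t.setDaemon(true)
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    val reporter = launchReporterThread(500)
    Thread.sleep(2000)
    stopped.set(true)
    reporter.join()
  }
}
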
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 0a461749c819d1cab7ff2776037a6de59b915021..4d5144989991f3696ed504be40a10cc8c9539458 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,12 +17,9 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.lang.{Boolean => JBoolean}
-import java.util.{Collections, Set => JSet}
 import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
@@ -32,20 +29,13 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
 import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
 import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.{RackResolver, Records}
-
-
-object AllocationType extends Enumeration {
-  type AllocationType = Value
-  val HOST, RACK, ANY = Value
-}
+import org.apache.hadoop.yarn.util.Records
 
 // TODO:
 // Too many params.
@@ -61,16 +51,14 @@ object AllocationType extends Enumeration {
  * Acquires resources for executors from a ResourceManager and launches executors in new containers.
  */
 private[yarn] class YarnAllocationHandler(
-    val conf: Configuration,
-    val amClient: AMRMClient[ContainerRequest],
-    val appAttemptId: ApplicationAttemptId,
-    val maxExecutors: Int,
-    val executorMemory: Int,
-    val executorCores: Int,
-    val preferredHostToCount: Map[String, Int], 
-    val preferredRackToCount: Map[String, Int],
-    val sparkConf: SparkConf)
-  extends Logging {
+    conf: Configuration,
+    sparkConf: SparkConf,
+    amClient: AMRMClient[ContainerRequest],
+    appAttemptId: ApplicationAttemptId,
+    args: ApplicationMasterArguments,
+    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+  extends YarnAllocator with Logging {
+
   // These three are locked on allocatedHostToContainersMap. Complementary data structures
   // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
   // allocatedContainerToHostMap: container to host mapping.
@@ -92,7 +80,7 @@ private[yarn] class YarnAllocationHandler(
 
   // Additional memory overhead - in mb.
   private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-    YarnAllocationHandler.MEMORY_OVERHEAD)
+    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
 
   // Number of container requests that have been sent to, but not yet allocated by the
   // ApplicationMaster.
@@ -103,11 +91,15 @@ private[yarn] class YarnAllocationHandler(
   private val lastResponseId = new AtomicInteger()
   private val numExecutorsFailed = new AtomicInteger()
 
-  def getNumPendingAllocate: Int = numPendingAllocate.intValue
+  private val maxExecutors = args.numExecutors
+  private val executorMemory = args.executorMemory
+  private val executorCores = args.executorCores
+  private val (preferredHostToCount, preferredRackToCount) =
+    generateNodeToWeight(conf, preferredNodes)
 
-  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+  override def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
 
-  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
+  override def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
     container.getResource.getMemory >= (executorMemory + memoryOverhead)
@@ -119,7 +111,9 @@ private[yarn] class YarnAllocationHandler(
     amClient.releaseAssignedContainer(containerId)
   }
 
-  def allocateResources() {
+  override def allocateResources() = {
+    addResourceRequests(maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get())
+
     // We have already set the container request. Poll the ResourceManager for a response.
     // This doubles as a heartbeat if there are no pending container requests.
     val progressIndicator = 0.1f
@@ -204,7 +198,7 @@ private[yarn] class YarnAllocationHandler(
 
         // For rack local containers
         if (remainingContainers != null) {
-          val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
           if (rack != null) {
             val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
             val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
@@ -273,7 +267,7 @@ private[yarn] class YarnAllocationHandler(
           // To be safe, remove the container from `pendingReleaseContainers`.
           pendingReleaseContainers.remove(containerId)
 
-          val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
+          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
           allocatedHostToContainersMap.synchronized {
             val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
               new HashSet[ContainerId]())
@@ -360,7 +354,7 @@ private[yarn] class YarnAllocationHandler(
             allocatedContainerToHostMap.remove(containerId)
 
             // TODO: Move this part outside the synchronized block?
-            val rack = YarnAllocationHandler.lookupRack(conf, host)
+            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
             if (rack != null) {
               val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
               if (rackCount > 0) {
@@ -393,9 +387,9 @@ private[yarn] class YarnAllocationHandler(
 
     for (container <- hostContainers) {
       val candidateHost = container.getNodes.last
-      assert(YarnAllocationHandler.ANY_HOST != candidateHost)
+      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
 
-      val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
       if (rack != null) {
         var count = rackToCounts.getOrElse(rack, 0)
         count += 1
@@ -409,7 +403,7 @@ private[yarn] class YarnAllocationHandler(
         AllocationType.RACK,
         rack,
         count,
-        YarnAllocationHandler.PRIORITY)
+        YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
     }
 
     requestedContainers
@@ -431,7 +425,7 @@ private[yarn] class YarnAllocationHandler(
     retval
   }
 
-  def addResourceRequests(numExecutors: Int) {
+  private def addResourceRequests(numExecutors: Int) {
     val containerRequests: List[ContainerRequest] =
       if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
         logDebug("numExecutors: " + numExecutors + ", host preferences: " +
@@ -440,9 +434,9 @@ private[yarn] class YarnAllocationHandler(
           AllocationType.ANY,
           resource = null,
           numExecutors,
-          YarnAllocationHandler.PRIORITY).toList
+          YarnSparkHadoopUtil.RM_REQUEST_PRIORITY).toList
       } else {
-        // Request for all hosts in preferred nodes and for numExecutors - 
+        // Request for all hosts in preferred nodes and for numExecutors -
         // candidates.size, request by default allocation policy.
         val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
         for ((candidateHost, candidateCount) <- preferredHostToCount) {
@@ -453,7 +447,7 @@ private[yarn] class YarnAllocationHandler(
               AllocationType.HOST,
               candidateHost,
               requiredCount,
-              YarnAllocationHandler.PRIORITY)
+              YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
           }
         }
         val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
@@ -463,7 +457,7 @@ private[yarn] class YarnAllocationHandler(
           AllocationType.ANY,
           resource = null,
           numExecutors,
-          YarnAllocationHandler.PRIORITY)
+          YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
 
         val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
           hostContainerRequests.size + rackContainerRequests.size() + anyContainerRequests.size)
@@ -512,7 +506,7 @@ private[yarn] class YarnAllocationHandler(
     // There must be a third request, which is ANY. That will be specially handled.
     requestType match {
       case AllocationType.HOST => {
-        assert(YarnAllocationHandler.ANY_HOST != resource)
+        assert(YarnSparkHadoopUtil.ANY_HOST != resource)
         val hostname = resource
         val nodeLocal = constructContainerRequests(
           Array(hostname),
@@ -521,7 +515,7 @@ private[yarn] class YarnAllocationHandler(
           priority)
 
         // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
-        YarnAllocationHandler.populateRackInfo(conf, hostname)
+        YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
         nodeLocal
       }
       case AllocationType.RACK => {
@@ -554,88 +548,6 @@ private[yarn] class YarnAllocationHandler(
     }
     requests
   }
-}
-
-object YarnAllocationHandler {
-
-  val ANY_HOST = "*"
-  // All requests are issued with the same priority: we do not (yet) have any distinction between
-  // request types (like map/reduce in hadoop for example)
-  val PRIORITY = 1
-
-  // Additional memory overhead - in mb.
-  val MEMORY_OVERHEAD = 384
-
-  // Host to rack map - saved from allocation requests. We are expecting this not to change.
-  // Note that it is possible for this to change: the ResourceManager will indicate that to us via an
-  // update response to allocate. But we are punting on handling that for now.
-  private val hostToRack = new ConcurrentHashMap[String, String]()
-  private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
-
-
-  def newAllocator(
-      conf: Configuration,
-      amClient: AMRMClient[ContainerRequest],
-      appAttemptId: ApplicationAttemptId,
-      args: ApplicationMasterArguments,
-      sparkConf: SparkConf
-    ): YarnAllocationHandler = {
-    new YarnAllocationHandler(
-      conf,
-      amClient,
-      appAttemptId,
-      args.numExecutors, 
-      args.executorMemory,
-      args.executorCores,
-      Map[String, Int](),
-      Map[String, Int](),
-      sparkConf)
-  }
-
-  def newAllocator(
-      conf: Configuration,
-      amClient: AMRMClient[ContainerRequest],
-      appAttemptId: ApplicationAttemptId,
-      args: ApplicationMasterArguments,
-      map: collection.Map[String,
-      collection.Set[SplitInfo]],
-      sparkConf: SparkConf
-    ): YarnAllocationHandler = {
-    val (hostToSplitCount, rackToSplitCount) = generateNodeToWeight(conf, map)
-    new YarnAllocationHandler(
-      conf,
-      amClient,
-      appAttemptId,
-      args.numExecutors, 
-      args.executorMemory,
-      args.executorCores,
-      hostToSplitCount,
-      rackToSplitCount,
-      sparkConf)
-  }
-
-  def newAllocator(
-      conf: Configuration,
-      amClient: AMRMClient[ContainerRequest],
-      appAttemptId: ApplicationAttemptId,
-      maxExecutors: Int,
-      executorMemory: Int,
-      executorCores: Int,
-      map: collection.Map[String, collection.Set[SplitInfo]],
-      sparkConf: SparkConf
-    ): YarnAllocationHandler = {
-    val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-    new YarnAllocationHandler(
-      conf,
-      amClient,
-      appAttemptId,
-      maxExecutors,
-      executorMemory,
-      executorCores,
-      hostToCount,
-      rackToCount,
-      sparkConf)
-  }
 
   // A simple method to copy the split info map.
   private def generateNodeToWeight(
@@ -654,7 +566,7 @@ object YarnAllocationHandler {
       val hostCount = hostToCount.getOrElse(host, 0)
       hostToCount.put(host, hostCount + splits.size)
 
-      val rack = lookupRack(conf, host)
+      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
       if (rack != null){
         val rackCount = rackToCount.getOrElse(rack, 0)
         rackToCount.put(rack, rackCount + splits.size)
@@ -664,42 +576,4 @@ object YarnAllocationHandler {
     (hostToCount.toMap, rackToCount.toMap)
   }
 
-  def lookupRack(conf: Configuration, host: String): String = {
-    if (!hostToRack.contains(host)) {
-      populateRackInfo(conf, host)
-    }
-    hostToRack.get(host)
-  }
-
-  def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
-    Option(rackToHostSet.get(rack)).map { set =>
-      val convertedSet: collection.mutable.Set[String] = set
-      // TODO: Better way to get a Set[String] from JSet.
-      convertedSet.toSet
-    }
-  }
-
-  def populateRackInfo(conf: Configuration, hostname: String) {
-    Utils.checkHost(hostname)
-
-    if (!hostToRack.containsKey(hostname)) {
-      // If there are repeated failures to resolve, add to an ignore list.
-      val rackInfo = RackResolver.resolve(conf, hostname)
-      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
-        val rack = rackInfo.getNetworkLocation
-        hostToRack.put(hostname, rack)
-        if (! rackToHostSet.containsKey(rack)) {
-          rackToHostSet.putIfAbsent(rack,
-            Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
-        }
-        rackToHostSet.get(rack).add(hostname)
-
-        // TODO(harvey): Figure out what this comment means...
-        // Since RackResolver caches, we are disabling this for now ...
-      } /* else {
-        // right? Else we will keep calling the rack resolver in case we can't resolve rack info ...
-        hostToRack.put(hostname, null)
-      } */
-    }
-  }
 }
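
With the constructor change above, the handler now derives its host and rack weights itself from the preferred-node map via generateNodeToWeight. A self-contained sketch of that aggregation, simplifying the input to a host -> split-count map and using a hypothetical lookupRack in place of YarnSparkHadoopUtil.lookupRack:

import scala.collection.mutable

object NodeWeightSketch {
  // Hypothetical rack lookup; the real code resolves racks through
  // YarnSparkHadoopUtil.lookupRack, backed by Hadoop's RackResolver.
  private def lookupRack(host: String): String =
    if (host.startsWith("a")) "/rack-a" else "/rack-b"

  // Aggregate per-host split counts into host and rack weight maps,
  // mirroring the shape of generateNodeToWeight in the diff above.
  def generateNodeToWeight(splitsPerHost: Map[String, Int]): (Map[String, Int], Map[String, Int]) = {
    val hostToCount = new mutable.HashMap[String, Int]()
    val rackToCount = new mutable.HashMap[String, Int]()
    for ((host, splits) <- splitsPerHost) {
      hostToCount.put(host, hostToCount.getOrElse(host, 0) + splits)
      val rack = lookupRack(host)
      if (rack != null) {
        rackToCount.put(rack, rackToCount.getOrElse(rack, 0) + splits)
      }
    }
    (hostToCount.toMap, rackToCount.toMap)
  }

  def main(args: Array[String]): Unit = {
    println(generateNodeToWeight(Map("a1" -> 3, "a2" -> 1, "b1" -> 2)))
    // host weights: a1 -> 3, a2 -> 1, b1 -> 2; rack weights: /rack-a -> 4, /rack-b -> 2
  }
}
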
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e8b8d9bc722bd0f10cbdf36533fb0b3a9929ce1a
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.Utils
+
+
+/**
+ * YarnRMClient implementation for the Yarn stable API.
+ */
+private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
+
+  private var amClient: AMRMClient[ContainerRequest] = _
+  private var uiHistoryAddress: String = _
+
+  override def register(
+      conf: YarnConfiguration,
+      sparkConf: SparkConf,
+      preferredNodeLocations: Map[String, Set[SplitInfo]],
+      uiAddress: String,
+      uiHistoryAddress: String) = {
+    amClient = AMRMClient.createAMRMClient()
+    amClient.init(conf)
+    amClient.start()
+    this.uiHistoryAddress = uiHistoryAddress
+
+    logInfo("Registering the ApplicationMaster")
+    amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+    new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
+      preferredNodeLocations)
+  }
+
+  override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") =
+    amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+
+  override def getAttemptId() = {
+    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+    val containerId = ConverterUtils.toContainerId(containerIdString)
+    val appAttemptId = containerId.getApplicationAttemptId()
+    appAttemptId
+  }
+
+  override def getProxyHostAndPort(conf: YarnConfiguration) = WebAppUtils.getProxyHostAndPort(conf)
+
+  override def getMaxRegAttempts(conf: YarnConfiguration) =
+    conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+
+}
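
Taken together, YarnRMClientImpl narrows the ApplicationMaster's view of YARN to a register / heartbeat-allocate / shutdown lifecycle. The sketch below walks that call sequence against simplified stand-in traits; the trait shapes are assumptions for illustration and do not reproduce the actual YarnRMClient and YarnAllocator interfaces introduced by this patch.

// Simplified stand-ins; the real register() takes YarnConfiguration, SparkConf,
// preferred node locations, and both UI addresses.
trait SimpleAllocator {
  def allocateResources(): Unit
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
}

trait SimpleRMClient {
  def register(uiAddress: String): SimpleAllocator
  def shutdown(success: Boolean, diagnostics: String): Unit
}

object AMLifecycleSketch {
  // Register with the RM, heartbeat until the target executor count is reached
  // (or too many executors have failed), then unregister.
  def run(client: SimpleRMClient, targetExecutors: Int, maxFailures: Int): Unit = {
    val allocator = client.register("http://driver-host:4040")
    try {
      while (allocator.getNumExecutorsRunning < targetExecutors) {
        if (allocator.getNumExecutorsFailed >= maxFailures) {
          client.shutdown(success = false, diagnostics = "max number of executor failures reached")
          return
        }
        allocator.allocateResources() // doubles as the ResourceManager heartbeat
        Thread.sleep(100)
      }
      client.shutdown(success = true, diagnostics = "")
    } catch {
      case e: Exception => client.shutdown(success = false, diagnostics = e.getMessage)
    }
  }
}
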