diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f2641851cbe7c77d262613e788f52dc6eff5e04c..89318712a5777eee2046f7ec1246b471148c666e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -196,7 +196,7 @@ class SparkContext(
 
       case "yarn-standalone" =>
         val scheduler = try {
-          val clazz = Class.forName("spark.scheduler.cluster.YarnClusterScheduler")
+          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
           val cons = clazz.getConstructor(classOf[SparkContext])
           cons.newInstance(this).asInstanceOf[ClusterScheduler]
         } catch {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 478e5a0aaf2ac529c7dd733174cca69c511b5b2f..29968c273c31d59e5b45e1b8237f860c9eb21bdf 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -62,7 +62,7 @@ class SparkEnv (
     val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
     if(yarnMode) {
       try {
-        Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
       } catch {
         case th: Throwable => throw new SparkException("Unable to load YARN support", th)
       }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index d003bf1bba92d4339badad080e65711e21800077..9a2cf20de79a9c5de0ad88ae9482ae0f34b3934b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -50,7 +50,7 @@ private[spark] class SparkDeploySchedulerBackend(
       "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(null)
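+    // appUIAddress has no scheme, so prepend "http://" for the standalone Master's app UI link.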
     val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
-        sc.ui.appUIAddress)
+        "http://" + sc.ui.appUIAddress)
 
     client = new Client(sc.env.actorSystem, master, appDesc, this)
     client.start()
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ad456ea5653c27cd707d5b3f0687d36911133364..48eb096063510007bc04284d6deb8c3808a65660 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -79,7 +79,8 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
     server.foreach(_.stop())
   }
 
-  private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
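+  // host:port with no scheme; YARN registers it as-is, other callers prepend "http://".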
+  private[spark] def appUIAddress = host + ":" + boundPort.getOrElse("-1")
+
 }
 
 private[spark] object SparkUI {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index ce1acf564cc9d2ed4ccfacd95c617295a472e1a9..5573b3847bcafd3041f18eaf471b6ad374ef26bf 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -25,38 +25,45 @@ import org.apache.spark.SparkContext
 private[spark] object UIUtils {
   import Page._
 
+  // Behind the YARN web proxy, this base URI has to be prepended to all links in the UI.
+  private[spark] val uiRoot: String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
+    getOrElse("")
+
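+  /** Prepend the proxy base URI (if any) to a resource path so links resolve behind the proxy. */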
+  def prependBaseUri(resource: String = "") = uiRoot + resource
+
   /** Returns a spark page with correctly formatted headers */
   def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
   : Seq[Node] = {
     val jobs = page match {
-      case Stages => <li class="active"><a href="/stages">Stages</a></li>
-      case _ => <li><a href="/stages">Stages</a></li>
+      case Stages => <li class="active"><a href={prependBaseUri("/stages")}>Stages</a></li>
+      case _ => <li><a href={prependBaseUri("/stages")}>Stages</a></li>
     }
     val storage = page match {
-      case Storage => <li class="active"><a href="/storage">Storage</a></li>
-      case _ => <li><a href="/storage">Storage</a></li>
+      case Storage => <li class="active"><a href={prependBaseUri("/storage")}>Storage</a></li>
+      case _ => <li><a href={prependBaseUri("/storage")}>Storage</a></li>
     }
     val environment = page match {
-      case Environment => <li class="active"><a href="/environment">Environment</a></li>
-      case _ => <li><a href="/environment">Environment</a></li>
+      case Environment =>
+        <li class="active"><a href={prependBaseUri("/environment")}>Environment</a></li>
+      case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li>
     }
     val executors = page match {
-      case Executors => <li class="active"><a href="/executors">Executors</a></li>
-      case _ => <li><a href="/executors">Executors</a></li>
+      case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li>
+      case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li>
     }
 
     <html>
       <head>
         <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
-        <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
-        <link rel="stylesheet" href="/static/webui.css" type="text/css" />
-        <script src="/static/sorttable.js"></script>
+        <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+        <link rel="stylesheet" href={prependBaseUri("/static/webui.css")}  type="text/css" />
+        <script src={prependBaseUri("/static/sorttable.js")} ></script>
         <title>{sc.appName} - {title}</title>
       </head>
       <body>
         <div class="navbar navbar-static-top">
           <div class="navbar-inner">
-            <a href="/" class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></a>
+            <a href={prependBaseUri("/")} class="brand"><img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")}  /></a>
             <ul class="nav">
               {jobs}
               {storage}
@@ -86,9 +93,9 @@ private[spark] object UIUtils {
     <html>
       <head>
         <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
-        <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
-        <link rel="stylesheet" href="/static/webui.css" type="text/css" />
-        <script src="/static/sorttable.js"></script>
+        <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+        <link rel="stylesheet" href={prependBaseUri("/static/webui.css")}  type="text/css" />
+        <script src={prependBaseUri("/static/sorttable.js")} ></script>
         <title>{title}</title>
       </head>
       <body>
@@ -96,7 +103,7 @@ private[spark] object UIUtils {
           <div class="row-fluid">
             <div class="span12">
               <h3 style="vertical-align: middle; display: inline-block;">
-                <img src="/static/spark-logo-77x50px-hd.png" style="margin-right: 15px;" />
+                <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} style="margin-right: 15px;" />
                 {title}
               </h3>
             </div>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index 5670c933bd21268d2fc5e7f305d2d83e0fc64abb..b3d3666944b4c7de1ff4ae84dc008f8c354a4cb2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -23,6 +23,7 @@ import scala.xml.Node
 
 import org.apache.spark.scheduler.Stage
 import org.apache.spark.scheduler.cluster.Schedulable
+import org.apache.spark.ui.UIUtils
 
 /** Table showing list of pools */
 private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
@@ -60,7 +61,7 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
       case None => 0
     }
     <tr>
-      <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
+      <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>{p.name}</a></td>
       <td>{p.minShare}</td>
       <td>{p.weight}</td>
       <td>{activeStages}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index a8911e46aef63c3f8bca0eb63f0bb60ac554873c..399a89459b045596c396eaa4b63a8170fd04e8f0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.HashSet
 
 import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo}
 import org.apache.spark.scheduler.Stage
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
 
@@ -98,7 +99,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
 
     val poolName = listener.stageToPool.get(s)
 
-    val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
+    val nameLink = <a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(), s.id)}>{s.name}</a>
     val description = listener.stageToDescription.get(s)
       .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
     val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
@@ -107,7 +108,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
     <tr>
       <td>{s.id}</td>
       {if (isFairScheduler) {
-        <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
+        <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),poolName.get)}>{poolName.get}</a></td>}
       }
       <td>{description}</td>
       <td valign="middle">{submissionTime}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
index c3ec9073704c26fd02b4326611edde630f54ecdf..109a7d4094c0a4bee6390d16d81871b477b1b399 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -50,7 +50,7 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
   def rddRow(rdd: RDDInfo): Seq[Node] = {
     <tr>
       <td>
-        <a href={"/storage/rdd?id=%s".format(rdd.id)}>
+        <a href={"%s/storage/rdd?id=%s".format(prependBaseUri(),rdd.id)}>
           {rdd.name}
         </a>
       </td>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bb47fc0a2ca3b90c3a26aa94f67e017c3ddfb4c0..468800b2bd474341893da92d6a21d870a132153b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -613,7 +613,7 @@ private[spark] object Utils extends Logging {
    * A regular expression to match classes of the "core" Spark API that we want to skip when
    * finding the call site of a method.
    */
-  private val SPARK_CLASS_REGEX = """^spark(\.api\.java)?(\.rdd)?\.[A-Z]""".r
+  private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
 
   private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
                                     val firstUserLine: Int, val firstUserClass: String)
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index fe5334ffdcaa7f75d572a5eebedddd0e3dcc49c7..93421efcbc30fce2fd3415d0f34c61379e42af47 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -29,8 +29,12 @@ If you want to test out the YARN deployment mode, you can use the current Spark
 
 Most of the configs are the same for Spark on YARN as other deploys. See the Configuration page for more information on those.  These are configs that are specific to SPARK on YARN.
 
+Environment variables:
 * `SPARK_YARN_USER_ENV`, to add environment variables to the Spark processes launched on YARN. This can be a comma separated list of environment variables, e.g. `SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"`.
 
+System properties:
+* `spark.yarn.applicationMaster.waitTries`, property to set the number of times the ApplicationMaster waits for the Spark master and, after that, the number of tries it waits for the SparkContext to be initialized. Default is 10.
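+  For example, `-Dspark.yarn.applicationMaster.waitTries=20` allows up to 20 tries at each step.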
+
 # Launching Spark on YARN
 
 Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the hadoop cluster.
@@ -38,7 +42,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t
 
 The command to launch the YARN Client is as follows:
 
-    SPARK_JAR=<SPARK_YARN_JAR_FILE> ./spark-class spark.deploy.yarn.Client \
+    SPARK_JAR=<SPARK_YARN_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \
       --jar <YOUR_APP_JAR_FILE> \
       --class <APP_MAIN_CLASS> \
       --args <APP_MAIN_ARGUMENTS> \
@@ -50,9 +54,9 @@ The command to launch the YARN Client is as follows:
 
 For example:
 
-    SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./spark-class spark.deploy.yarn.Client \
+    SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./spark-class org.apache.spark.deploy.yarn.Client \
       --jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \
-      --class spark.examples.SparkPi \
+      --class org.apache.spark.examples.SparkPi \
       --args yarn-standalone \
       --num-workers 3 \
       --master-memory 4g \
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 139a977a0311f13384b948bac273c48b5a267254..858b58d338d71491b21d6098f84be8d34d0491f5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import scala.collection.JavaConversions._
-import org.apache.spark.{SparkContext, Logging, Utils}
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.util.Utils
 import org.apache.hadoop.security.UserGroupInformation
 import java.security.PrivilegedExceptionAction
 
@@ -45,6 +46,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
   private var yarnAllocator: YarnAllocationHandler = null
   private var isFinished:Boolean = false
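+  // Spark UI address, registered with the RM as the tracking URL once the SparkContext is up.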
+  private var uiAddress: String = ""
 
   def run() {
     // setup the directories so things go to yarn approved directories rather
@@ -53,27 +56,25 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
 
-    // Compute number of threads for akka
-    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+    // Workaround until Hadoop moves to something which has
+    // https://issues.apache.org/jira/browse/HADOOP-8406 (fixed in 2.0.2-alpha, but not in the 0.23 line).
+    // This does not, unfortunately, always work reliably ... but it alleviates the bug a lot of the time.
+    // Hence the args.workerCores = numCore assignment is disabled below. Any better option?
 
-    if (minimumMemory > 0) {
-      val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-      val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+    // Compute number of threads for akka
+    //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+    //if (minimumMemory > 0) {
+    //  val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    //  val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
 
-      if (numCore > 0) {
+    //  if (numCore > 0) {
         // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
         // TODO: Uncomment when hadoop is on a version which has this fixed.
         // args.workerCores = numCore
-      }
-    }
-
-    // Workaround until hadoop moves to something which has
-    // https://issues.apache.org/jira/browse/HADOOP-8406
-    // ignore result
-    // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
-    // Hence args.workerCores = numCore disabled above. Any better option ?
+    //  }
+    //}
     // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
     
     ApplicationMaster.register(this)
@@ -83,6 +84,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // This a bit hacky, but we need to wait until the spark.driver.port property has
     // been set by the Thread executing the user class.
     waitForSparkMaster()
+
+    waitForSparkContextInitialized()
+
+    // Do this after the Spark master is up and the SparkContext is created, so we can register the UI URL.
+    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
     
     // Allocate all containers
     allocateWorkers()
@@ -134,8 +140,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // Users can then monitor stderr/stdout on that node if required.
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
-    // What do we provide here ? Might make sense to expose something sensible later ?
-    appMasterRequest.setTrackingUrl("")
+    appMasterRequest.setTrackingUrl(uiAddress)
     return resourceManager.registerApplicationMaster(appMasterRequest)
   }
   
@@ -143,7 +148,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     logInfo("Waiting for spark driver to be reachable.")
     var driverUp = false
     var tries = 0
-    while(!driverUp && tries < 10) {
+    val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+    while (!driverUp && tries < numTries) {
       val driverHost = System.getProperty("spark.driver.host")
       val driverPort = System.getProperty("spark.driver.port")
       try {
@@ -189,24 +195,44 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     return t
   }
 
-  private def allocateWorkers() {
+  // This needs to happen before allocateWorkers.
+  private def waitForSparkContextInitialized() {
     logInfo("Waiting for spark context initialization")
-
     try {
       var sparkContext: SparkContext = null
       ApplicationMaster.sparkContextRef.synchronized {
         var count = 0
-        while (ApplicationMaster.sparkContextRef.get() == null && count < 10) {
+        val waitTime = 10000L
+        val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+        while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
           logInfo("Waiting for spark context initialization ... " + count)
           count = count + 1
-          ApplicationMaster.sparkContextRef.wait(10000L)
+          ApplicationMaster.sparkContextRef.wait(waitTime)
         }
         sparkContext = ApplicationMaster.sparkContextRef.get()
-        assert(sparkContext != null)
-        this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, sparkContext.preferredNodeLocationData)
+        assert(sparkContext != null || count >= numTries)
+
+        if (null != sparkContext) {
+          uiAddress = sparkContext.ui.appUIAddress
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+            appAttemptId, args, sparkContext.preferredNodeLocationData)
+        } else {
+          logWarning("Unable to retrieve sparkContext in spite of waiting for " + count * waitTime +
+            " ms, numTries = " + numTries)
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+            appAttemptId, args)
+        }
       }
+    } finally {
+      // In case of exceptions, etc., ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks.
+      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+    }
+  }
 
 
+  private def allocateWorkers() {
+    try {
       logInfo("Allocating " + args.numWorkers + " workers.")
       // Wait until all containers have finished
       // TODO: This is a bit ugly. Can we make it nicer?
@@ -298,6 +324,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       .asInstanceOf[FinishApplicationMasterRequest]
     finishReq.setAppAttemptId(appAttemptId)
     finishReq.setFinishApplicationStatus(status)
+    // set tracking url to empty since we don't have a history server
+    finishReq.setTrackingUrl("")
     resourceManager.finishApplicationMaster(finishReq)
 
   }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index f47e23b63f4eaaae9f2b54213a1e68ef3af8d6a2..f76a5ddd39e90d4998d7712113dd0949823d5e18 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -80,7 +80,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
       System.err.println("Unknown/unsupported param " + unknownParam)
     }
     System.err.println(
-      "Usage: spark.deploy.yarn.ApplicationMaster [options] \n" +
+      "Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH       Path to your application's JAR file (required)\n" +
       "  --class CLASS_NAME   Name of your application's main class (required)\n" +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 48e737ed79774a38adc4dd408e38947c5283c74d..844c707834ecf0ba770b2b09bee6b1af9de6cefd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -254,7 +255,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     val commands = List[String](javaCommand + 
       " -server " +
       JAVA_OPTS +
-      " spark.deploy.yarn.ApplicationMaster" +
+      " org.apache.spark.deploy.yarn.ApplicationMaster" +
       " --class " + args.userClass + 
       " --jar " + args.userJar +
       userArgsToString(args) +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 6cbfadc23be93d609fb349c8ba135038c9f7ac70..cd651904d27aca52db6f94e7a4f3c2cc697220a2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -98,7 +98,7 @@ class ClientArguments(val args: Array[String]) {
       System.err.println("Unknown/unsupported param " + unknownParam)
     }
     System.err.println(
-      "Usage: spark.deploy.yarn.Client [options] \n" +
+      "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH       Path to your application's JAR file (required)\n" +
       "  --class CLASS_NAME   Name of your application's main class (required)\n" +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 72dcf7178eb304eedea6fe9738fa901d0c13eb78..6229167cb44263bc16c1e951dddadbc550ba17f3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -37,7 +37,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
     slaveId: String, hostname: String, workerMemory: Int, workerCores: Int) 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 26ff214e122420314938f82dd634d719a7d99817..6d6ef149cc2d8b6351013bce769c0bf421cd9eab 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.deploy.yarn
 
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 import org.apache.spark.scheduler.SplitInfo
 import scala.collection
 import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
@@ -479,6 +480,15 @@ object YarnAllocationHandler {
   private val hostToRack = new ConcurrentHashMap[String, String]()
   private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
 
+
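+  // Used when no SparkContext is available: allocate without host/rack locality preferences.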
+  def newAllocator(conf: Configuration,
+                   resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
+                   args: ApplicationMasterArguments): YarnAllocationHandler = {
+
+    new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers, 
+      args.workerMemory, args.workerCores, Map[String, Int](), Map[String, Int]())
+  }
+
   def newAllocator(conf: Configuration,
                    resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
                    args: ApplicationMasterArguments,
@@ -486,7 +496,6 @@ object YarnAllocationHandler {
 
     val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
 
-
     new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers, 
       args.workerMemory, args.workerCores, hostToCount, rackToCount)
   }
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 3828ddfc4f14afe0d2bd0665b3476c79ad0cb648..29b3f22e13697b38bc501e2f914d8fc0a202d722 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.spark._
 import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
+import org.apache.spark.util.Utils
 import org.apache.hadoop.conf.Configuration
 
 /**
@@ -27,6 +28,8 @@ import org.apache.hadoop.conf.Configuration
  */
 private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
 
+  logInfo("Created YarnClusterScheduler")
+
   def this(sc: SparkContext) = this(sc, new Configuration())
 
   // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate