diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 318e16552201c9c76f1538ea59102899340b265c..6abf6d930c155614d5293430894f8081d8e0b96a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,4 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
 
+  case class AddWebUIFilter(filterName: String, filterParams: String, proxyBase: String)
+    extends CoarseGrainedClusterMessage
+
 }
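
The new AddWebUIFilter message carries the filter class name, its parameters as a comma-separated `key=value` string, and the YARN proxy base path. A minimal sketch of how a sender (the YARN ExecutorLauncher later in this patch) fires it over Akka; the `driver` ActorRef and the concrete host and application values are placeholders, not part of the patch:

```scala
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter

// Placeholder values standing in for what YARN would actually provide.
val filterName   = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
val filterParams = "PROXY_HOST=rm-host,PROXY_URI_BASE=http://rm-host:8088/proxy/application_1_0001"
val proxyBase    = "/proxy/application_1_0001"

// `driver` is assumed to be an Akka ActorRef for the scheduler backend's DriverActor.
driver ! AddWebUIFilter(filterName, filterParams, proxyBase)
```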
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0f5545e2ed65f145c16281ae0fcf2d2ca2e74658..9f085eef46720dc3a3c4eaa60c547c5670c43eae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.ui.JettyUtils
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -136,6 +137,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         removeExecutor(executorId, reason)
         sender ! true
 
+      case AddWebUIFilter(filterName, filterParams, proxyBase) =>
+        addWebUIFilter(filterName, filterParams, proxyBase)
+        sender ! true
       case DisassociatedEvent(_, address, _) =>
         addressToExecutorId.get(address).foreach(removeExecutor(_,
           "remote Akka client disassociated"))
@@ -276,6 +280,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     }
     false
   }
+
+  // Add filters to the SparkUI
+  def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+    if (proxyBase != null && proxyBase.nonEmpty) {
+      System.setProperty("spark.ui.proxyBase", proxyBase)
+    }
+
+    if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
+      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+      conf.set("spark.ui.filters", filterName)
+      conf.set(s"spark.$filterName.params", filterParams)
+      JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
+    }
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
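
On the driver side, addWebUIFilter records the filter under the `spark.ui.filters` / `spark.<filterName>.params` naming scheme before handing the live UI handlers to JettyUtils.addFilters. A hedged illustration of the SparkConf entries that result for the AmIpFilter case (host, port, and application id are placeholders):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
// Filter class the UI should install (spark.ui.filters is a comma-separated list).
conf.set("spark.ui.filters",
  "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter")
// Per-filter parameters, keyed by the filter class name.
conf.set("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params",
  "PROXY_HOST=rm-host,PROXY_URI_BASE=http://rm-host:8088/proxy/application_1_0001")
```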
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 9cb50d9b83ddaa0a160e1a7fffcd267361dd5344..e07aa2ee3a5a2ea8df5c534a75daa742018163ff 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -136,7 +136,15 @@ private[spark] object UIUtils extends Logging {
   }
 
   // Yarn has to go through a proxy so the base uri is provided and has to be on all links
-  val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
+  def uiRoot: String = {
+    if (System.getenv("APPLICATION_WEB_PROXY_BASE") != null) {
+      System.getenv("APPLICATION_WEB_PROXY_BASE")
+    } else if (System.getProperty("spark.ui.proxyBase") != null) {
+      System.getProperty("spark.ui.proxyBase")
+    } else {
+      ""
+    }
+  }
 
   def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
 
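
`uiRoot` is now a def, so it picks up the `spark.ui.proxyBase` system property that addWebUIFilter sets at runtime, falling back in the order env var, system property, empty string. A small hedged sketch of the effect on prependBaseUri (paths are placeholders):

```scala
import org.apache.spark.ui.UIUtils  // private[spark]; callable from Spark-internal code only

// With no APPLICATION_WEB_PROXY_BASE in the environment, the system property
// set by addWebUIFilter supplies the proxy prefix for every generated link.
System.setProperty("spark.ui.proxyBase", "/proxy/application_1_0001")

UIUtils.prependBaseUri("/stages", "/static/webui.css")
// => "/proxy/application_1_0001/stages/static/webui.css"
```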
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index bfdb6232f5113c7e8fa577a2fe941d2c8f73cc71..a86ad256dfa39ca5d8578eb10b64f3dfa0ea8a3c 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -32,6 +32,7 @@ import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.deploy.SparkHadoopUtil
 
@@ -81,6 +82,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         driverClosed = true
+      case x: AddWebUIFilter =>
+        logInfo(s"Add WebUI Filter. $x")
+        driver ! x
     }
   }
 
@@ -111,7 +115,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }
 
     waitForSparkMaster()
-
+    addAmIpFilter()
     // Allocate all containers
     allocateExecutors()
 
@@ -171,7 +175,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
+    val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
+    logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
       .asInstanceOf[RegisterApplicationMasterRequest]
     appMasterRequest.setApplicationAttemptId(appAttemptId)
@@ -180,10 +185,21 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
     // What do we provide here ? Might make sense to expose something sensible later ?
-    appMasterRequest.setTrackingUrl("")
+    appMasterRequest.setTrackingUrl(appUIAddress)
     resourceManager.registerApplicationMaster(appMasterRequest)
   }
 
+  // Add the AmIpFilter that YARN requires for properly securing the UI
+  private def addAmIpFilter() {
+    val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+    val parts = proxy.split(":")
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val uriBase = "http://" + proxy + proxyBase
+    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
+  }
+
   private def waitForSparkMaster() {
     logInfo("Waiting for spark driver to be reachable.")
     var driverUp = false
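
A worked example of the strings addAmIpFilter assembles, with placeholder values standing in for what YarnConfiguration.getProxyHostAndPort and the APPLICATION_WEB_PROXY_BASE environment variable would actually return:

```scala
val proxy     = "rm-host:8088"                // YarnConfiguration.getProxyHostAndPort(conf)
val proxyBase = "/proxy/application_1_0001"   // APPLICATION_WEB_PROXY_BASE env var
val uriBase   = "http://" + proxy + proxyBase
// => "http://rm-host:8088/proxy/application_1_0001"
val amFilter  = "PROXY_HOST=" + proxy.split(":")(0) + "," + "PROXY_URI_BASE=" + uriBase
// => "PROXY_HOST=rm-host,PROXY_URI_BASE=http://rm-host:8088/proxy/application_1_0001"
```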
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 0f9fdcfcb65100c0ee1b6ec46a5d21f8cc63fd32..1b37c4bb13f4993790b6059e763ada5acb8160b4 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -48,6 +48,7 @@ private[spark] class YarnClientSchedulerBackend(
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
+    conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
 
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
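
The client-mode backend records the live UI address so the ApplicationMaster can hand it to YARN as the tracking URL. A hedged sketch of the hand-off across the two sides of this patch (the example address is a placeholder):

```scala
// Driver side (YarnClientSchedulerBackend): publish the UI address.
conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)   // e.g. "driver-host:4040"

// ApplicationMaster side (ExecutorLauncher): register it with the RM so the
// YARN web proxy links through to the real Spark UI.
val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
```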
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index f71ad036ce0f2c5bbe8a664dd7065fb4efe25c8f..5ac95f379872331da1f64fadfcb2960aa820fb1b 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -31,10 +31,12 @@ import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -82,6 +84,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         driverClosed = true
+      case x: AddWebUIFilter =>
+        logInfo(s"Add WebUI Filter. $x")
+        driver ! x
     }
   }
 
@@ -99,6 +104,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     registerApplicationMaster()
 
     waitForSparkMaster()
+    addAmIpFilter()
 
     // Allocate all containers
     allocateExecutors()
@@ -142,9 +148,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    // TODO: Find out client's Spark UI address and fill in here?
-    amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
+    val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
+    logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
+    amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
+  }
+
+  // Add the AmIpFilter that YARN requires for properly securing the UI
+  private def addAmIpFilter() {
+    val proxy = WebAppUtils.getProxyHostAndPort(conf)
+    val parts = proxy.split(":")
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val uriBase = "http://" + proxy + proxyBase
+    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
   }
 
   private def waitForSparkMaster() {