diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c6c5b8f22b5496ebfea548b8a13e46917b5e2e4d..218b353dd9d497e4f6b9a4e2296ccc3db8089cae 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -220,8 +220,14 @@ class SparkContext(config: SparkConf) extends Logging {
     new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
 
   // Initialize the Spark UI, registering all associated listeners
-  private[spark] val ui = new SparkUI(this)
-  ui.bind()
+  private[spark] val ui: Option[SparkUI] =
+    if (conf.getBoolean("spark.ui.enabled", true)) {
+      Some(new SparkUI(this))
+    } else {
+      // For tests, do not enable the UI
+      None
+    }
+  ui.foreach(_.bind())
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
@@ -990,7 +996,7 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Shut down the SparkContext. */
   def stop() {
     postApplicationEnd()
-    ui.stop()
+    ui.foreach(_.stop())
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5b5257269d92ff052f9cbd5da97e13043807883a..9a0cb1c6c6ccd824ab9483611bd9f61946a89b1b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -292,7 +292,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
       conf.set("spark.ui.filters", filterName)
       conf.set(s"spark.$filterName.params", filterParams)
-      JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
+      scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 513d74a08a47f9b3d5d813fb372b9be5ca51a46c..ee10aa061f4e9c0776bb4357a3b3d004d03b4367 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
 import org.apache.spark.{Logging, SparkContext, SparkEnv}
@@ -47,16 +46,17 @@ private[spark] class SimrSchedulerBackend(
 
     val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
     val fs = FileSystem.get(conf)
+    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
 
     logInfo("Writing to HDFS file: "  + driverFilePath)
     logInfo("Writing Akka address: "  + driverUrl)
-    logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress)
+    logInfo("Writing Spark UI Address: " + appUIAddress)
 
     // Create temporary file to prevent race condition where executors get empty driverUrl file
     val temp = fs.create(tmpPath, true)
     temp.writeUTF(driverUrl)
     temp.writeInt(maxCores)
-    temp.writeUTF(sc.ui.appUIAddress)
+    temp.writeUTF(appUIAddress)
     temp.close()
 
     // "Atomic" rename
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 06872ace2ecf44de650ec9ab1cc73a9ab0480e98..2f45d192e1d4dc23b9ac2a8f656aca189c5d0da1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -67,8 +67,10 @@ private[spark] class SparkDeploySchedulerBackend(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
+    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
+    val eventLogDir = sc.eventLogger.map(_.logDir)
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+      appUIAddress, eventLogDir)
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 038746d2eda4b9fd43591d3471532071a629df37..2f5664295670188db05946fb64c11e299a656250 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -36,11 +36,25 @@ import scala.xml.Node
 
 class UISuite extends FunSuite {
 
+  /**
+   * Create a test SparkContext with the SparkUI enabled.
+   * It is safe to `get` the SparkUI directly from the SparkContext returned here.
+   */
+  private def newSparkContext(): SparkContext = {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+      .set("spark.ui.enabled", "true")
+    val sc = new SparkContext(conf)
+    assert(sc.ui.isDefined)
+    sc
+  }
+
   ignore("basic ui visibility") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark(newSparkContext()) { sc =>
       // test if the ui is visible, and all the expected tabs are visible
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sc.ui.appUIAddress).mkString
+        val html = Source.fromURL(sc.ui.get.appUIAddress).mkString
         assert(!html.contains("random data that should not be present"))
         assert(html.toLowerCase.contains("stages"))
         assert(html.toLowerCase.contains("storage"))
@@ -51,7 +65,7 @@ class UISuite extends FunSuite {
   }
 
   ignore("visibility at localhost:4040") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark(newSparkContext()) { sc =>
       // test if visible from http://localhost:4040
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
         val html = Source.fromURL("http://localhost:4040").mkString
@@ -61,8 +75,8 @@ class UISuite extends FunSuite {
   }
 
   ignore("attaching a new tab") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val sparkUI = sc.ui
+    withSpark(newSparkContext()) { sc =>
+      val sparkUI = sc.ui.get
 
       val newTab = new WebUITab(sparkUI, "foo") {
         attachPage(new WebUIPage("") {
@@ -73,7 +87,7 @@ class UISuite extends FunSuite {
       }
       sparkUI.attachTab(newTab)
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sc.ui.appUIAddress).mkString
+        val html = Source.fromURL(sparkUI.appUIAddress).mkString
         assert(!html.contains("random data that should not be present"))
 
         // check whether new page exists
@@ -87,7 +101,7 @@ class UISuite extends FunSuite {
       }
 
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString
+        val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString
         // check whether new page exists
         assert(html.contains("magic"))
       }
@@ -129,16 +143,20 @@ class UISuite extends FunSuite {
   }
 
   test("verify appUIAddress contains the scheme") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val uiAddress = sc.ui.appUIAddress
-      assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
+    withSpark(newSparkContext()) { sc =>
+      val ui = sc.ui.get
+      val uiAddress = ui.appUIAddress
+      val uiHostPort = ui.appUIHostPort
+      assert(uiAddress.equals("http://" + uiHostPort))
     }
   }
 
   test("verify appUIAddress contains the port") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val splitUIAddress = sc.ui.appUIAddress.split(':')
-      assert(splitUIAddress(2).toInt == sc.ui.boundPort)
+    withSpark(newSparkContext()) { sc =>
+      val ui = sc.ui.get
+      val splitUIAddress = ui.appUIAddress.split(':')
+      val boundPort = ui.boundPort
+      assert(splitUIAddress(2).toInt == boundPort)
     }
   }
 }
diff --git a/pom.xml b/pom.xml
index 64fb1e57e30e002a35af8cd41120ee0bb8331f2a..e5f863e85445ca696d73551774dc5d39cce9e8cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -899,7 +899,7 @@
               <java.awt.headless>true</java.awt.headless>
               <spark.test.home>${session.executionRootDirectory}</spark.test.home>
               <spark.testing>1</spark.testing>
-              <spark.ui.port>0</spark.ui.port>
+              <spark.ui.enabled>false</spark.ui.enabled>
             </systemProperties>
           </configuration>
           <executions>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 45f6d2973ea90c663a6957718b9a50317647a2cf..c07ea313f12282483205eb700e87dbe67beba400 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -337,7 +337,7 @@ object TestSettings {
     javaOptions in Test += "-Dspark.test.home=" + sparkHome,
     javaOptions in Test += "-Dspark.testing=1",
     javaOptions in Test += "-Dspark.ports.maxRetries=100",
-    javaOptions in Test += "-Dspark.ui.port=0",
+    javaOptions in Test += "-Dspark.ui.enabled=false",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
       .map { case (k,v) => s"-D$k=$v" }.toSeq,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 457e8ab28ed82af83b0c5a7cf2a2a1afc7447a6e..f63560dcb5b892fc222e7731a2f8eef5d778a326 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.ui.StreamingTab
+import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
 import org.apache.spark.util.MetadataCleaner
 
 /**
@@ -158,7 +158,14 @@ class StreamingContext private[streaming] (
 
   private[streaming] val waiter = new ContextWaiter
 
-  private[streaming] val uiTab = new StreamingTab(this)
+  private[streaming] val progressListener = new StreamingJobProgressListener(this)
+
+  private[streaming] val uiTab: Option[StreamingTab] =
+    if (conf.getBoolean("spark.ui.enabled", true)) {
+      Some(new StreamingTab(this))
+    } else {
+      None
+    }
 
   /** Register streaming source to metrics system */
   private val streamingSource = new StreamingSource(this)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index 75f0e8716dc7efa2507934d1cfd75c1372c36eeb..e35a568ddf1151470a4da2c7dd4ad95c585c5910 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -26,7 +26,7 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   override val metricRegistry = new MetricRegistry
   override val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)
 
-  private val streamingListener = ssc.uiTab.listener
+  private val streamingListener = ssc.progressListener
 
   private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
       defaultValue: T) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index 34ac254f337eb6ac332a5b21727e39aae0a471b4..d9d04cd706a04ed286f110ac980725469f7210bd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -17,18 +17,31 @@
 
 package org.apache.spark.streaming.ui
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.ui.SparkUITab
+import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-/** Spark Web UI tab that shows statistics of a streaming job */
+import StreamingTab._
+
+/**
+ * Spark Web UI tab that shows statistics of a streaming job.
+ * This assumes the given SparkContext has enabled its SparkUI.
+ */
 private[spark] class StreamingTab(ssc: StreamingContext)
-  extends SparkUITab(ssc.sc.ui, "streaming") with Logging {
+  extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
 
-  val parent = ssc.sc.ui
-  val listener = new StreamingJobProgressListener(ssc)
+  val parent = getSparkUI(ssc)
+  val listener = ssc.progressListener
 
   ssc.addStreamingListener(listener)
   attachPage(new StreamingPage(this))
   parent.attachTab(this)
 }
+
+private object StreamingTab {
+  def getSparkUI(ssc: StreamingContext): SparkUI = {
+    ssc.sc.ui.getOrElse {
+      throw new SparkException("Parent SparkUI to attach this tab to not found!")
+    }
+  }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
index 2a0db7564915d0fc02b64f5204c308cb078411c6..4c7e43c2943c9ec5c5fe0436cfb1b08d58e49e18 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -24,13 +24,22 @@ import org.scalatest.FunSuite
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.SparkConf
+
 class UISuite extends FunSuite {
 
   // Ignored: See SPARK-1530
   ignore("streaming tab in spark UI") {
-    val ssc = new StreamingContext("local", "test", Seconds(1))
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+      .set("spark.ui.enabled", "true")
+    val ssc = new StreamingContext(conf, Seconds(1))
+    assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
+    val ui = ssc.sc.ui.get
+
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
+      val html = Source.fromURL(ui.appUIAddress).mkString
       assert(!html.contains("random data that should not be present"))
       // test if streaming tab exist
       assert(html.toLowerCase.contains("streaming"))
@@ -39,8 +48,7 @@ class UISuite extends FunSuite {
     }
 
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      val html = Source.fromURL(
-        ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+      val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
       assert(html.toLowerCase.contains("batch"))
       assert(html.toLowerCase.contains("network"))
     }
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 5756263e89e21036ac7f6fddc1e2a39e3fdd9f88..878b6db546032f528dd23f0cd9a75abddf0b5811 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -189,7 +189,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     if (sc == null) {
       finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
     } else {
-      registerAM(sc.ui.appUIAddress, securityMgr)
+      registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
       try {
         userThread.join()
       } finally {
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 41c662cd7a6de47528a9b801bd6113799775528c..6aa6475fe4a1899791b661f29adf704230a89113 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -55,7 +55,7 @@ private[spark] class YarnClientSchedulerBackend(
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
-    conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
+    sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIHostPort) }
 
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (