diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 0e3750fdde415891e61823ebedd7c28cc4ed6285..edc3889c9ae514cab946dcec0af548d0bb9fe9da 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -23,7 +23,10 @@ import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
-private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
+private[spark] class HttpFileServer(
+    securityManager: SecurityManager,
+    requestedPort: Int = 0)
+  extends Logging {
 
   var baseDir : File = null
   var fileDir : File = null
@@ -38,7 +41,7 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir, securityManager)
+    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
     httpServer.start()
     serverUri = httpServer.uri
     logDebug("HTTP file server started at: " + serverUri)
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index 7e9b517f901a2151e5bf9032bfde8529e6b8a0e9..912558d0cab7de59f5bf3b19395ba4a4c5cbc522 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
-import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
+import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
 
 import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.bio.SocketConnector
@@ -41,48 +41,68 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
  * as well as classes created by the interpreter when the user types in code. This is just a wrapper
  * around a Jetty server.
  */
-private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
-    extends Logging {
+private[spark] class HttpServer(
+    resourceBase: File,
+    securityManager: SecurityManager,
+    requestedPort: Int = 0,
+    serverName: String = "HTTP server")
+  extends Logging {
+
   private var server: Server = null
-  private var port: Int = -1
+  private var port: Int = requestedPort
 
   def start() {
     if (server != null) {
       throw new ServerStateException("Server is already started")
     } else {
       logInfo("Starting HTTP Server")
-      server = new Server()
-      val connector = new SocketConnector
-      connector.setMaxIdleTime(60*1000)
-      connector.setSoLingerTime(-1)
-      connector.setPort(0)
-      server.addConnector(connector)
-
-      val threadPool = new QueuedThreadPool
-      threadPool.setDaemon(true)
-      server.setThreadPool(threadPool)
-      val resHandler = new ResourceHandler
-      resHandler.setResourceBase(resourceBase.getAbsolutePath)
-
-      val handlerList = new HandlerList
-      handlerList.setHandlers(Array(resHandler, new DefaultHandler))
-
-      if (securityManager.isAuthenticationEnabled()) {
-        logDebug("HttpServer is using security")
-        val sh = setupSecurityHandler(securityManager)
-        // make sure we go through security handler to get resources
-        sh.setHandler(handlerList)
-        server.setHandler(sh)
-      } else {
-        logDebug("HttpServer is not using security")
-        server.setHandler(handlerList)
-      }
-
-      server.start()
-      port = server.getConnectors()(0).getLocalPort()
+      val (actualServer, actualPort) =
+        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+      server = actualServer
+      port = actualPort
     }
   }
 
+  /**
+   * Actually start the HTTP server on the given port.
+   *
+   * Note that this is only best effort in the sense that we may end up binding to a nearby port
+   * in the event of a port collision. Return the bound server and the actual port used.
+   */
+  private def doStart(startPort: Int): (Server, Int) = {
+    val server = new Server()
+    val connector = new SocketConnector
+    connector.setMaxIdleTime(60 * 1000)
+    connector.setSoLingerTime(-1)
+    connector.setPort(startPort)
+    server.addConnector(connector)
+
+    val threadPool = new QueuedThreadPool
+    threadPool.setDaemon(true)
+    server.setThreadPool(threadPool)
+    val resHandler = new ResourceHandler
+    resHandler.setResourceBase(resourceBase.getAbsolutePath)
+
+    val handlerList = new HandlerList
+    handlerList.setHandlers(Array(resHandler, new DefaultHandler))
+
+    if (securityManager.isAuthenticationEnabled()) {
+      logDebug("HttpServer is using security")
+      val sh = setupSecurityHandler(securityManager)
+      // make sure we go through security handler to get resources
+      sh.setHandler(handlerList)
+      server.setHandler(sh)
+    } else {
+      logDebug("HttpServer is not using security")
+      server.setHandler(handlerList)
+    }
+
+    server.start()
+    val actualPort = server.getConnectors()(0).getLocalPort
+
+    (server, actualPort)
+  }
+
   /**
    * Setup Jetty to the HashLoginService using a single user with our
    * shared secret. Configure it to use DIGEST-MD5 authentication so that the password
@@ -134,7 +154,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
     if (server == null) {
       throw new ServerStateException("Server is not started")
     } else {
-      return "http://" + Utils.localIpAddress + ":" + port
+      "http://" + Utils.localIpAddress + ":" + port
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index cce7a23d3b9fc4fad9ec6918b1accdab14d39f93..13f0bff7ee507dd007fd646e2494048eb60ee1bc 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -323,6 +323,14 @@ private[spark] object SparkConf {
    * the scheduler, while the rest of the spark configs can be inherited from the driver later.
    */
   def isExecutorStartupConf(name: String): Boolean = {
-    isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
+    isAkkaConf(name) ||
+    name.startsWith("spark.akka") ||
+    name.startsWith("spark.auth") ||
+    isSparkPortConf(name)
   }
+
+  /**
+   * Return whether the given config is a Spark port config.
+   */
+  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index dd8e4ac66dc666c8f5e46333fa98747ebccc17be..9d4edeb6d96cff650bf53d0d97d816e28272a7fa 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -22,7 +22,6 @@ import java.net.Socket
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.concurrent.Await
 import scala.util.Properties
 
 import akka.actor._
@@ -151,10 +150,10 @@ object SparkEnv extends Logging {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
       securityManager = securityManager)
 
-    // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
-    // figure out which port number Akka actually bound to and set spark.driver.port to it.
-    if (isDriver && port == 0) {
-      conf.set("spark.driver.port",  boundPort.toString)
+    // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+    // This is so that we can tell the executors the correct port to connect to.
+    if (isDriver) {
+      conf.set("spark.driver.port", boundPort.toString)
     }
 
     // Create an instance of the class named by the given Java system property, or by
@@ -222,7 +221,8 @@ object SparkEnv extends Logging {
 
     val httpFileServer =
       if (isDriver) {
-        val server = new HttpFileServer(securityManager)
+        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
+        val server = new HttpFileServer(securityManager, fileServerPort)
         server.initialize()
         conf.set("spark.fileserver.uri",  server.serverUri)
         server
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 487456467b23b8918187536e22452ee18383bf20..942dc7d7eac8730b09d5ef3d744d0c9bd6f30d48 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging {
 
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
-    server = new HttpServer(broadcastDir, securityManager)
+    val broadcastPort = conf.getInt("spark.broadcast.port", 0)
+    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 17c507af2652defb948d6c8991d7f0bd2ebb7a8a..c07003784e8ac4cedf931cf5a905cb3e33e4d724 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -155,8 +155,6 @@ object Client {
     conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
     Logger.getRootLogger.setLevel(driverArgs.logLevel)
 
-    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
-    //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 16aa0493370ddc18f067b7814cff40cec9517950..d86ec1e03e45c6c4f1610575d7aab23a0c221313 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.AkkaUtils
  */
 private[spark]
 class MasterWebUI(val master: Master, requestedPort: Int)
-  extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
+  extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {
 
   val masterActorRef = master.self
   val timeout = AkkaUtils.askTimeout(master.conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index a9f531e9e4caeceed799e5fe3f85c1466a43d3a9..47fbda600bea7586ebb83abacb123a0c0a2ac386 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.deploy.worker.ui.WorkerWebUI._
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.AkkaUtils
@@ -34,7 +35,7 @@ class WorkerWebUI(
     val worker: Worker,
     val workDir: File,
     port: Option[Int] = None)
-  extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf)
+  extends WebUI(worker.securityMgr, getUIPort(port, worker.conf), worker.conf, name = "WorkerUI")
   with Logging {
 
   val timeout = AkkaUtils.askTimeout(worker.conf)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index af736de40539705995aebe89b652e5613dcee579..1f46a0f17649081f11cc1cc8a2cf11488b0d085e 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -115,8 +115,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
       // Bootstrap to fetch the driver's Spark properties.
       val executorConf = new SparkConf
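+      // Port for this executor's actor systems to bind to (0 means bind to any free port)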
+      val port = executorConf.getInt("spark.executor.port", 0)
       val (fetcher, _) = AkkaUtils.createActorSystem(
-        "driverPropsFetcher", hostname, 0, executorConf, new SecurityManager(executorConf))
+        "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf))
       val driver = fetcher.actorSelection(driverUrl)
       val timeout = AkkaUtils.askTimeout(executorConf)
       val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
@@ -126,7 +127,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       // Create a new ActorSystem using driver's Spark properties to run the backend.
       val driverConf = new SparkConf().setAll(props)
       val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-        "sparkExecutor", hostname, 0, driverConf, new SecurityManager(driverConf))
+        "sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))
       // set it
       val sparkHostPort = hostname + ":" + boundPort
       actorSystem.actorOf(
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 566e8a4aaa1d2da057d5fa5985ed45ae9d8cae37..4c00225280cce03ad634816570441b7c12be203f 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -38,8 +38,12 @@ import scala.language.postfixOps
 import org.apache.spark._
 import org.apache.spark.util.{SystemClock, Utils}
 
-private[spark] class ConnectionManager(port: Int, conf: SparkConf,
-    securityManager: SecurityManager) extends Logging {
+private[spark] class ConnectionManager(
+    port: Int,
+    conf: SparkConf,
+    securityManager: SecurityManager,
+    name: String = "Connection manager")
+  extends Logging {
 
   class MessageStatus(
       val message: Message,
@@ -105,7 +109,11 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
   serverChannel.socket.setReuseAddress(true)
   serverChannel.socket.setReceiveBufferSize(256 * 1024)
 
-  serverChannel.socket.bind(new InetSocketAddress(port))
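+  // Bind to the requested port; Utils.startServiceOnPort retries nearby ports on collision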
+  private def startService(port: Int): (ServerSocketChannel, Int) = {
+    serverChannel.socket.bind(new InetSocketAddress(port))
+    (serverChannel, serverChannel.socket.getLocalPort)
+  }
+  Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
   serverChannel.register(selector, SelectionKey.OP_ACCEPT)
 
   val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c0a06017945f0ae8f7a9caedaae91af7bad7aea8..3876cf43e2a7d097a1411d9d4f600e0211e7167d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -60,10 +60,12 @@ private[spark] class BlockManager(
     mapOutputTracker: MapOutputTracker)
   extends Logging {
 
+  private val port = conf.getInt("spark.blockManager.port", 0)
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
     conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
-  val connectionManager = new ConnectionManager(0, conf, securityManager)
+  val connectionManager =
+    new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")
 
   implicit val futureExecContext = connectionManager.futureExecContext
 
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index a2535e3c1c41f182fb779b0ebda8f81c18778241..29e9cf947856f4f8d982138e7931b3f6cda7910d 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -174,40 +174,32 @@ private[spark] object JettyUtils extends Logging {
       hostName: String,
       port: Int,
       handlers: Seq[ServletContextHandler],
-      conf: SparkConf): ServerInfo = {
+      conf: SparkConf,
+      serverName: String = ""): ServerInfo = {
 
     val collection = new ContextHandlerCollection
     collection.setHandlers(handlers.toArray)
     addFilters(handlers, conf)
 
-    @tailrec
+    // Bind to the given port, or throw a java.net.BindException if the port is occupied
     def connect(currentPort: Int): (Server, Int) = {
       val server = new Server(new InetSocketAddress(hostName, currentPort))
       val pool = new QueuedThreadPool
       pool.setDaemon(true)
       server.setThreadPool(pool)
       server.setHandler(collection)
-
-      Try {
+      try {
         server.start()
-      } match {
-        case s: Success[_] =>
-          (server, server.getConnectors.head.getLocalPort)
-        case f: Failure[_] =>
-          val nextPort = (currentPort + 1) % 65536
+        (server, server.getConnectors.head.getLocalPort)
+      } catch {
+        case e: Exception =>
           server.stop()
           pool.stop()
-          val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."
-          if (f.toString.contains("Address already in use")) {
-            logWarning(s"$msg - $f")
-          } else {
-            logError(msg, f.exception)
-          }
-          connect(nextPort)
+          throw e
       }
     }
 
-    val (server, boundPort) = connect(port)
+    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
     ServerInfo(server, boundPort, collection)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 097a1b81e1dd19a951586944d07e5b71df9a910f..6c788a37dc70b9688f232429081ad5555bde084f 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -36,7 +36,7 @@ private[spark] class SparkUI(
     val listenerBus: SparkListenerBus,
     var appName: String,
     val basePath: String = "")
-  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath)
+  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
   with Logging {
 
   def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 856273e1d4e21abd4de370b8bbe7e60966b1d499..5f52f9508800734e1bd8b1cc852a347c11b71944 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -39,7 +39,8 @@ private[spark] abstract class WebUI(
     securityManager: SecurityManager,
     port: Int,
     conf: SparkConf,
-    basePath: String = "")
+    basePath: String = "",
+    name: String = "")
   extends Logging {
 
   protected val tabs = ArrayBuffer[WebUITab]()
@@ -97,7 +98,7 @@ private[spark] abstract class WebUI(
   def bind() {
     assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
     try {
-      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
+      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
       logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
     } catch {
       case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index feafd654e9e71b668ad1b6baca9c917461fab9cd..d6afb73b742421fb6f482ab45c8c49d57e54c60c 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.Await
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
-import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import akka.pattern.ask
 
 import com.typesafe.config.ConfigFactory
@@ -44,14 +44,28 @@ private[spark] object AkkaUtils extends Logging {
    * If indestructible is set to true, the Actor System will continue running in the event
    * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
    */
-  def createActorSystem(name: String, host: String, port: Int,
-    conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {
+  def createActorSystem(
+      name: String,
+      host: String,
+      port: Int,
+      conf: SparkConf,
+      securityManager: SecurityManager): (ActorSystem, Int) = {
+    val startService: Int => (ActorSystem, Int) = { actualPort =>
+      doCreateActorSystem(name, host, actualPort, conf, securityManager)
+    }
+    Utils.startServiceOnPort(port, startService, name)
+  }
+
+  private def doCreateActorSystem(
+      name: String,
+      host: String,
+      port: Int,
+      conf: SparkConf,
+      securityManager: SecurityManager): (ActorSystem, Int) = {
 
     val akkaThreads   = conf.getInt("spark.akka.threads", 4)
     val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
-
     val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
-
     val akkaFrameSize = maxFrameSizeBytes(conf)
     val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 30073a82857d2301789b1a69495d2a52947af936..c60be4f8a11d2e571f9100f4ba58035ed257ae43 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.util
 
 import java.io._
-import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection}
+import java.net._
 import java.nio.ByteBuffer
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
@@ -1331,4 +1331,75 @@ private[spark] object Utils extends Logging {
       .map { case (k, v) => s"-D$k=$v" }
   }
 
+  /**
+   * Default number of retries in binding to a port.
+   */
+  val portMaxRetries: Int = {
+    if (sys.props.contains("spark.testing")) {
+      // Set a higher number of retries for tests...
+      sys.props.get("spark.ports.maxRetries").map(_.toInt).getOrElse(100)
+    } else {
+      Option(SparkEnv.get)
+        .flatMap(_.conf.getOption("spark.ports.maxRetries"))
+        .map(_.toInt)
+        .getOrElse(16)
+    }
+  }
+
+  /**
+   * Attempt to start a service on the given port, or fail after a number of attempts.
+   * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
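+   *
+   * Illustrative usage only (`startHttpServer` is a hypothetical `Int => (MyServer, Int)`
+   * function that binds a server and returns it along with the port actually bound):
+   * {{{
+   *   val (server, boundPort) = Utils.startServiceOnPort(8080, startHttpServer, "HTTP server")
+   * }}}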
+   *
+   * @param startPort The initial port to start the service on.
+   * @param maxRetries Maximum number of retries to attempt.
+   *                   A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
+   * @param startService Function to start service on a given port.
+   *                     This is expected to throw java.net.BindException on port collision.
+   */
+  def startServiceOnPort[T](
+      startPort: Int,
+      startService: Int => (T, Int),
+      serviceName: String = "",
+      maxRetries: Int = portMaxRetries): (T, Int) = {
+    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    for (offset <- 0 to maxRetries) {
+      // Do not increment port if startPort is 0, which is treated as a special port
+      val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536
+      try {
+        val (service, port) = startService(tryPort)
+        logInfo(s"Successfully started service$serviceString on port $port.")
+        return (service, port)
+      } catch {
+        case e: Exception if isBindCollision(e) =>
+          if (offset >= maxRetries) {
+            val exceptionMessage =
+              s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
+            val exception = new BindException(exceptionMessage)
+            // restore original stack trace
+            exception.setStackTrace(e.getStackTrace)
+            throw exception
+          }
+          logWarning(s"Service$serviceString could not bind on port $tryPort. " +
+            s"Attempting port ${tryPort + 1}.")
+      }
+    }
+    // Should never happen
+    throw new SparkException(s"Failed to start service$serviceString on port $startPort")
+  }
+
+  /**
+   * Return whether the exception is caused by an address-port collision when binding.
+   */
+  def isBindCollision(exception: Throwable): Boolean = {
+    exception match {
+      case e: BindException =>
+        if (e.getMessage != null && e.getMessage.contains("Address already in use")) {
+          return true
+        }
+        isBindCollision(e.getCause)
+      case e: Exception => isBindCollision(e.getCause)
+      case _ => false
+    }
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 1ee936bc78f4911a4a62efffeced24101e97ba35..70d423ba8a04d7930e382d04f731d0630b2175dd 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import scala.util.Random
 
 import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
-import java.net.URI
+import java.net.{BindException, ServerSocket, URI}
 import java.nio.{ByteBuffer, ByteOrder}
 
 import com.google.common.base.Charsets
@@ -265,4 +265,36 @@ class UtilsSuite extends FunSuite {
       Array("hdfs:/a.jar", "s3:/another.jar"))
   }
 
+  test("isBindCollision") {
+    // Negatives
+    assert(!Utils.isBindCollision(null))
+    assert(!Utils.isBindCollision(new Exception))
+    assert(!Utils.isBindCollision(new Exception(new Exception)))
+    assert(!Utils.isBindCollision(new Exception(new BindException)))
+    assert(!Utils.isBindCollision(new Exception(new BindException("Random message"))))
+
+    // Positives
+    val be = new BindException("Address already in use")
+    val be1 = new Exception(new BindException("Address already in use"))
+    val be2 = new Exception(new Exception(new BindException("Address already in use")))
+    assert(Utils.isBindCollision(be))
+    assert(Utils.isBindCollision(be1))
+    assert(Utils.isBindCollision(be2))
+
+    // Actual bind exception
+    var server1: ServerSocket = null
+    var server2: ServerSocket = null
+    try {
+      server1 = new java.net.ServerSocket(0)
+      server2 = new java.net.ServerSocket(server1.getLocalPort)
+    } catch {
+      case e: Exception =>
+        assert(e.isInstanceOf[java.net.BindException])
+        assert(Utils.isBindCollision(e))
+    } finally {
+      Option(server1).foreach(_.close())
+      Option(server2).foreach(_.close())
+    }
+  }
+
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 7cd7f4124db7e048b1d154ff2ccd830f7a31bb38..5e3eb0f0871af4447311a6158b6cd774fefa751c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -566,6 +566,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>(local hostname)</td>
   <td>
     Hostname or IP address for the driver to listen on.
+    This is used for communicating with the executors and the standalone Master.
   </td>
 </tr>
 <tr>
@@ -573,6 +574,51 @@ Apart from these, the following properties are also available, and may be useful
   <td>(random)</td>
   <td>
     Port for the driver to listen on.
+    This is used for communicating with the executors and the standalone Master.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.fileserver.port</code></td>
+  <td>(random)</td>
+  <td>
+    Port for the driver's HTTP file server to listen on.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.broadcast.port</code></td>
+  <td>(random)</td>
+  <td>
+    Port for the driver's HTTP broadcast server to listen on.
+    This is not relevant for torrent broadcast.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.replClassServer.port</code></td>
+  <td>(random)</td>
+  <td>
+    Port for the driver's HTTP class server to listen on.
+    This is only relevant for the Spark shell.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.blockManager.port</code></td>
+  <td>(random)</td>
+  <td>
+    Port for all block managers to listen on. These exist on both the driver and the executors.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.port</code></td>
+  <td>(random)</td>
+  <td>
+    Port for the executor to listen on. This is used for communicating with the driver.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.ports.maxRetries</code></td>
+  <td>16</td>
+  <td>
+    Maximum number of retries when binding to a port before giving up.
   </td>
 </tr>
 <tr>
diff --git a/docs/security.md b/docs/security.md
index 8312f8d017e1f9c13c1fb02ad0880c6413f10818..ec0523184d665d8636a0652097fd53d2c38f4224 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -7,6 +7,9 @@ Spark currently supports authentication via a shared secret. Authentication can
 
 * For Spark on [YARN](running-on-yarn.html) deployments, configuring `spark.authenticate` to `true` will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. 
 * For other types of Spark deployments, the Spark parameter `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications.
+* **IMPORTANT NOTE:** *The experimental Netty shuffle path (`spark.shuffle.use.netty`) is not secured, so do not use Netty for shuffles if running with authentication.*
+
+## Web UI
 
 The Spark UI can also be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI.  On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters.
 
@@ -14,10 +17,132 @@ Spark also supports modify ACLs to control who has access to modify a running Sp
 
 Spark allows for a set of administrators to be specified in the acls who always have view and modify permissions to all the applications. is controlled by the config `spark.admin.acls`. This is useful on a shared cluster where you might have administrators or support staff who help users debug applications.
 
+## Event Logging
+
 If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to super user group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access.
 
-**IMPORTANT NOTE:** *The experimental Netty shuffle path (`spark.shuffle.use.netty`) is not secured, so do not use Netty for shuffles if running with authentication.*
+## Configuring Ports for Network Security
+
+Spark makes heavy use of the network, and some environments have strict firewall requirements.
+Below are the primary ports that Spark uses for its communication and how to configure those
+ports.
+
+### Standalone mode only
+
+<table class="table">
+  <tr>
+    <th>From</th><th>To</th><th>Default Port</th><th>Purpose</th><th>Configuration
+    Setting</th><th>Notes</th>
+  </tr>
+  <tr>
+    <td>Browser</td>
+    <td>Standalone Master</td>
+    <td>8080</td>
+    <td>Web UI</td>
+    <td><code>spark.master.ui.port /<br> SPARK_MASTER_WEBUI_PORT</code></td>
+    <td>Jetty-based. Standalone mode only.</td>
+  </tr>
+  <tr>
+    <td>Browser</td>
+    <td>Standalone Worker</td>
+    <td>8081</td>
+    <td>Web UI</td>
+    <td><code>spark.worker.ui.port /<br> SPARK_WORKER_WEBUI_PORT</code></td>
+    <td>Jetty-based. Standalone mode only.</td>
+  </tr>
+  <tr>
+    <td>Driver /<br> Standalone Worker</td>
+    <td>Standalone Master</td>
+    <td>7077</td>
+    <td>Submit job to cluster /<br> Join cluster</td>
+    <td><code>SPARK_MASTER_PORT</code></td>
+    <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+  </tr>
+  <tr>
+    <td>Standalone Master</td>
+    <td>Standalone Worker</td>
+    <td>(random)</td>
+    <td>Schedule executors</td>
+    <td><code>SPARK_WORKER_PORT</code></td>
+    <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+  </tr>
+</table>
+
+### All cluster managers
+
+<table class="table">
+  <tr>
+    <th>From</th><th>To</th><th>Default Port</th><th>Purpose</th><th>Configuration
+    Setting</th><th>Notes</th>
+  </tr>
+  <tr>
+    <td>Browser</td>
+    <td>Application</td>
+    <td>4040</td>
+    <td>Web UI</td>
+    <td><code>spark.ui.port</code></td>
+    <td>Jetty-based</td>
+  </tr>
+  <tr>
+    <td>Browser</td>
+    <td>History Server</td>
+    <td>18080</td>
+    <td>Web UI</td>
+    <td><code>spark.history.ui.port</code></td>
+    <td>Jetty-based</td>
+  </tr>
+  <tr>
+    <td>Executor /<br> Standalone Master</td>
+    <td>Driver</td>
+    <td>(random)</td>
+    <td>Connect to application /<br> Notify executor state changes</td>
+    <td><code>spark.driver.port</code></td>
+    <td>Akka-based. Set to "0" to choose a port randomly.</td>
+  </tr>
+  <tr>
+    <td>Driver</td>
+    <td>Executor</td>
+    <td>(random)</td>
+    <td>Schedule tasks</td>
+    <td><code>spark.executor.port</code></td>
+    <td>Akka-based. Set to "0" to choose a port randomly.</td>
+  </tr>
+  <tr>
+    <td>Executor</td>
+    <td>Driver</td>
+    <td>(random)</td>
+    <td>File server for files and jars</td>
+    <td><code>spark.fileserver.port</code></td>
+    <td>Jetty-based</td>
+  </tr>
+  <tr>
+    <td>Executor</td>
+    <td>Driver</td>
+    <td>(random)</td>
+    <td>HTTP Broadcast</td>
+    <td><code>spark.broadcast.port</code></td>
+    <td>Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager
+    instead.</td>
+  </tr>
+  <tr>
+    <td>Executor</td>
+    <td>Driver</td>
+    <td>(random)</td>
+    <td>Class file server</td>
+    <td><code>spark.replClassServer.port</code></td>
+    <td>Jetty-based. Only used in Spark shells.</td>
+  </tr>
+  <tr>
+    <td>Executor / Driver</td>
+    <td>Executor / Driver</td>
+    <td>(random)</td>
+    <td>Block Manager port</td>
+    <td><code>spark.blockManager.port</code></td>
+    <td>Raw socket via ServerSocketChannel</td>
+  </tr>
+</table>
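+
+As an illustration only (the port numbers below are arbitrary), an application running behind a
+restrictive firewall could pin the configurable ports up front, for example through `SparkConf`:
+
+{% highlight scala %}
+import org.apache.spark.SparkConf
+
+// Fix the ports this application and its executors will use (values are arbitrary examples)
+val conf = new SparkConf()
+  .set("spark.driver.port", "51000")
+  .set("spark.fileserver.port", "51001")
+  .set("spark.broadcast.port", "51002")
+  .set("spark.blockManager.port", "51004")
+{% endhighlight %}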
 
-See the [configuration page](configuration.html) for more details on the security configuration parameters.
 
-See <a href="{{site.SPARK_GITHUB_URL}}/tree/master/core/src/main/scala/org/apache/spark/SecurityManager.scala"><code>org.apache.spark.SecurityManager</code></a> for implementation details about security.
+See the [configuration page](configuration.html) for more details on the security configuration
+parameters, and <a href="{{site.SPARK_GITHUB_URL}}/tree/master/core/src/main/scala/org/apache/spark/SecurityManager.scala">
+<code>org.apache.spark.SecurityManager</code></a> for implementation details about security.
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 293a7ac9bc9aafb1b568284f198a98dd829c3d68..c791c81f8bfd0fbb7d384ea1477e0bfd32a13829 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -299,97 +299,15 @@ You can run Spark alongside your existing Hadoop cluster by just launching it as
 
 # Configuring Ports for Network Security
 
-Spark makes heavy use of the network, and some environments have strict requirements for using tight
-firewall settings.  Below are the primary ports that Spark uses for its communication and how to
-configure those ports.
-
-<table class="table">
-  <tr>
-    <th>From</th><th>To</th><th>Default Port</th><th>Purpose</th><th>Configuration
-    Setting</th><th>Notes</th>
-  </tr>
-  <!-- Web UIs -->
-  <tr>
-    <td>Browser</td>
-    <td>Standalone Cluster Master</td>
-    <td>8080</td>
-    <td>Web UI</td>
-    <td><code>spark.master.ui.port</code></td>
-    <td>Jetty-based</td>
-  </tr>
-  <tr>
-    <td>Browser</td>
-    <td>Driver</td>
-    <td>4040</td>
-    <td>Web UI</td>
-    <td><code>spark.ui.port</code></td>
-    <td>Jetty-based</td>
-  </tr>
-  <tr>
-    <td>Browser</td>
-    <td>History Server</td>
-    <td>18080</td>
-    <td>Web UI</td>
-    <td><code>spark.history.ui.port</code></td>
-    <td>Jetty-based</td>
-  </tr>
-  <tr>
-    <td>Browser</td>
-    <td>Worker</td>
-    <td>8081</td>
-    <td>Web UI</td>
-    <td><code>spark.worker.ui.port</code></td>
-    <td>Jetty-based</td>
-  </tr>
-  <!-- Cluster interactions -->
-  <tr>
-    <td>Application</td>
-    <td>Standalone Cluster Master</td>
-    <td>7077</td>
-    <td>Submit job to cluster</td>
-    <td><code>spark.driver.port</code></td>
-    <td>Akka-based.  Set to "0" to choose a port randomly</td>
-  </tr>
-  <tr>
-    <td>Worker</td>
-    <td>Standalone Cluster Master</td>
-    <td>7077</td>
-    <td>Join cluster</td>
-    <td><code>spark.driver.port</code></td>
-    <td>Akka-based.  Set to "0" to choose a port randomly</td>
-  </tr>
-  <tr>
-    <td>Application</td>
-    <td>Worker</td>
-    <td>(random)</td>
-    <td>Join cluster</td>
-    <td><code>SPARK_WORKER_PORT</code> (standalone cluster)</td>
-    <td>Akka-based</td>
-  </tr>
-
-  <!-- Other misc stuff -->
-  <tr>
-    <td>Driver and other Workers</td>
-    <td>Worker</td>
-    <td>(random)</td>
-    <td>
-      <ul>
-        <li>File server for file and jars</li>
-        <li>Http Broadcast</li>
-        <li>Class file server (Spark Shell only)</li>
-      </ul>
-    </td>
-    <td>None</td>
-    <td>Jetty-based.  Each of these services starts on a random port that cannot be configured</td>
-  </tr>
-
-</table>
+Spark makes heavy use of the network, and some environments have strict firewall requirements.
+For a complete list of ports to configure, see the
+[security page](security.html#configuring-ports-for-network-security).
 
 # High Availability
 
 By default, standalone scheduling clusters are resilient to Worker failures (insofar as Spark itself is resilient to losing work by moving it to other workers). However, the scheduler uses a Master to make scheduling decisions, and this (by default) creates a single point of failure: if the Master crashes, no new applications can be created. In order to circumvent this, we have two high availability schemes, detailed below.
 
-## Standby Masters with ZooKeeper
+# Standby Masters with ZooKeeper
 
 **Overview**
 
@@ -429,7 +347,7 @@ There's an important distinction to be made between "registering with a Master"
 
 Due to this property, new Masters can be created at any time, and the only thing you need to worry about is that _new_ applications and Workers can find it to register with in case it becomes the leader. Once registered, you're taken care of.
 
-## Single-Node Recovery with Local File System
+# Single-Node Recovery with Local File System
 
 **Overview**
 
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aac621fe53938d447e95ce7a06d7656045cb57f2..40b588512ff08377259dedbafcd4367a5eecda81 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -330,6 +330,8 @@ object TestSettings {
     fork := true,
     javaOptions in Test += "-Dspark.test.home=" + sparkHome,
     javaOptions in Test += "-Dspark.testing=1",
+    javaOptions in Test += "-Dspark.ports.maxRetries=100",
+    javaOptions in Test += "-Dspark.ui.port=0",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
       .map { case (k,v) => s"-D$k=$v" }.toSeq,
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index f60bbb4662af1dec656f0298f922b300b00e22f7..84b57cd2dc1af6b85b97e903acb13eadea60734d 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -102,7 +102,8 @@ import org.apache.spark.util.Utils
 
     val virtualDirectory                              = new PlainFile(outputDir) // "directory" for classfiles
     /** Jetty server that will serve our classes to worker nodes */
-    val classServer                                   = new HttpServer(outputDir, new SecurityManager(conf))
+    val classServerPort                               = conf.getInt("spark.replClassServer.port", 0)
+    val classServer                                   = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
     private var currentSettings: Settings             = initialSettings
     var printResults                                  = true      // whether to print result lines
     var totalSilence                                  = false     // whether to print anything