diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ed159dec4f998c91119cb26b447462e6bcbef419..f3a26f54a81fbd25a1bd3b08e397a1976f999366 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -55,18 +55,19 @@ private[spark] class CoarseGrainedExecutorBackend(
   private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()
 
   override def onStart() {
-    import scala.concurrent.ExecutionContext.Implicits.global
     logInfo("Connecting to driver: " + driverUrl)
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
+      // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
       ref.ask[RegisteredExecutor.type](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
-    } onComplete {
+    }(ThreadUtils.sameThread).onComplete {
+      // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) => Utils.tryLogNonFatalError {
         Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
       }
       case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
-    }
+    }(ThreadUtils.sameThread)
   }
 
   def extractLogUrls: Map[String, String] = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ec185340c3a2d01228b1bf90ee50dceddbe05ceb..bbf1b83af0795a7436c3f290f4e948416b066df9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -19,8 +19,10 @@ package org.apache.spark.rdd
 
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.spark.util.ThreadUtils
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.ExecutionContext
 import scala.reflect.ClassTag
 
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
@@ -66,6 +68,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
     val f = new ComplexFutureAction[Seq[T]]
 
     f.run {
+      // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
+      // is a cached thread pool.
       val results = new ArrayBuffer[T](num)
       val totalParts = self.partitions.length
       var partsScanned = 0
@@ -101,7 +105,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
         partsScanned += numPartsToTry
       }
       results.toSeq
-    }
+    }(AsyncRDDActions.futureExecutionContext)
 
     f
   }
@@ -123,3 +127,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       (index, data) => Unit, Unit)
   }
 }
+
+private object AsyncRDDActions {
+  val futureExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("AsyncRDDActions-future", 128))
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index cc794e5c90ffab8f7995efb7cae51ea596b8f98e..16d67cbfca80b89f9d0ee444916ff2a9f2f1b3ed 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -21,8 +21,7 @@ import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream,
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{ExecutionContext, Await, Future}
 import scala.concurrent.duration._
 import scala.util.Random
 
@@ -77,6 +76,9 @@ private[spark] class BlockManager(
 
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
 
+  private val futureExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
+
   // Actual storage of where blocks are kept
   private var externalBlockStoreInitialized = false
   private[spark] val memoryStore = new MemoryStore(this, maxMemory)
@@ -266,11 +268,13 @@ private[spark] class BlockManager(
     asyncReregisterLock.synchronized {
       if (asyncReregisterTask == null) {
         asyncReregisterTask = Future[Unit] {
+          // This is a blocking action and should run in futureExecutionContext which is a cached
+          // thread pool
           reregister()
           asyncReregisterLock.synchronized {
             asyncReregisterTask = null
           }
-        }
+        }(futureExecutionContext)
       }
     }
   }
@@ -744,7 +748,11 @@ private[spark] class BlockManager(
       case b: ByteBufferValues if putLevel.replication > 1 =>
         // Duplicate doesn't copy the bytes, but just creates a wrapper
         val bufferView = b.buffer.duplicate()
-        Future { replicate(blockId, bufferView, putLevel) }
+        Future {
+          // This is a blocking action and should run in futureExecutionContext which is a cached
+          // thread pool
+          replicate(blockId, bufferView, putLevel)
+        }(futureExecutionContext)
       case _ => null
     }
 
@@ -1218,6 +1226,7 @@ private[spark] class BlockManager(
     }
     metadataCleaner.cancel()
     broadcastCleaner.cancel()
+    futureExecutionContext.shutdownNow()
     logInfo("BlockManager stopped")
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index a85e1c763297318d7322ed04aa558685e3ebcaf5..abcad9438bf28dc790c6e7e962a1df7e3cd2ec09 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.storage
 
+import scala.collection.Iterable
+import scala.collection.generic.CanBuildFrom
 import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global
 
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.RpcUtils
+import org.apache.spark.util.{ThreadUtils, RpcUtils}
 
 private[spark]
 class BlockManagerMaster(
@@ -102,8 +103,8 @@ class BlockManagerMaster(
     val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
       case e: Exception =>
-        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
-    }
+        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       Await.result(future, timeout)
     }
@@ -114,8 +115,8 @@ class BlockManagerMaster(
     val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
       case e: Exception =>
-        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
-    }
+        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       Await.result(future, timeout)
     }
@@ -128,8 +129,8 @@ class BlockManagerMaster(
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove broadcast $broadcastId" +
-          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
-    }
+          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       Await.result(future, timeout)
     }
@@ -169,11 +170,17 @@ class BlockManagerMaster(
     val response = driverEndpoint.
       askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
     val (blockManagerIds, futures) = response.unzip
-    val result = Await.result(Future.sequence(futures), timeout)
-    if (result == null) {
+    implicit val sameThread = ThreadUtils.sameThread
+    val cbf =
+      implicitly[
+        CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
+        Option[BlockStatus],
+        Iterable[Option[BlockStatus]]]]
+    val blockStatus = Await.result(
+      Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread), timeout)
+    if (blockStatus == null) {
       throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
     }
-    val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
     blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
       status.map { s => (blockManagerId, s) }
     }.toMap
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index fe43fc4125c8e328d36aa69f5952116d24c668ee..b8b12be8756f90b2b2e0896f939990b26a5007d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -78,5 +78,5 @@ case class BroadcastHashJoin(
 object BroadcastHashJoin {
 
   private val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 1024))
+    ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 4943f29395d12fbf3d832b0057bdc757f0db1997..33be067ebdaf25c870b7bd01103e532d41126e81 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -18,14 +18,14 @@
 package org.apache.spark.streaming.receiver
 
 import java.nio.ByteBuffer
+import java.util.concurrent.CountDownLatch
 
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StreamBlockId
-import java.util.concurrent.CountDownLatch
-import scala.concurrent._
-import ExecutionContext.Implicits.global
+import org.apache.spark.util.ThreadUtils
 
 /**
  * Abstract class that is responsible for supervising a Receiver in the worker.
@@ -46,6 +46,9 @@ private[streaming] abstract class ReceiverSupervisor(
   // Attach the executor to the receiver
   receiver.attachExecutor(this)
 
+  private val futureExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))
+
   /** Receiver id */
   protected val streamId = receiver.streamId
 
@@ -111,6 +114,7 @@ private[streaming] abstract class ReceiverSupervisor(
     stoppingError = error.orNull
     stopReceiver(message, error)
     onStop(message, error)
+    futureExecutionContext.shutdownNow()
     stopLatch.countDown()
   }
 
@@ -150,6 +154,8 @@ private[streaming] abstract class ReceiverSupervisor(
   /** Restart receiver with delay */
   def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
     Future {
+      // This is a blocking action so we should use "futureExecutionContext" which is a cached
+      // thread pool.
       logWarning("Restarting receiver with delay " + delay + " ms: " + message,
         error.getOrElse(null))
       stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
@@ -158,7 +164,7 @@ private[streaming] abstract class ReceiverSupervisor(
       logInfo("Starting receiver again")
       startReceiver()
       logInfo("Receiver started again")
-    }
+    }(futureExecutionContext)
   }
 
   /** Check if receiver has been marked for stopping */