diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9f0f6074229ddcce8dcffeed5388a5763edb7092..25a3d609a6b094a701b403107904ff707ecd386f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1757,8 +1757,26 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def listJars(): Seq[String] = addedJars.keySet.toSeq
 
-  // Shut down the SparkContext.
-  def stop() {
+  /**
+   * Shut down the SparkContext.
+   */
+  def stop(): Unit = {
+    if (env.rpcEnv.isInRPCThread) {
+      // `stop` blocks until all RPC threads exit, so calling it from within an RPC thread
+      // would deadlock. Launch a new thread to call `stop` instead.
+      new Thread("stop-spark-context") {
+        setDaemon(true)
+
+        override def run(): Unit = {
+          _stop()
+        }
+      }.start()
+    } else {
+      _stop()
+    }
+  }
+
+  private def _stop(): Unit = {
     if (LiveListenerBus.withinListenerThread.value) {
       throw new SparkException(
         s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 579122868afc8d42119a4a0c1f6148c86d8e8f3d..bbc416381490bc3d658b89fbf5c3570d96d3f098 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -147,6 +147,10 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
    */
   def openChannel(uri: String): ReadableByteChannel
 
+  /**
+   * Return whether the current thread is an RPC thread.
+   */
+  def isInRPCThread: Boolean
 }
 
 /**
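A hedged sketch of the call pattern the new method enables (the helper `safeShutdown` and parameter `blockingStop` are illustrative, not part of this patch): code that may run on an RPC thread checks the flag and defers blocking teardown to a fresh thread, exactly as `SparkContext.stop` does above.

def safeShutdown(rpcEnv: RpcEnv)(blockingStop: () => Unit): Unit = {
  if (rpcEnv.isInRPCThread) {
    // On an RPC thread: blocking here would deadlock, so defer.
    val t = new Thread("deferred-stop") {
      override def run(): Unit = blockingStop()
    }
    t.setDaemon(true)
    t.start()
  } else {
    // Not on an RPC thread: safe to block.
    blockingStop()
  }
}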
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index a02cf30a5d8317c65123019a93344ad2ab29c0bc..67baabd2cbff26f41255d2c16d9058308bdd4bd7 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -201,6 +201,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
   /** Message loop used for dispatching messages. */
   private class MessageLoop extends Runnable {
     override def run(): Unit = {
+      NettyRpcEnv.rpcThreadFlag.value = true
       try {
         while (true) {
           try {
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index e51649a1ecce9b01ea070fd5b3e709807f12260c..0b8cd144a2161a9273fa580faeae26bc1a08a5fc 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -408,10 +408,13 @@ private[netty] class NettyRpcEnv(
 
   }
 
+  override def isInRPCThread: Boolean = NettyRpcEnv.rpcThreadFlag.value
 }
 
 private[netty] object NettyRpcEnv extends Logging {
 
+  private[netty] val rpcThreadFlag = new DynamicVariable[Boolean](false)
+
   /**
    * When deserializing the [[NettyRpcEndpointRef]], it needs a reference to [[NettyRpcEnv]].
    * Use `currentEnv` to wrap the deserialization codes. E.g.,
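The flag is a scala.util.DynamicVariable, which is backed by an InheritableThreadLocal: each thread sees its own value, and a thread spawned from an RPC thread (such as "stop-spark-context" above) inherits the flag. A standalone sketch of that behavior, assuming only the standard library:

import scala.util.DynamicVariable

object RpcFlagSketch {
  // Same shape as NettyRpcEnv.rpcThreadFlag.
  val flag = new DynamicVariable[Boolean](false)

  def main(args: Array[String]): Unit = {
    assert(!flag.value) // main thread: default is false
    val worker = new Thread {
      override def run(): Unit = {
        flag.value = true // mark this thread, as MessageLoop.run does
        assert(flag.value)
        val child = new Thread {
          override def run(): Unit = assert(flag.value) // inherited by child threads
        }
        child.start()
        child.join()
      }
    }
    worker.start()
    worker.join()
    assert(!flag.value) // the main thread is unaffected
  }
}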
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index acdf21df9a16157b3f1be95945fb3a94a6ffdead..aa0705987d837225653b2987f06dfc2605f05c82 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -870,6 +870,19 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     verify(endpoint, never()).onDisconnected(any())
     verify(endpoint, never()).onNetworkError(any(), any())
   }
+
+  test("isInRPCThread") {
+    val rpcEndpointRef = env.setupEndpoint("isInRPCThread", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+        case m => context.reply(rpcEnv.isInRPCThread)
+      }
+    })
+    assert(rpcEndpointRef.askWithRetry[Boolean]("hello") === true)
+    assert(env.isInRPCThread === false)
+    env.stop(rpcEndpointRef)
+  }
 }
 
 class UnserializableClass