From 7c07d176f3d65235f9376898a7b10b01268c867c Mon Sep 17 00:00:00 2001
From: Pete Robbins <robbinspg@gmail.com>
Date: Thu, 2 Jun 2016 10:14:51 -0700
Subject: [PATCH] [SPARK-15606][CORE] Use non-blocking removeExecutor call to
 avoid deadlocks

## What changes were proposed in this pull request?
Set minimum number of dispatcher threads to 3 to avoid deadlocks on machines with only 2 cores

## How was this patch tested?

Spark test builds

Author: Pete Robbins <robbinspg@gmail.com>

Closes #13355 from robbinspg/SPARK-13906.
---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 .../org/apache/spark/storage/BlockManagerMaster.scala     | 8 ++++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0fea9c123b..e84cb6346d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -295,7 +295,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           // manager to reregister itself. If that happens, the block manager master will know
           // about the executor, but the scheduler will not. Therefore, we should remove the
           // executor from the block manager when we hit this case.
-          scheduler.sc.env.blockManager.master.removeExecutor(executorId)
+          scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
           logInfo(s"Asked to remove non-existent executor $executorId")
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 52db45bd48..8655cf10fc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -42,6 +42,14 @@ class BlockManagerMaster(
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
+  /** Request removal of a dead executor from the driver endpoint.
+   *  This is only called on the driver side. Non-blocking
+   */
+  def removeExecutorAsync(execId: String) {
+    driverEndpoint.ask[Boolean](RemoveExecutor(execId))
+    logInfo("Removal of executor " + execId + " requested")
+  }
+
   /** Register the BlockManager's id with the driver. */
   def registerBlockManager(
       blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
-- 
GitLab