Commit bd7b91ce authored by zhonghaihua, committed by Tom Graves

[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…

Currently, when the number of executor failures reaches `maxNumExecutorFailures`, the `ApplicationMaster` is killed and a new one re-registers, and a new `YarnAllocator` instance is created.
However, `executorIdCounter` in the new `YarnAllocator` resets to `0`, so the IDs of newly requested executors start from `1` again. These IDs collide with executors created before the restart, which causes `FetchFailedException`.
This conflict only occurs in YARN client mode. For more details, see [SPARK-12864](https://issues.apache.org/jira/browse/SPARK-12864).
This PR introduces a mechanism to initialize `executorIdCounter` after the `ApplicationMaster` is killed.

Author: zhonghaihua <793507405@qq.com>

Closes #10794 from zhonghaihua/initExecutorIdCounterAfterAMKilled.
parent 3e991dbc
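For orientation before the diff: the fix makes the driver remember the highest executor ID it has ever registered and lets a freshly created `YarnAllocator` seed its counter from that value via a `RetrieveLastAllocatedExecutorId` message. The sketch below is a minimal, standalone model of that handshake, not Spark's actual RPC code; the names `DriverModel`, `AllocatorModel`, and `nextExecutorId` are illustrative only.

```scala
object ExecutorIdHandshake {

  // Illustrative stand-in for the RetrieveLastAllocatedExecutorId RPC message in the patch.
  case object RetrieveLastAllocatedExecutorId

  // Driver side: remembers the highest executor ID ever registered, so the value
  // survives an ApplicationMaster restart.
  class DriverModel {
    private var currentExecutorIdCounter = 0

    def registerExecutor(executorId: String): Unit = synchronized {
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
    }

    // Stand-in for the RPC ask; replies with the current max executor ID.
    def ask(msg: Any): Int = msg match {
      case RetrieveLastAllocatedExecutorId => synchronized { currentExecutorIdCounter }
    }
  }

  // Allocator side: instead of starting from 0 after an AM restart, seed the counter
  // with the driver's answer so new IDs never collide with pre-restart executors.
  class AllocatorModel(driver: DriverModel) {
    private var executorIdCounter: Int = driver.ask(RetrieveLastAllocatedExecutorId)

    def nextExecutorId(): String = {
      executorIdCounter += 1
      executorIdCounter.toString
    }
  }

  def main(args: Array[String]): Unit = {
    val driver = new DriverModel
    Seq("1", "2", "3").foreach(driver.registerExecutor) // executors registered before the AM died

    val allocator = new AllocatorModel(driver)          // fresh allocator after AM restart
    println(allocator.nextExecutorId())                 // "4", not "1"
  }
}
```

In the real patch, the driver side lives in `CoarseGrainedSchedulerBackend` / `YarnSchedulerBackend` and the allocator side in `YarnAllocator`, as the hunks below show.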
@@ -30,6 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {

   case object RetrieveSparkProps extends CoarseGrainedClusterMessage

+  case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
+
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
@@ -79,6 +79,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)

   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]

+  // The current max executor ID, used when a restarted application master re-registers.
+  protected var currentExecutorIdCounter = 0
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
@@ -156,6 +159,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
           executorDataMap.put(executorId, data)
+          if (currentExecutorIdCounter < executorId.toInt) {
+            currentExecutorIdCounter = executorId.toInt
+          }
           if (numPendingExecutors > 0) {
             numPendingExecutors -= 1
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
@@ -40,6 +40,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.ThreadUtils

 /**
@@ -83,8 +84,23 @@ private[yarn] class YarnAllocator(
     new ConcurrentHashMap[ContainerId, java.lang.Boolean])

   @volatile private var numExecutorsRunning = 0

-  // Used to generate a unique ID per executor
-  private var executorIdCounter = 0
+  /**
+   * Used to generate a unique ID per executor.
+   *
+   * When the AM restarts, a fresh `YarnAllocator` would reset `executorIdCounter` to 0, so the
+   * IDs of new executors would start from 1 again and conflict with executors created before
+   * the restart. We therefore initialize `executorIdCounter` with the max executor ID known to
+   * the driver.
+   *
+   * This executor ID conflict only occurs in YARN client mode; see the JIRA issue for details.
+   *
+   * @see SPARK-12864
+   */
+  private var executorIdCounter: Int =
+    driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)

   @volatile private var numExecutorsFailed = 0

   @volatile private var targetNumExecutors =
@@ -292,6 +292,9 @@ private[spark] abstract class YarnSchedulerBackend(
         logWarning("Attempted to kill executors before the AM has registered!")
         context.reply(false)
       }
+
+    case RetrieveLastAllocatedExecutorId =>
+      context.reply(currentExecutorIdCounter)
   }

   override def onDisconnected(remoteAddress: RpcAddress): Unit = {