Commit 8b44bd52 authored by jerryshao, committed by Tom Graves

[SPARK-6735][YARN] Add window based executor failure tracking mechanism for long running service

This work is based on twinkle-sachdeva's proposal. In parallel to the existing validity-interval mechanism for AM attempt failures, this change adds a similar window-based mechanism for executor failure tracking, which is useful for long-running Spark services to mitigate executor failure problems.
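The core of the mechanism is a sliding validity window over failure timestamps: each executor failure is recorded with the time it happened, and only failures newer than the window count toward the total. A minimal standalone sketch of that idea, assuming a hypothetical `WindowedFailureCounter` class (illustration only; the real change lives in `YarnAllocator` below):

```scala
import scala.collection.mutable.Queue

// Illustrative sliding-window failure counter (hypothetical class, not part of the patch).
class WindowedFailureCounter(validityIntervalMs: Long) {
  private val failureTimestamps = Queue[Long]()

  // Record a failure at the given wall-clock time (milliseconds).
  def recordFailure(nowMs: Long): Unit = failureTimestamps.enqueue(nowMs)

  // Count failures still inside the window, dropping expired ones.
  // A non-positive interval disables expiry, matching the "not configured" case.
  def count(nowMs: Long): Int = {
    while (validityIntervalMs > 0 &&
        failureTimestamps.nonEmpty &&
        failureTimestamps.head < nowMs - validityIntervalMs) {
      failureTimestamps.dequeue()
    }
    failureTimestamps.size
  }
}
```

With the window disabled (a non-positive interval), nothing is ever dequeued, which falls back to the old behaviour of counting every failure ever seen.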

Please help to review, tgravescs, sryza and vanzin.

Author: jerryshao <sshao@hortonworks.com>

Closes #10241 from jerryshao/SPARK-6735.
parent 9e785079
@@ -371,6 +371,14 @@ If you need a reference to the proper location to put log files in the YARN so t
   This feature is not enabled if not configured, and only supported in Hadoop 2.6+.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.executor.failuresValidityInterval</code></td>
+  <td>(none)</td>
+  <td>
+  Defines the validity interval for executor failure tracking.
+  Executor failures which are older than the validity interval will be ignored.
+  </td>
+</tr>
 <tr>
   <td><code>spark.yarn.submit.waitAppCompletion</code></td>
   <td><code>true</code></td>
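For reference, a sketch of how a long-running application might opt in to the documented setting above; the one-hour value and the pairing with `spark.yarn.max.executor.failures` are illustrative choices, not taken from this commit:

```scala
import org.apache.spark.SparkConf

object LongRunningServiceConf {
  // Hypothetical settings for a long-running service: a one-hour validity window
  // lets old executor failures age out instead of accumulating forever.
  val conf: SparkConf = new SparkConf()
    .set("spark.yarn.executor.failuresValidityInterval", "1h")
    .set("spark.yarn.max.executor.failures", "16")
}
```

The setting is a time value, so suffixed strings such as "100s" (used in the test below) or "1h" are accepted.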
@@ -231,14 +231,14 @@ private[spark] class Client(
           "Cluster's default value will be used.")
     }
 
-    sparkConf.get(ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
+    sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
      try {
        val method = appContext.getClass().getMethod(
          "setAttemptFailuresValidityInterval", classOf[Long])
        method.invoke(appContext, interval: java.lang.Long)
      } catch {
        case e: NoSuchMethodException =>
-          logWarning(s"Ignoring ${ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
+          logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
            "the version of YARN does not support it")
      }
    }
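The `Client` change above only renames the AM-side config constant; the surrounding code still calls YARN's `setAttemptFailuresValidityInterval` through reflection so it degrades gracefully on versions that lack the method. A generic sketch of that guard pattern, with a hypothetical helper object:

```scala
object OptionalMethodCaller {
  // Invoke a single-Long-argument method if the running library version has it,
  // otherwise report and continue (mirrors the NoSuchMethodException guard above).
  def callIfPresent(target: AnyRef, methodName: String, arg: Long): Boolean = {
    try {
      val method = target.getClass.getMethod(methodName, classOf[Long])
      method.invoke(target, arg: java.lang.Long)
      true
    } catch {
      case _: NoSuchMethodException =>
        println(s"$methodName is not supported by this version; skipping")
        false
    }
  }
}
```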
@@ -22,7 +22,7 @@ import java.util.concurrent._
 import java.util.regex.Pattern
 
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -41,7 +41,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -102,7 +102,13 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter: Int =
     driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
 
-  @volatile private var numExecutorsFailed = 0
+  // Queue to store the timestamp of failed executors
+  private val failedExecutorsTimeStamps = new Queue[Long]()
+
+  private var clock: Clock = new SystemClock
+
+  private val executorFailuresValidityInterval =
+    sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
 
   @volatile private var targetNumExecutors =
     YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
@@ -166,9 +172,26 @@
   private[yarn] val containerPlacementStrategy =
     new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
 
+  /**
+   * Use a different clock for YarnAllocator. This is mainly used for testing.
+   */
+  def setClock(newClock: Clock): Unit = {
+    clock = newClock
+  }
+
   def getNumExecutorsRunning: Int = numExecutorsRunning
 
-  def getNumExecutorsFailed: Int = numExecutorsFailed
+  def getNumExecutorsFailed: Int = synchronized {
+    val endTime = clock.getTimeMillis()
+
+    while (executorFailuresValidityInterval > 0
+      && failedExecutorsTimeStamps.nonEmpty
+      && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
+      failedExecutorsTimeStamps.dequeue()
+    }
+
+    failedExecutorsTimeStamps.size
+  }
 
   /**
    * A sequence of pending container requests that have not yet been fulfilled.
@@ -527,7 +550,8 @@
               completedContainer.getDiagnostics,
               PMEM_EXCEEDED_PATTERN))
           case _ =>
-            numExecutorsFailed += 1
+            // Enqueue the timestamp of failed executor
+            failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
             (true, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
               ". Diagnostics: " + completedContainer.getDiagnostics)
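With the changes above, a failed container now enqueues a timestamp instead of bumping a plain counter, and expired entries are pruned whenever `getNumExecutorsFailed` is read. How that count is consumed is outside this diff; a rough sketch of the consumer side, with hypothetical names and a simplified polling loop (not the actual ApplicationMaster logic):

```scala
object ExecutorFailureMonitor {
  // Hedged sketch, not the actual ApplicationMaster code: poll the windowed
  // count and fail only when enough failures are still inside the window.
  def monitor(getNumExecutorsFailed: () => Int, maxExecutorFailures: Int): Unit = {
    while (getNumExecutorsFailed() < maxExecutorFailures) {
      Thread.sleep(1000L)
    }
    // Because old failures expire, a long-running service only reaches this
    // point when failures cluster inside the configured validity interval.
    sys.error(s"Max number of executor failures ($maxExecutorFailures) reached")
  }
}
```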
@@ -33,13 +33,20 @@ package object config {
     .toSequence
     .createOptional
 
-  private[spark] val ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+  private[spark] val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
    ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
      .doc("Interval after which AM failures will be considered independent and " +
        "not accumulate towards the attempt count.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createOptional
 
+  private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+    ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
+      .doc("Interval after which Executor failures will be considered independent and not " +
+        "accumulate towards the attempt count.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
   private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
     .doc("Maximum number of AM attempts before failing the app.")
     .intConf
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.yarn.YarnAllocator._
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.ManualClock
 
 class MockResolver extends DNSToSwitchMapping {
@@ -275,4 +276,49 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
     assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
   }
+
+  test("window based failure executor counting") {
+    sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
+    val handler = createAllocator(4)
+    val clock = new ManualClock(0L)
+    handler.setClock(clock)
+
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (4)
+
+    val containers = Seq(
+      createContainer("host1"),
+      createContainer("host2"),
+      createContainer("host3"),
+      createContainer("host4")
+    )
+    handler.handleAllocatedContainers(containers)
+
+    val failedStatuses = containers.map { c =>
+      ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1)
+    }
+
+    handler.getNumExecutorsFailed should be (0)
+
+    clock.advance(100 * 1000L)
+    handler.processCompletedContainers(failedStatuses.slice(0, 1))
+    handler.getNumExecutorsFailed should be (1)
+
+    clock.advance(101 * 1000L)
+    handler.getNumExecutorsFailed should be (0)
+
+    handler.processCompletedContainers(failedStatuses.slice(1, 3))
+    handler.getNumExecutorsFailed should be (2)
+
+    clock.advance(50 * 1000L)
+    handler.processCompletedContainers(failedStatuses.slice(3, 4))
+    handler.getNumExecutorsFailed should be (3)
+
+    clock.advance(51 * 1000L)
+    handler.getNumExecutorsFailed should be (1)
+
+    clock.advance(50 * 1000L)
+    handler.getNumExecutorsFailed should be (0)
+  }
 }
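The expected counts in the test follow directly from the 100s window. A small standalone replay of the same timeline with a plain queue, using the assertion values from the test above (the object and helper are hypothetical):

```scala
import scala.collection.mutable.Queue

object WindowTimelineReplay extends App {
  val windowMs = 100 * 1000L            // 100s, as configured in the test
  val failures = Queue[Long]()

  // Mirror of the allocator's prune-on-read counting.
  def countAt(nowMs: Long): Int = {
    while (failures.nonEmpty && failures.head < nowMs - windowMs) failures.dequeue()
    failures.size
  }

  failures.enqueue(100 * 1000L)                      // first failure at t=100s
  assert(countAt(100 * 1000L) == 1)
  assert(countAt(201 * 1000L) == 0)                  // the t=100s failure has expired
  failures.enqueue(201 * 1000L); failures.enqueue(201 * 1000L)
  assert(countAt(201 * 1000L) == 2)
  failures.enqueue(251 * 1000L)
  assert(countAt(251 * 1000L) == 3)
  assert(countAt(302 * 1000L) == 1)                  // only the t=251s failure remains
  assert(countAt(352 * 1000L) == 0)
  println("timeline matches the expected counts")
}
```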