diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ed77a6e4a1c7c10a0499b9f4142bb226abf6e75a..cc571c330f8d876b7fc4139d63f0b037320054c0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.util.Collections
 import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
 
 import scala.collection.mutable
@@ -30,7 +31,6 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
@@ -80,7 +80,9 @@ private[yarn] class YarnAllocator(
   private val releasedContainers = Collections.newSetFromMap[ContainerId](
     new ConcurrentHashMap[ContainerId, java.lang.Boolean])
 
-  @volatile private var numExecutorsRunning = 0
+  private val numExecutorsRunning = new AtomicInteger(0)
+
+  private val numExecutorsStarting = new AtomicInteger(0)
 
   /**
    * Used to generate a unique ID per executor
@@ -163,7 +165,7 @@ private[yarn] class YarnAllocator(
     clock = newClock
   }
 
-  def getNumExecutorsRunning: Int = numExecutorsRunning
+  def getNumExecutorsRunning: Int = numExecutorsRunning.get()
 
   def getNumExecutorsFailed: Int = synchronized {
     val endTime = clock.getTimeMillis()
@@ -242,7 +244,7 @@ private[yarn] class YarnAllocator(
     if (executorIdToContainer.contains(executorId)) {
       val container = executorIdToContainer.get(executorId).get
       internalReleaseContainer(container)
-      numExecutorsRunning -= 1
+      numExecutorsRunning.decrementAndGet()
     } else {
       logWarning(s"Attempted to kill unknown executor $executorId!")
     }
@@ -267,10 +269,12 @@ private[yarn] class YarnAllocator(
     val allocatedContainers = allocateResponse.getAllocatedContainers()
 
     if (allocatedContainers.size > 0) {
-      logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
+      logDebug(("Allocated containers: %d. Current executor count: %d. " +
+        "Launching executor count: %d. Cluster resources: %s.")
         .format(
           allocatedContainers.size,
-          numExecutorsRunning,
+          numExecutorsRunning.get,
+          numExecutorsStarting.get,
           allocateResponse.getAvailableResources))
 
       handleAllocatedContainers(allocatedContainers.asScala)
@@ -281,7 +285,7 @@ private[yarn] class YarnAllocator(
       logDebug("Completed %d containers".format(completedContainers.size))
      processCompletedContainers(completedContainers.asScala)
       logDebug("Finished processing %d completed containers. Current running executor count: %d."
-        .format(completedContainers.size, numExecutorsRunning))
+        .format(completedContainers.size, numExecutorsRunning.get))
     }
   }
 
@@ -294,7 +298,11 @@ private[yarn] class YarnAllocator(
   def updateResourceRequests(): Unit = {
     val pendingAllocate = getPendingAllocate
     val numPendingAllocate = pendingAllocate.size
-    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
+    val missing = targetNumExecutors - numPendingAllocate -
+      numExecutorsStarting.get - numExecutorsRunning.get
+    logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
+      s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
+      s"executorsStarting: ${numExecutorsStarting.get}")
 
     if (missing > 0) {
       logInfo(s"Will request $missing executor container(s), each with " +
@@ -493,7 +501,8 @@ private[yarn] class YarnAllocator(
         s"for executor with ID $executorId")
 
       def updateInternalState(): Unit = synchronized {
-        numExecutorsRunning += 1
+        numExecutorsRunning.incrementAndGet()
+        numExecutorsStarting.decrementAndGet()
         executorIdToContainer(executorId) = container
         containerIdToExecutorId(container.getId) = executorId
 
@@ -503,7 +512,8 @@ private[yarn] class YarnAllocator(
         allocatedContainerToHostMap.put(containerId, executorHostname)
       }
 
-      if (numExecutorsRunning < targetNumExecutors) {
+      if (numExecutorsRunning.get < targetNumExecutors) {
+        numExecutorsStarting.incrementAndGet()
         if (launchContainers) {
           launcherPool.execute(new Runnable {
             override def run(): Unit = {
@@ -523,11 +533,16 @@ private[yarn] class YarnAllocator(
                 ).run()
                 updateInternalState()
               } catch {
-                case NonFatal(e) =>
-                  logError(s"Failed to launch executor $executorId on container $containerId", e)
-                  // Assigned container should be released immediately to avoid unnecessary resource
-                  // occupation.
-                  amClient.releaseAssignedContainer(containerId)
+                case e: Throwable =>
+                  numExecutorsStarting.decrementAndGet()
+                  if (NonFatal(e)) {
+                    logError(s"Failed to launch executor $executorId on container $containerId", e)
+                    // Assigned container should be released immediately
+                    // to avoid unnecessary resource occupation.
+                    amClient.releaseAssignedContainer(containerId)
+                  } else {
+                    throw e
+                  }
               }
             }
           })
@@ -537,7 +552,8 @@ private[yarn] class YarnAllocator(
       } else {
         logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
-          "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
+          "reached target Executors count: %d.").format(
+            numExecutorsRunning.get, targetNumExecutors))
       }
     }
   }
@@ -552,7 +568,7 @@ private[yarn] class YarnAllocator(
       val exitReason = if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
-        numExecutorsRunning -= 1
+        numExecutorsRunning.decrementAndGet()
        logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
           containerId, onHostStr,
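
Note on the pattern above: the patch replaces a @volatile var with two AtomicInteger counters so that updateResourceRequests also accounts for executors whose launch is still in flight on the launcherPool. Without the numExecutorsStarting counter, every AM heartbeat that fires between launcherPool.execute and updateInternalState sees too few running executors and requests duplicate containers. The sketch below models just that accounting outside Spark; AllocatorAccountingSketch, launchExecutor, and missingExecutors are hypothetical names for illustration, not Spark's API — only the two counters and the NonFatal re-throw mirror the diff.

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

import scala.util.control.NonFatal

object AllocatorAccountingSketch {

  private val running = new AtomicInteger(0)   // executors fully launched
  private val starting = new AtomicInteger(0)  // launches still in flight
  private val launcherPool = Executors.newCachedThreadPool()

  // Containers still needed. Counting `starting` is the fix: without it,
  // every heartbeat during a slow launch re-requests the same slots.
  def missingExecutors(target: Int, pending: Int): Int =
    target - pending - starting.get - running.get

  def launchExecutor(doLaunch: () => Unit): Unit = {
    starting.incrementAndGet()  // claim the slot before the async launch
    launcherPool.execute(new Runnable {
      override def run(): Unit = {
        try {
          doLaunch()
          // Success: move the slot from "starting" to "running". The patch
          // does both updates inside a synchronized updateInternalState().
          running.incrementAndGet()
          starting.decrementAndGet()
        } catch {
          case e: Throwable =>
            // Any failure gives the slot back so the next heartbeat can
            // re-request it, mirroring the patch's widened catch block.
            starting.decrementAndGet()
            if (NonFatal(e)) println(s"launch failed: $e") else throw e
        }
      }
    })
  }

  def main(args: Array[String]): Unit = {
    (1 to 3).foreach(_ => launchExecutor(() => Thread.sleep(100)))
    // Launches are in flight, so nothing extra is requested:
    println(s"missing = ${missingExecutors(target = 3, pending = 0)}")  // 0
    Thread.sleep(500)
    println(s"running = ${running.get}")  // 3
    launcherPool.shutdown()
  }
}

Incrementing running before decrementing starting on success means their sum can momentarily overcount by one but never undercounts, so the allocator may briefly request one container too few, never too many; the real patch avoids even that window by doing both updates inside the synchronized updateInternalState.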