diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 5a1b42c1e17d5c65f867e3f2c19676ae96a05b5b..6c93d8582330bf156e1bf3e652ca57f4c5444b7b 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -48,16 +48,17 @@ private[yarn] class YarnAllocationHandler( private val lastResponseId = new AtomicInteger() private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList() - override protected def allocateContainers(count: Int): YarnAllocateResponse = { + override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = { var resourceRequests: List[ResourceRequest] = null - logDebug("numExecutors: " + count) + logDebug("asking for additional executors: " + count + " with already pending: " + pending) + val totalNumAsk = count + pending if (count <= 0) { resourceRequests = List() } else if (preferredHostToCount.isEmpty) { logDebug("host preferences is empty") resourceRequests = List(createResourceRequest( - AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)) + AllocationType.ANY, null, totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)) } else { // request for all hosts in preferred nodes and for numExecutors - // candidates.size, request by default allocation policy. @@ -80,7 +81,7 @@ private[yarn] class YarnAllocationHandler( val anyContainerRequests: ResourceRequest = createResourceRequest( AllocationType.ANY, resource = null, - count, + totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest]( @@ -103,7 +104,7 @@ private[yarn] class YarnAllocationHandler( req.addAllReleases(releasedContainerList) if (count > 0) { - logInfo("Allocating %d executor containers with %d of memory each.".format(count, + logInfo("Allocating %d executor containers with %d of memory each.".format(totalNumAsk, executorMemory + memoryOverhead)) } else { logDebug("Empty allocation req .. release : " + releasedContainerList) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 0b8744f4b8bdf420726615996a927b8e17edb36b..299e38a5eb9c0218c670a7fa6db03b739b66f5c7 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -112,6 +112,9 @@ private[yarn] abstract class YarnAllocator( def allocateResources() = { val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get() + // this is needed by alpha, do it here since we add numPending right after this + val executorsPending = numPendingAllocate.get() + if (missing > 0) { numPendingAllocate.addAndGet(missing) logInfo("Will Allocate %d executor containers, each with %d memory".format( @@ -121,7 +124,7 @@ private[yarn] abstract class YarnAllocator( logDebug("Empty allocation request ...") } - val allocateResponse = allocateContainers(missing) + val allocateResponse = allocateContainers(missing, executorsPending) val allocatedContainers = allocateResponse.getAllocatedContainers() if (allocatedContainers.size > 0) { @@ -435,9 +438,10 @@ private[yarn] abstract class YarnAllocator( * * @param count Number of containers to allocate. * If zero, should still contact RM (as a heartbeat). + * @param pending Number of containers pending allocate. Only used on alpha. * @return Response to the allocation request. */ - protected def allocateContainers(count: Int): YarnAllocateResponse + protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse /** Called to release a previously allocated container. */ protected def releaseContainer(container: Container): Unit diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 5438f151ac0adda5d74e3ac6e12dc6385a8d57a7..e44a8db41b97e9a49dab01e3187bb42e559acea5 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -47,7 +47,8 @@ private[yarn] class YarnAllocationHandler( amClient.releaseAssignedContainer(container.getId()) } - override protected def allocateContainers(count: Int): YarnAllocateResponse = { + // pending isn't used on stable as the AMRMClient handles incremental asks + override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = { addResourceRequests(count) // We have already set the container request. Poll the ResourceManager for a response.