diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index c27257cda4e55c361b4e160da670f9afa664d87c..96a24cd2b1906ae399a19c899e0cdce7b65925c6 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -28,7 +28,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import org.apache.spark.Logging import org.apache.spark.scheduler.SplitInfo -import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend} +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils import org.apache.hadoop.conf.Configuration @@ -233,9 +234,9 @@ private[yarn] class YarnAllocationHandler( // Note that the list we create below tries to ensure that not all containers end up within // a host if there is a sufficiently large number of hosts/containers. val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size) - allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(dataLocalContainers) - allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(rackLocalContainers) - allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(offRackContainers) + allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers) + allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers) + allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers) // Run each of the allocated containers. for (container <- allocatedContainersToProcess) { diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index 63a0449e5a0730085554d2b8ae86067135fa8dba..40307ab97217369299c6a654c44dc56233147916 100644 --- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -21,12 +21,13 @@ import org.apache.spark._ import org.apache.hadoop.conf.Configuration import org.apache.spark.deploy.yarn.YarnAllocationHandler import org.apache.spark.util.Utils +import org.apache.spark.scheduler.TaskSchedulerImpl /** * * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM. */ -private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) { +private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { def this(sc: SparkContext) = this(sc, new Configuration()) diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index b206780c7806e15c84944db05876f89c8f848040..350fc760a428a5733eeeadf474b341f9a2341ccb 100644 --- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -20,9 +20,10 @@ package org.apache.spark.scheduler.cluster import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} import org.apache.spark.{SparkException, Logging, SparkContext} import org.apache.spark.deploy.yarn.{Client, ClientArguments} +import org.apache.spark.scheduler.TaskSchedulerImpl private[spark] class YarnClientSchedulerBackend( - scheduler: ClusterScheduler, + scheduler: TaskSchedulerImpl, sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with Logging { diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 29b3f22e13697b38bc501e2f914d8fc0a202d722..b318270f750a89de7c05b96d896ce5a6994e30ae 100644 --- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -21,12 +21,13 @@ import org.apache.spark._ import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler} import org.apache.spark.util.Utils import org.apache.hadoop.conf.Configuration +import org.apache.spark.scheduler.TaskSchedulerImpl /** * * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done */ -private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) { +private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { logInfo("Created YarnClusterScheduler") diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 9ab20735299c0e35a1e1c13fcd11485ee6ee87d0..eeee78f8ad8b3e2edc4fd9eb2f956ddd7a243c01 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -28,7 +28,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import org.apache.spark.Logging import org.apache.spark.scheduler.SplitInfo -import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend} +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils import org.apache.hadoop.conf.Configuration @@ -214,9 +215,9 @@ private[yarn] class YarnAllocationHandler( // host if there are sufficiently large number of hosts/containers. val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size) - allocatedContainers ++= ClusterScheduler.prioritizeContainers(dataLocalContainers) - allocatedContainers ++= ClusterScheduler.prioritizeContainers(rackLocalContainers) - allocatedContainers ++= ClusterScheduler.prioritizeContainers(offRackContainers) + allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers) + allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers) + allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers) // Run each of the allocated containers for (container <- allocatedContainers) { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index 63a0449e5a0730085554d2b8ae86067135fa8dba..522e0a9ad7eeb50f4c2b6b781a68ea998639b30a 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -20,13 +20,14 @@ package org.apache.spark.scheduler.cluster import org.apache.spark._ import org.apache.hadoop.conf.Configuration import org.apache.spark.deploy.yarn.YarnAllocationHandler +import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.Utils /** * * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM. */ -private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) { +private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { def this(sc: SparkContext) = this(sc, new Configuration()) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index b206780c7806e15c84944db05876f89c8f848040..350fc760a428a5733eeeadf474b341f9a2341ccb 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -20,9 +20,10 @@ package org.apache.spark.scheduler.cluster import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} import org.apache.spark.{SparkException, Logging, SparkContext} import org.apache.spark.deploy.yarn.{Client, ClientArguments} +import org.apache.spark.scheduler.TaskSchedulerImpl private[spark] class YarnClientSchedulerBackend( - scheduler: ClusterScheduler, + scheduler: TaskSchedulerImpl, sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with Logging { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 4e988b8017600e09b3300fe1a3ec36e82025a73d..2d9fbcb400e5bc07e1af665116910909d7edd118 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark._ import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler} -import org.apache.spark.scheduler.ClusterScheduler +import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.Utils /** @@ -30,7 +30,7 @@ import org.apache.spark.util.Utils * ApplicationMaster, etc. is done */ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) - extends ClusterScheduler(sc) { + extends TaskSchedulerImpl(sc) { logInfo("Created YarnClusterScheduler")