From 14fcef72db765d0313d4ce3c986c08069a1a01ae Mon Sep 17 00:00:00 2001
From: liguoqiang <liguoqiang@rd.tuan800.com>
Date: Thu, 26 Dec 2013 11:05:07 +0800
Subject: [PATCH] Renamed ClusterScheduler to TaskSchedulerImpl for yarn and
 new-yarn

---
 .../apache/spark/deploy/yarn/YarnAllocationHandler.scala | 9 +++++----
 .../scheduler/cluster/YarnClientClusterScheduler.scala   | 3 ++-
 .../scheduler/cluster/YarnClientSchedulerBackend.scala   | 3 ++-
 .../spark/scheduler/cluster/YarnClusterScheduler.scala   | 3 ++-
 .../apache/spark/deploy/yarn/YarnAllocationHandler.scala | 9 +++++----
 .../scheduler/cluster/YarnClientClusterScheduler.scala   | 3 ++-
 .../scheduler/cluster/YarnClientSchedulerBackend.scala   | 3 ++-
 .../spark/scheduler/cluster/YarnClusterScheduler.scala   | 4 ++--
 8 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index c27257cda4..96a24cd2b1 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -28,7 +28,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
 import org.apache.hadoop.conf.Configuration
@@ -233,9 +234,9 @@ private[yarn] class YarnAllocationHandler(
     // Note that the list we create below tries to ensure that not all containers end up within
     // a host if there is a sufficiently large number of hosts/containers.
     val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
-    allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(dataLocalContainers)
-    allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(rackLocalContainers)
-    allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(offRackContainers)
+    allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+    allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+    allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
 
     // Run each of the allocated containers.
     for (container <- allocatedContainersToProcess) {
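The hunk above relies on TaskSchedulerImpl.prioritizeContainers (formerly on ClusterScheduler) to interleave allocated containers across hosts, as the comment describes. The sketch below is a paraphrase of that round-robin idea rather than a copy of Spark's implementation; the method name matches the patch, but the body and the demo data are illustrative only.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

object PrioritizeSketch {
  // Take the first container from every host, then the second from every
  // host, and so on, so no single host dominates the front of the list.
  def prioritizeContainers[K, T](map: HashMap[K, ArrayBuffer[T]]): List[T] = {
    // Visit hosts with the most containers first so surplus spreads evenly.
    val keys = map.keys.toList.sortBy(k => -map(k).size)
    val result = new ArrayBuffer[T]
    var index = 0
    var found = true
    while (found) {
      found = false
      for (key <- keys) {
        val containers = map(key)
        if (index < containers.size) {
          result += containers(index)
          found = true
        }
      }
      index += 1
    }
    result.toList
  }

  def main(args: Array[String]): Unit = {
    val byHost = HashMap(
      "host1" -> ArrayBuffer("c1", "c2", "c3"),
      "host2" -> ArrayBuffer("c4", "c5"))
    // Prints List(c1, c4, c2, c5, c3): the two hosts are interleaved.
    println(prioritizeContainers(byHost))
  }
}
```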
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 63a0449e5a..40307ab972 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -21,12 +21,13 @@ import org.apache.spark._
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.deploy.yarn.YarnAllocationHandler
 import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.TaskSchedulerImpl
 
 /**
  *
  * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
  */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
 
   def this(sc: SparkContext) = this(sc, new Configuration())
 
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index b206780c78..350fc760a4 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -20,9 +20,10 @@ package org.apache.spark.scheduler.cluster
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 
 private[spark] class YarnClientSchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 29b3f22e13..b318270f75 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -21,12 +21,13 @@ import org.apache.spark._
 import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
 import org.apache.spark.util.Utils
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.scheduler.TaskSchedulerImpl
 
 /**
  *
  * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done
  */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
 
   logInfo("Created YarnClusterScheduler")
 
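That completes the new-yarn half; the yarn half below makes the same substitutions. For orientation, the inheritance shape the patch depends on is roughly the following. The class names match the patch, but the constructor lists are trimmed-down stubs for illustration, not Spark's real signatures.

```scala
// Illustrative stubs only -- the real classes carry far more state.
class SparkContext
class TaskSchedulerImpl(sc: SparkContext) // formerly ClusterScheduler

// YARN client mode: the scheduler runs in the driver process.
class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc)

// YARN cluster mode: hooks ApplicationMaster initialization.
class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc)

// The backend is typed against the scheduler, which is why
// YarnClientSchedulerBackend's constructor parameter changes too.
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl)
class YarnClientSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler)
```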
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 9ab2073529..eeee78f8ad 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -28,7 +28,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
 import org.apache.hadoop.conf.Configuration
@@ -214,9 +215,9 @@ private[yarn] class YarnAllocationHandler(
 
     // host if there are sufficiently large number of hosts/containers.
     val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size)
-    allocatedContainers ++= ClusterScheduler.prioritizeContainers(dataLocalContainers)
-    allocatedContainers ++= ClusterScheduler.prioritizeContainers(rackLocalContainers)
-    allocatedContainers ++= ClusterScheduler.prioritizeContainers(offRackContainers)
+    allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+    allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+    allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
 
     // Run each of the allocated containers
     for (container <- allocatedContainers) {
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 63a0449e5a..522e0a9ad7 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -20,13 +20,14 @@ package org.apache.spark.scheduler.cluster
 import org.apache.spark._
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.deploy.yarn.YarnAllocationHandler
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
 /**
  *
  * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
  */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
 
   def this(sc: SparkContext) = this(sc, new Configuration())
 
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index b206780c78..350fc760a4 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -20,9 +20,10 @@ package org.apache.spark.scheduler.cluster
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 
 private[spark] class YarnClientSchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
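One detail before the last file: YarnAllocationHandler calls prioritizeContainers on the TaskSchedulerImpl companion object, not on a scheduler instance, which is why those files needed only an import swap. A minimal sketch of that class/companion split follows; the helper body is deliberately simplified (the real method interleaves across hosts, as sketched earlier).

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Instance side: per-SparkContext scheduling state would live here.
class TaskSchedulerImpl

// Companion object: static-style utilities callable without an instance,
// e.g. TaskSchedulerImpl.prioritizeContainers(dataLocalContainers).
object TaskSchedulerImpl {
  // Simplified body -- stands in for the host-interleaving version.
  def prioritizeContainers[K, T](map: HashMap[K, ArrayBuffer[T]]): List[T] =
    map.values.flatten.toList
}
```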
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 4e988b8017..2d9fbcb400 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
 import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
-import org.apache.spark.scheduler.ClusterScheduler
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
 /**
@@ -30,7 +30,7 @@ import org.apache.spark.util.Utils
  * ApplicationMaster, etc. is done
  */
 private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
-  extends ClusterScheduler(sc) {
+  extends TaskSchedulerImpl(sc) {
 
   logInfo("Created YarnClusterScheduler")
 
-- 
GitLab