From ee96e9406613e621837360b15c34ea7c7220a7a3 Mon Sep 17 00:00:00 2001
From: Colin Patrick McCabe <cmccabe@cloudera.com>
Date: Sun, 8 Jun 2014 12:27:34 -0700
Subject: [PATCH] SPARK-1898: In deploy.yarn.Client, use YarnClient not
 YarnClientImpl

https://issues.apache.org/jira/browse/SPARK-1898

Author: Colin Patrick McCabe <cmccabe@cloudera.com>

Closes #850 from cmccabe/master and squashes the following commits:

d66eddc [Colin Patrick McCabe] SPARK-1898: In deploy.yarn.Client, use YarnClient rather than YarnClientImpl
---
 .../cluster/YarnClientSchedulerBackend.scala  |  2 +-
 .../org/apache/spark/deploy/yarn/Client.scala | 25 ++++++++++++-------
 2 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index e01ed5a57d..039cf4f276 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -112,7 +112,7 @@ private[spark] class YarnClientSchedulerBackend(
 
   override def stop() {
     super.stop()
-    client.stop()
+    client.stop
     logInfo("Stopped")
   }
 
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1b6bfb42a5..393edd1f2d 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
+import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, Records}
@@ -37,7 +37,9 @@ import org.apache.spark.{Logging, SparkConf}
  * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
  */
 class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
-  extends YarnClientImpl with ClientBase with Logging {
+  extends ClientBase with Logging {
+
+  val yarnClient = YarnClient.createYarnClient
 
   def this(clientArgs: ClientArguments, spConf: SparkConf) =
     this(clientArgs, new Configuration(), spConf)
@@ -53,8 +55,8 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   def runApp(): ApplicationId = {
     validateArgs()
     // Initialize and start the client service.
-    init(yarnConf)
-    start()
+    yarnClient.init(yarnConf)
+    yarnClient.start()
 
     // Log details about this YARN cluster (e.g, the number of slave machines/NodeManagers).
     logClusterResourceDetails()
@@ -63,7 +65,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     // interface).
 
     // Get a new client application.
-    val newApp = super.createApplication()
+    val newApp = yarnClient.createApplication()
     val newAppResponse = newApp.getNewApplicationResponse()
     val appId = newAppResponse.getApplicationId()
 
@@ -99,11 +101,11 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   }
 
   def logClusterResourceDetails() {
-    val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
+    val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics
     logInfo("Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: " +
       clusterMetrics.getNumNodeManagers)
 
-    val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
+    val queueInfo: QueueInfo = yarnClient.getQueueInfo(args.amQueue)
     logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
         queueInfo.getQueueName,
@@ -132,15 +134,20 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   def submitApp(appContext: ApplicationSubmissionContext) = {
     // Submit the application to the applications manager.
     logInfo("Submitting application to ASM")
-    super.submitApplication(appContext)
+    yarnClient.submitApplication(appContext)
   }
 
+  def getApplicationReport(appId: ApplicationId) =
+      yarnClient.getApplicationReport(appId)
+
+  def stop = yarnClient.stop
+
   def monitorApplication(appId: ApplicationId): Boolean = {
     val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
 
     while (true) {
       Thread.sleep(interval)
-      val report = super.getApplicationReport(appId)
+      val report = yarnClient.getApplicationReport(appId)
 
       logInfo("Application report from ASM: \n" +
         "\t application identifier: " + appId.toString() + "\n" +
-- 
GitLab