From 8faf5c51c3ea0b3ad83418552b50db596fefc558 Mon Sep 17 00:00:00 2001
From: Mridul Muralidharan <mridul@gmail.com>
Date: Wed, 24 Apr 2013 02:31:57 +0530
Subject: [PATCH] Patch from Thomas Graves to improve the YARN Client, and move
 to a more production-ready Hadoop YARN branch

---
 core/pom.xml                                  |  5 ++
 .../scala/spark/deploy/yarn/Client.scala      | 72 +++----------------
 pom.xml                                       |  9 ++-
 project/SparkBuild.scala                      |  5 +-
 repl-bin/pom.xml                              |  5 ++
 5 files changed, 30 insertions(+), 66 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 9baa447662..7f65ce5c00 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -297,6 +297,11 @@
           <artifactId>hadoop-yarn-common</artifactId>
           <scope>provided</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+          <scope>provided</scope>
+        </dependency>
       </dependencies>
       <build>
         <plugins>
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
index c007dae98c..7a881e26df 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
@@ -7,6 +7,7 @@ import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.client.YarnClientImpl
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import scala.collection.mutable.HashMap
@@ -16,19 +17,19 @@ import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import spark.deploy.SparkHadoopUtil
 
-class Client(conf: Configuration, args: ClientArguments) extends Logging {
+class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
   
   def this(args: ClientArguments) = this(new Configuration(), args)
   
-  var applicationsManager: ClientRMProtocol = null
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   
   def run() {
-    connectToASM()
+    init(yarnConf)
+    start()
     logClusterResourceDetails()
 
-    val newApp = getNewApplication()
+    val newApp = super.getNewApplication()
     val appId = newApp.getApplicationId()
 
     verifyClusterResources(newApp)
@@ -47,64 +48,17 @@ class Client(conf: Configuration, args: ClientArguments) extends Logging {
     System.exit(0)
   }
   
-  
-  def connectToASM() {
-    val rmAddress: InetSocketAddress = NetUtils.createSocketAddr(
-      yarnConf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)
-    )
-    logInfo("Connecting to ResourceManager at" + rmAddress)
-    applicationsManager = rpc.getProxy(classOf[ClientRMProtocol], rmAddress, conf)
-      .asInstanceOf[ClientRMProtocol]
-  }
 
   def logClusterResourceDetails() {
-    val clusterMetrics: YarnClusterMetrics = getYarnClusterMetrics
+    val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
     logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers)
 
-/*
-    val clusterNodeReports: List[NodeReport] = getNodeReports
-    logDebug("Got Cluster node info from ASM")
-    for (node <- clusterNodeReports) {
-      logDebug("Got node report from ASM for, nodeId=" + node.getNodeId + ", nodeAddress=" + node.getHttpAddress +
-        ", nodeRackName=" + node.getRackName + ", nodeNumContainers=" + node.getNumContainers + ", nodeHealthStatus=" + node.getNodeHealthStatus)
-    }
-*/
-
-    val queueInfo: QueueInfo = getQueueInfo(args.amQueue)
+    val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
     logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity +
       ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
       ", queueChildQueueCount=" + queueInfo.getChildQueues.size)
   }
 
-  def getYarnClusterMetrics: YarnClusterMetrics = {
-    val request: GetClusterMetricsRequest = Records.newRecord(classOf[GetClusterMetricsRequest])
-    val response: GetClusterMetricsResponse = applicationsManager.getClusterMetrics(request)
-    return response.getClusterMetrics
-  }
-
-  def getNodeReports: List[NodeReport] = {
-    val request: GetClusterNodesRequest = Records.newRecord(classOf[GetClusterNodesRequest])
-    val response: GetClusterNodesResponse = applicationsManager.getClusterNodes(request)
-    return response.getNodeReports.toList
-  }
-
-  def getQueueInfo(queueName: String): QueueInfo = {
-    val request: GetQueueInfoRequest = Records.newRecord(classOf[GetQueueInfoRequest])
-    request.setQueueName(queueName)
-    request.setIncludeApplications(true)
-    request.setIncludeChildQueues(false)
-    request.setRecursive(false)
-    Records.newRecord(classOf[GetQueueInfoRequest])
-    return applicationsManager.getQueueInfo(request).getQueueInfo
-  }
-
-  def getNewApplication(): GetNewApplicationResponse = {
-    logInfo("Requesting new Application")
-    val request = Records.newRecord(classOf[GetNewApplicationRequest])
-    val response = applicationsManager.getNewApplication(request)
-    logInfo("Got new ApplicationId: " + response.getApplicationId())
-    return response
-  }
   
   def verifyClusterResources(app: GetNewApplicationResponse) = { 
     val maxMem = app.getMaximumResourceCapability().getMemory()
@@ -265,23 +219,15 @@ class Client(conf: Configuration, args: ClientArguments) extends Logging {
   }
   
   def submitApp(appContext: ApplicationSubmissionContext) = {
-    // Create the request to send to the applications manager 
-    val appRequest = Records.newRecord(classOf[SubmitApplicationRequest])
-      .asInstanceOf[SubmitApplicationRequest]
-    appRequest.setApplicationSubmissionContext(appContext)
     // Submit the application to the applications manager
     logInfo("Submitting application to ASM")
-    applicationsManager.submitApplication(appRequest)
+    super.submitApplication(appContext)
   }
   
   def monitorApplication(appId: ApplicationId): Boolean = {  
     while(true) {
       Thread.sleep(1000)
-      val reportRequest = Records.newRecord(classOf[GetApplicationReportRequest])
-        .asInstanceOf[GetApplicationReportRequest]
-      reportRequest.setApplicationId(appId)
-      val reportResponse = applicationsManager.getApplicationReport(reportRequest)
-      val report = reportResponse.getApplicationReport()
+      val report = super.getApplicationReport(appId)
 
       logInfo("Application report from ASM: \n" +
         "\t application identifier: " + appId.toString() + "\n" +
diff --git a/pom.xml b/pom.xml
index ecbfaf9b47..0e95520d50 100644
--- a/pom.xml
+++ b/pom.xml
@@ -564,7 +564,9 @@
       <id>hadoop2-yarn</id>
       <properties>
         <hadoop.major.version>2</hadoop.major.version>
-        <yarn.version>2.0.2-alpha</yarn.version>
+        <!-- 0.23.* is the same as 2.0.*, except hardened to run production jobs -->
+        <yarn.version>0.23.7</yarn.version>
+        <!-- <yarn.version>2.0.2-alpha</yarn.version> -->
       </properties>
 
       <repositories>
@@ -599,6 +601,11 @@
             <artifactId>hadoop-yarn-common</artifactId>
             <version>${yarn.version}</version>
           </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+            <version>${yarn.version}</version>
+          </dependency>
           <!-- Specify Avro version because Kafka also has it as a dependency -->
           <dependency>
             <groupId>org.apache.avro</groupId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 0a5b89d927..819e940403 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -20,7 +20,7 @@ object SparkBuild extends Build {
   //val HADOOP_YARN = false
 
   // For Hadoop 2 YARN support
-  val HADOOP_VERSION = "2.0.2-alpha"
+  val HADOOP_VERSION = "0.23.7"
   val HADOOP_MAJOR_VERSION = "2"
   val HADOOP_YARN = true
 
@@ -156,7 +156,8 @@ object SparkBuild extends Build {
           Seq(
             "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION,
             "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION,
-            "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION
+            "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION,
+            "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION
           )
         } else {
           Seq(
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index b66d193b5d..46f38c2772 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -201,6 +201,11 @@
           <artifactId>hadoop-yarn-common</artifactId>
           <scope>runtime</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+          <scope>runtime</scope>
+        </dependency>
       </dependencies>
     </profile>
     <profile>
-- 
GitLab