Commit ee96e940 authored by Colin Patrick McCabe, committed by Patrick Wendell

SPARK-1898: In deploy.yarn.Client, use YarnClient not YarnClientImpl

https://issues.apache.org/jira/browse/SPARK-1898

Author: Colin Patrick McCabe <cmccabe@cloudera.com>

Closes #850 from cmccabe/master and squashes the following commits:

d66eddc [Colin Patrick McCabe] SPARK-1898: In deploy.yarn.Client, use YarnClient rather than YarnClientImpl
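
The fix replaces inheritance from YarnClientImpl with composition: Client now holds a YarnClient created through the stable factory method YarnClient.createYarnClient and delegates ResourceManager calls to it. A minimal sketch of that pattern, assuming only a plain YarnConfiguration (the class name below is illustrative, not Spark's):

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Minimal illustration of composing a YarnClient rather than extending
// YarnClientImpl, as this commit does in deploy.yarn.Client.
class MinimalYarnSubmitter(yarnConf: YarnConfiguration) {
  // Stable-API factory; returns a concrete YarnClient service instance.
  val yarnClient = YarnClient.createYarnClient

  def start(): Unit = {
    yarnClient.init(yarnConf)  // bind the configuration
    yarnClient.start()         // start the underlying RM proxy
  }

  // Ask the ResourceManager for a fresh ApplicationId.
  def newAppId() = {
    val newApp = yarnClient.createApplication()
    newApp.getNewApplicationResponse().getApplicationId()
  }

  // Shut the client down when done; delegation keeps call sites simple.
  def stop(): Unit = yarnClient.stop()
}

Because Client no longer is a YarnClient, the callers that previously relied on inherited methods (stop, getApplicationReport) now go through small forwarding methods, as the diff below shows.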
parent a338834f
@@ -112,7 +112,7 @@ private[spark] class YarnClientSchedulerBackend(
   override def stop() {
     super.stop()
-    client.stop()
+    client.stop
     logInfo("Stopped")
   }
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
+import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, Records}
@@ -37,7 +37,9 @@ import org.apache.spark.{Logging, SparkConf}
  * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
  */
 class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
-  extends YarnClientImpl with ClientBase with Logging {
+  extends ClientBase with Logging {
+
+  val yarnClient = YarnClient.createYarnClient

   def this(clientArgs: ClientArguments, spConf: SparkConf) =
     this(clientArgs, new Configuration(), spConf)
@@ -53,8 +55,8 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   def runApp(): ApplicationId = {
     validateArgs()
     // Initialize and start the client service.
-    init(yarnConf)
-    start()
+    yarnClient.init(yarnConf)
+    yarnClient.start()

     // Log details about this YARN cluster (e.g, the number of slave machines/NodeManagers).
     logClusterResourceDetails()
@@ -63,7 +65,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     // interface).

     // Get a new client application.
-    val newApp = super.createApplication()
+    val newApp = yarnClient.createApplication()
     val newAppResponse = newApp.getNewApplicationResponse()
     val appId = newAppResponse.getApplicationId()
@@ -99,11 +101,11 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   }

   def logClusterResourceDetails() {
-    val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
+    val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics
     logInfo("Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: " +
       clusterMetrics.getNumNodeManagers)

-    val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
+    val queueInfo: QueueInfo = yarnClient.getQueueInfo(args.amQueue)
     logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
       queueInfo.getQueueName,
@@ -132,15 +134,20 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   def submitApp(appContext: ApplicationSubmissionContext) = {
     // Submit the application to the applications manager.
     logInfo("Submitting application to ASM")
-    super.submitApplication(appContext)
+    yarnClient.submitApplication(appContext)
   }

+  def getApplicationReport(appId: ApplicationId) =
+      yarnClient.getApplicationReport(appId)
+
+  def stop = yarnClient.stop
+
   def monitorApplication(appId: ApplicationId): Boolean = {
     val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)

     while (true) {
       Thread.sleep(interval)
-      val report = super.getApplicationReport(appId)
+      val report = yarnClient.getApplicationReport(appId)
       logInfo("Application report from ASM: \n" +
         "\t application identifier: " + appId.toString() + "\n" +
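
For context, the polling loop that monitorApplication implements (truncated in the hunk above) amounts to repeatedly fetching an ApplicationReport from the YarnClient and inspecting its state. A rough, self-contained sketch of that pattern; the termination check here is illustrative, not Spark's exact logic:

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// Poll an initialized and started YarnClient until the application finishes.
def waitForCompletion(yarnClient: YarnClient, appId: ApplicationId,
    intervalMs: Long = 1000L): Boolean = {
  var done = false
  var succeeded = false
  while (!done) {
    Thread.sleep(intervalMs)
    val report = yarnClient.getApplicationReport(appId)
    report.getYarnApplicationState match {
      case YarnApplicationState.FINISHED =>
        done = true; succeeded = true
      case YarnApplicationState.FAILED | YarnApplicationState.KILLED =>
        done = true
      case _ => // still running; keep polling
    }
  }
  succeeded
}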