Commit a98f5a0e authored by Harvey Feng

Misc style changes in the 'yarn' package.

parent e2ebc3a9
@@ -21,9 +21,13 @@ import java.io.IOException
import java.net.Socket
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
@@ -31,35 +35,36 @@ import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.Utils
import scala.collection.JavaConversions._
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
def this(args: ApplicationMasterArguments) = this(args, new Configuration())
private var rpc: YarnRPC = YarnRPC.create(conf)
private var resourceManager: AMRMProtocol = null
private var appAttemptId: ApplicationAttemptId = null
private var userThread: Thread = null
private var resourceManager: AMRMProtocol = _
private var appAttemptId: ApplicationAttemptId = _
private var userThread: Thread = _
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private val fs = FileSystem.get(yarnConf)
private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
private var uiAddress: String = ""
private var yarnAllocator: YarnAllocationHandler = _
private var isFinished: Boolean = false
private var uiAddress: String = _
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
def run() {
// setup the directories so things go to yarn approved directories rather
// then user specified and /tmp
// Set up the directories so things go to YARN-approved directories rather
// than user-specified ones and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())
// use priority 30 as its higher then HDFS. Its same priority as MapReduce is using
// Use priority 30 as it's higher than HDFS's. It's the same priority that MapReduce uses.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
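// Note (not part of this commit): ShutdownHookManager runs hooks in decreasing priority order,
// and Hadoop's FileSystem shutdown hook is registered at a lower priority (10 in Hadoop 2.x),
// so priority 30 ensures this hook runs first and cleanup can still reach HDFS.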
appAttemptId = getApplicationAttemptId()
@@ -68,9 +73,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Workaround until hadoop moves to something which has
// https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
// ignore result
// ignore result.
// This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
// Hence args.workerCores = numCore disabled above. Any better option ?
// Hence args.workerCores = numCore disabled above. Any better option?
// Compute number of threads for akka
//val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
@@ -96,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
waitForSparkContextInitialized()
// do this after spark master is up and SparkContext is created so that we can register UI Url
// Do this after spark master is up and SparkContext is created so that we can register UI Url
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
// Allocate all containers
@@ -115,12 +120,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(System.getenv("LOCAL_DIRS"))
.getOrElse(""))
.getOrElse(""))
if (localDirs.isEmpty()) {
throw new Exception("Yarn Local dirs can't be empty")
}
return localDirs
localDirs
}
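// Sketch (not part of this commit): the same YARN_LOCAL_DIRS / LOCAL_DIRS fallback can also be
// expressed with Option#orElse, which avoids the nested getOrElse calls:
//   val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
//     .orElse(Option(System.getenv("LOCAL_DIRS")))
//     .getOrElse("")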
private def getApplicationAttemptId(): ApplicationAttemptId = {
@@ -129,7 +134,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
val containerId = ConverterUtils.toContainerId(containerIdString)
val appAttemptId = containerId.getApplicationAttemptId()
logInfo("ApplicationAttemptId: " + appAttemptId)
return appAttemptId
appAttemptId
}
private def registerWithResourceManager(): AMRMProtocol = {
@@ -137,7 +142,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
logInfo("Connecting to ResourceManager at " + rmAddress)
return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
}
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
@@ -145,12 +150,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
.asInstanceOf[RegisterApplicationMasterRequest]
appMasterRequest.setApplicationAttemptId(appAttemptId)
// Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
// Setting this to master host,port - so that the ApplicationReport at client has some
// sensible info.
// Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
appMasterRequest.setTrackingUrl(uiAddress)
return resourceManager.registerApplicationMaster(appMasterRequest)
resourceManager.registerApplicationMaster(appMasterRequest)
}
private def waitForSparkMaster() {
@@ -164,21 +170,25 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
try {
val socket = new Socket(driverHost, driverPort.toInt)
socket.close()
logInfo("Driver now available: " + driverHost + ":" + driverPort)
logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
driverUp = true
} catch {
case e: Exception =>
logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying")
Thread.sleep(100)
tries = tries + 1
case e: Exception => {
logWarning("Failed to connect to driver at %s:%s, retrying ...").
format(driverHost, driverPort)
Thread.sleep(100)
tries = tries + 1
}
}
}
}
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
.getMethod("main", classOf[Array[String]])
val mainMethod = Class.forName(
args.userClass,
false /* initialize */,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
var successed = false
@@ -203,7 +213,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
t.start()
return t
t
}
// this need to happen before allocateWorkers
@@ -225,12 +235,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
if (null != sparkContext) {
uiAddress = sparkContext.ui.appUIAddress
this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
sparkContext.preferredNodeLocationData)
this.yarnAllocator = YarnAllocationHandler.newAllocator(
yarnConf,
resourceManager,
appAttemptId,
args,
sparkContext.preferredNodeLocationData)
} else {
logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
", numTries = " + numTries)
this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
format(count * waitTime, numTries))
this.yarnAllocator = YarnAllocationHandler.newAllocator(
yarnConf,
resourceManager,
appAttemptId,
args)
}
}
} finally {
@@ -246,35 +264,37 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
// If user thread exists, then quit !
userThread.isAlive) {
this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
// Exit the loop if the user thread exits.
while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
val numContainersToAllocate = math.max(
args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)
this.yarnAllocator.allocateContainers(numContainersToAllocate)
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
} finally {
// in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
// so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
// In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
logInfo("All workers have launched.")
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// Launch a progress reporter thread, else the app will get killed after expiration
// (def: 10mins) timeout.
// TODO(harvey): Verify the timeout
if (userThread.isAlive) {
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
// Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// must be <= timeoutInterval/ 2.
// On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
// so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
// Must be <= timeoutInterval / 2.
// On the other hand, also ensure that we are reasonably responsive without causing too many
// requests to the RM. So, at least 1 minute or timeoutInterval / 10 - whichever is higher.
val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
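// Worked example (not part of this commit): with the 120000 ms default used above,
//   math.min(120000 / 2, math.max(120000 / 10, 60000L))
//     = math.min(60000, math.max(12000, 60000)) = 60000 ms,
// i.e. one progress report per minute.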
launchReporterThread(interval)
}
}
// TODO: We might want to extend this to allocate more containers in case they die !
// TODO: We might want to extend this to allocate more containers in case they die.
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
@@ -283,7 +303,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
while (userThread.isAlive) {
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingWorkerCount))
yarnAllocator.allocateContainers(missingWorkerCount)
}
else sendProgress()
@@ -291,16 +312,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
}
// setting to daemon status, though this is usually not a good idea.
// Setting to daemon status, though this is usually not a good idea.
t.setDaemon(true)
t.start()
logInfo("Started progress reporter thread - sleep time : " + sleepTime)
return t
t
}
private def sendProgress() {
logDebug("Sending progress")
// simulated with an allocate request with no nodes requested ...
// Simulated with an allocate request with no nodes requested ...
yarnAllocator.allocateContainers(0)
}
@@ -320,7 +341,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
*/
def finishApplicationMaster(status: FinalApplicationStatus) {
synchronized {
if (isFinished) {
return
@@ -333,14 +353,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
// set tracking url to empty since we don't have a history server
// Set tracking url to empty since we don't have a history server.
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
}
/**
* clean up the staging directory.
* Clean up the staging directory.
*/
private def cleanupStagingDir() {
var stagingDirPath: Path = null
@@ -356,13 +375,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
fs.delete(stagingDirPath, true)
}
} catch {
case e: IOException =>
logError("Failed to cleanup staging dir " + stagingDirPath, e)
case ioe: IOException =>
logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
}
}
// The shutdown hook that runs when a signal is received AND during normal
// close of the JVM.
// The shutdown hook that runs when a signal is received AND during normal close of the JVM.
class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
def run() {
@@ -372,15 +390,14 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
}
}
}
object ApplicationMaster {
// number of times to wait for the allocator loop to complete.
// each loop iteration waits for 100ms, so maximum of 3 seconds.
// Number of times to wait for the allocator loop to complete.
// Each loop iteration waits for 100ms, so maximum of 3 seconds.
// This is to ensure that we have reasonable number of containers before we start
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be optimal as more
// containers are available. Might need to handle this better.
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
@@ -398,7 +415,8 @@ object ApplicationMaster {
applicationMasters.add(master)
}
val sparkContextRef: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null)
val sparkContextRef: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null /* initialValue */)
val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
def sparkContextInitialized(sc: SparkContext): Boolean = {
@@ -408,19 +426,21 @@
sparkContextRef.notifyAll()
}
// Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit
// Should not really have to do this, but it helps yarn to evict resources earlier.
// not to mention, prevent Client declaring failure even though we exit'ed properly.
// Note that this will unfortunately not properly clean up the staging files because it gets called to
// late and the filesystem is already shutdown.
// Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
// System.exit.
// Should not really have to do this, but it helps YARN to evict resources earlier.
// Not to mention, prevent the Client from declaring failure even though we exited properly.
// Note that this will unfortunately not properly clean up the staging files because it gets
// called too late, after the filesystem has already been shut down.
if (modified) {
Runtime.getRuntime().addShutdownHook(new Thread with Logging {
// This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run'
// This not only logs, but also ensures that the log system is initialized for this instance
// when we are actually 'run'-ing.
logInfo("Adding shutdown hook for context " + sc)
override def run() {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
// best case ...
// Best case ...
for (master <- applicationMasters) {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
@@ -428,7 +448,7 @@
} )
}
// Wait for initialization to complete and atleast 'some' nodes can get allocated
// Wait for initialization to complete and until at least 'some' nodes can get allocated.
yarnAllocatorLoop.synchronized {
while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L)
......
@@ -20,43 +20,43 @@ package org.apache.spark.deploy.yarn
import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.collection.mutable.Map
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, Records}
import scala.collection.mutable.HashMap
import scala.collection.mutable.Map
import scala.collection.JavaConversions._
import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
def this(args: ClientArguments) = this(new Configuration(), args)
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
private val SPARK_STAGING: String = ".sparkStaging"
private val distCacheMgr = new ClientDistributedCacheManager()
// staging directory is private! -> rwx--------
// Staging directory is private! -> rwx------
val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
// app files are world-wide readable and owner writable -> rw-r--r--
// App files are world-wide readable and owner writable -> rw-r--r--
val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
def run() {
@@ -79,7 +79,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
submitApp(appContext)
monitorApplication(appId)
System.exit(0)
}
@@ -90,20 +90,25 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
def logClusterResourceDetails() {
val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers)
logInfo("Got Cluster metric info from ASM, numNodeManagers = " +
clusterMetrics.getNumNodeManagers)
val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity +
", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
", queueChildQueueCount=" + queueInfo.getChildQueues.size)
logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
queueApplicationCount = %s, queueChildQueueCount = %s""".format(
queueInfo.getQueueName,
queueInfo.getCurrentCapacity,
queueInfo.getMaximumCapacity,
queueInfo.getApplications.size,
queueInfo.getChildQueues.size))
}
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
// if we have requested more then the clusters max for a single resource then exit.
// If we have requested more than the cluster's max for a single resource then exit.
if (args.workerMemory > maxMem) {
logError("the worker size is to large to run on this cluster " + args.workerMemory);
System.exit(1)
@@ -114,10 +119,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
System.exit(1)
}
// We could add checks to make sure the entire cluster has enough resources but that involves getting
// all the node reports and computing ourselves
// We could add checks to make sure the entire cluster has enough resources but that involves
// getting all the node reports and computing ourselves
}
def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
logInfo("Setting up application submission context for ASM")
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
@@ -126,9 +131,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
return appContext
}
/*
* see if two file systems are the same or not.
*/
/** See if two file systems are the same or not. */
private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
val srcUri = srcFs.getUri()
val dstUri = destFs.getUri()
@@ -163,9 +166,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
return true;
}
/**
* Copy the file into HDFS if needed.
*/
/** Copy the file into HDFS if needed. */
private def copyRemoteFile(
dstDir: Path,
originalPath: Path,
@@ -181,9 +182,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
fs.setReplication(newPath, replication);
if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
}
// resolve any symlinks in the URI path so using a "current" symlink
// to point to a specific version shows the specific version
// in the distributed cache configuration
// Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
// version shows the specific version in the distributed cache configuration
val qualPath = fs.makeQualified(newPath)
val fc = FileContext.getFileContext(qualPath.toUri(), conf)
val destPath = fc.resolvePath(qualPath)
@@ -192,8 +192,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
logInfo("Preparing Local resources")
// Upload Spark and the application JAR to the remote file system if necessary
// Add them as local resources to the AM
// Upload Spark and the application JAR to the remote file system if necessary. Add them as
// local resources to the AM.
val fs = FileSystem.get(conf)
val delegTokenRenewer = Master.getMasterPrincipal(conf);
@@ -276,7 +276,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
UserGroupInformation.getCurrentUser().addCredentials(credentials);
return localResources
}
def setupLaunchEnv(
localResources: HashMap[String, LocalResource],
stagingDir: String): HashMap[String, String] = {
@@ -289,16 +289,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
// set the environment variables to be passed on to the Workers
// Set the environment variables to be passed on to the Workers.
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
// allow users to specify some environment variables
// Allow users to specify some environment variables.
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
// Add each SPARK-* key to the environment
// Add each SPARK-* key to the environment.
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
return env
env
}
def userArgsToString(clientArgs: ClientArguments): String = {
@@ -308,13 +308,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
for (arg <- args){
retval.append(prefix).append(" '").append(arg).append("' ")
}
retval.toString
}
def createContainerLaunchContext(newApp: GetNewApplicationResponse,
localResources: HashMap[String, LocalResource],
env: HashMap[String, String]): ContainerLaunchContext = {
def createContainerLaunchContext(
newApp: GetNewApplicationResponse,
localResources: HashMap[String, LocalResource],
env: HashMap[String, String]): ContainerLaunchContext = {
logInfo("Setting up container launch context")
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
@@ -322,8 +322,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
// TODO(harvey): This can probably be a val.
var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
(if (0 != (args.amMemory % minResMemory)) minResMemory else 0) - YarnAllocationHandler.MEMORY_OVERHEAD
((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
YarnAllocationHandler.MEMORY_OVERHEAD)
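// Worked example (not part of this commit), assuming minResMemory = 1024 MB,
// YarnAllocationHandler.MEMORY_OVERHEAD = 384 MB and args.amMemory = 1536 MB:
//   ((1536 / 1024) * 1024) + (1024 - 384) = 1024 + 640 = 1664
// i.e. the request is rounded up to the next multiple of the scheduler minimum (2048 MB)
// and the memory overhead is then carved out of that allocation.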
// Extra options for the JVM
var JAVA_OPTS = ""
@@ -334,14 +336,18 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
// The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
// node, spark gc effects all other containers performance (which can also be other spark containers)
// Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in multi-tenant environments. Not sure how default java gc behaves if it is
// limited to subset of cores on a node.
if (env.isDefinedAt("SPARK_USE_CONC_INCR_GC") && java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))) {
// In our expts, using (default) throughput collector has severe perf ramnifications in multi-tenant machines
// Commenting it out for now - so that people can refer to the properties if required. Remove
// it once the cpuset version is pushed out. The context is, the default GC for server-class
// machines ends up using all cores to do GC - hence if there are multiple containers on the
// same node, Spark's GC affects the performance of all other containers (which can also be
// other Spark containers). Instead of using this, rely on cpusets set by YARN to enforce that
// Spark behaves 'properly' in multi-tenant environments. Not sure how the default Java GC
// behaves if it is limited to a subset of cores on a node.
val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
if (useConcurrentAndIncrementalGC) {
// In our experiments, using the (default) throughput collector has severe perf ramifications
// on multi-tenant machines.
JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
JAVA_OPTS += " -XX:+CMSIncrementalMode "
JAVA_OPTS += " -XX:+CMSIncrementalPacing "
@@ -353,7 +359,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
// Command for the ApplicationMaster
// Command for the ApplicationMaster.
var javaCommand = "java";
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
@@ -379,28 +385,28 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
logInfo("Command for the ApplicationMaster: " + commands(0))
amContainer.setCommands(commands)
val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
// Memory for the ApplicationMaster
// Memory for the ApplicationMaster.
capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
amContainer.setResource(capability)
// Setup security tokens
// Setup security tokens.
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
return amContainer
amContainer
}
def submitApp(appContext: ApplicationSubmissionContext) = {
// Submit the application to the applications manager
// Submit the application to the applications manager.
logInfo("Submitting application to ASM")
super.submitApplication(appContext)
}
def monitorApplication(appId: ApplicationId): Boolean = {
while(true) {
while (true) {
Thread.sleep(1000)
val report = super.getApplicationReport(appId)
@@ -418,16 +424,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
"\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
"\t appUser: " + report.getUser()
)
val state = report.getYarnApplicationState()
val dsStatus = report.getFinalApplicationStatus()
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
return true
return true
}
}
return true
true
}
}
@@ -459,7 +465,7 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP)
}
// normally the users app.jar is last in case conflicts with spark jars
// Normally the user's app.jar is last in case it conflicts with Spark jars.
val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
......
@@ -21,53 +21,60 @@ import java.net.URI
import java.nio.ByteBuffer
import java.security.PrivilegedExceptionAction
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import org.apache.spark.Logging
import org.apache.spark.util.Utils
class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
slaveId: String, hostname: String, workerMemory: Int, workerCores: Int)
extends Runnable with Logging {
class WorkerRunnable(
container: Container,
conf: Configuration,
masterAddress: String,
slaveId: String,
hostname: String,
workerMemory: Int,
workerCores: Int)
extends Runnable with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var cm: ContainerManager = null
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {
logInfo("Starting Worker Container")
cm = connectToCM
startContainer
}
def startContainer = {
logInfo("Setting up ContainerLaunchContext")
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
.asInstanceOf[ContainerLaunchContext]
ctx.setContainerId(container.getId())
ctx.setResource(container.getResource())
val localResources = prepareLocalResources
ctx.setLocalResources(localResources)
val env = prepareEnvironment
ctx.setEnvironment(env)
// Extra options for the JVM
var JAVA_OPTS = ""
// Set the JVM memory
@@ -80,17 +87,21 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
// The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
// node, spark gc effects all other containers performance (which can also be other spark containers)
// Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in multi-tenant environments. Not sure how default java gc behaves if it is
// limited to subset of cores on a node.
// Commenting it out for now - so that people can refer to the properties if required. Remove
// it once cpuset version is pushed out.
// The context is, the default GC for server-class machines ends up using all cores to do GC -
// hence if there are multiple containers on the same node, Spark's GC affects the performance
// of all other containers (which can also be other Spark containers).
// Instead of using this, rely on cpusets set by YARN to enforce that Spark behaves 'properly'
// in multi-tenant environments. Not sure how the default Java GC behaves if it is limited to
// a subset of cores on a node.
/*
else {
// If no java_opts specified, default to using -XX:+CMSIncrementalMode
// It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont want to mess with it.
// In our expts, using (default) throughput collector has severe perf ramnifications in multi-tennent machines
// It might be possible that other modes/configs are being set in SPARK_JAVA_OPTS, so we don't
// want to mess with it.
// In our experiments, using the (default) throughput collector has severe perf ramifications
// on multi-tenant machines.
// The options are based on
// http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
@@ -117,8 +128,10 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
val commands = List[String](javaCommand +
" -server " +
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state.
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ?
// Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
// an inconsistent state.
// TODO: If the OOM is not recoverable by rescheduling it on a different node, then do
// 'something' to fail the job ... akin to blacklisting trackers in mapred?
" -XX:OnOutOfMemoryError='kill %p' " +
JAVA_OPTS +
" org.apache.spark.executor.CoarseGrainedExecutorBackend " +
@@ -130,7 +143,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
logInfo("Setting up worker with commands: " + commands)
ctx.setCommands(commands)
// Send the start request to the ContainerManager
val startReq = Records.newRecord(classOf[StartContainerRequest])
.asInstanceOf[StartContainerRequest]
@@ -138,7 +151,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
cm.startContainer(startReq)
}
private def setupDistributedCache(file: String,
private def setupDistributedCache(
file: String,
rtype: LocalResourceType,
localResources: HashMap[String, LocalResource],
timestamp: String,
@@ -153,12 +167,11 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
amJarRsrc.setSize(size.toLong)
localResources(uri.getFragment()) = amJarRsrc
}
def prepareLocalResources: HashMap[String, LocalResource] = {
logInfo("Preparing Local resources")
val localResources = HashMap[String, LocalResource]()
if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
@@ -180,30 +193,30 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
timeStamps(i), fileSizes(i), visibilities(i))
}
}
logInfo("Prepared Local resources " + localResources)
return localResources
}
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
// allow users to specify some environment variables
// Allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
return env
}
def connectToCM: ContainerManager = {
val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
logInfo("Connecting to ContainerManager at " + cmHostPortStr)
// use doAs and remoteUser here so we can add the container token and not
// pollute the current users credentials with all of the individual container tokens
// Use doAs and remoteUser here so we can add the container token and not pollute the current
// user's credentials with all of the individual container tokens.
val user = UserGroupInformation.createRemoteUser(container.getId().toString());
val containerToken = container.getContainerToken();
if (containerToken != null) {
@@ -219,5 +232,5 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
});
return proxy;
}
}