From e2d7656ca3d561f0a6fc8dd81ef46e4aa6ba608e Mon Sep 17 00:00:00 2001
From: Jey Kottalam <jey@cs.berkeley.edu>
Date: Wed, 24 Jul 2013 14:01:48 -0700
Subject: [PATCH] re-enable YARN support

---
 core/src/main/scala/spark/SparkEnv.scala      | 14 +++++++++-
 .../spark/deploy/yarn/ApplicationMaster.scala |  4 +--
 .../main/scala/spark/deploy/yarn/Client.scala |  7 +++--
 .../YarnSparkHadoopUtil.scala}                | 26 +++++--------------
 4 files changed, 26 insertions(+), 25 deletions(-)
 rename yarn/src/main/scala/spark/deploy/{SparkHadoopUtil.scala => yarn/YarnSparkHadoopUtil.scala} (71%)

diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 73990f0423..5f71df33b6 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -61,9 +61,21 @@ class SparkEnv (
     // If executorId is NOT found, return defaultHostPort
     var executorIdToHostPort: Option[(String, String) => String]) {
 
-  val hadoop = new SparkHadoopUtil
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
+  val hadoop = {
+    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+    if(yarnMode) {
+      try {
+        Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+      } catch {
+        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+      }
+    } else {
+      new SparkHadoopUtil
+    }
+  }
+
   def stop() {
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
     httpFileServer.stop()
diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
index 1b06169739..d69a969d42 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -130,11 +130,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       try {
         val socket = new Socket(driverHost, driverPort.toInt)
         socket.close()
-        logInfo("Master now available: " + driverHost + ":" + driverPort)
+        logInfo("Driver now available: " + driverHost + ":" + driverPort)
         driverUp = true
       } catch {
         case e: Exception =>
-          logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
+          logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying")
           Thread.sleep(100)
       }
     }
diff --git a/yarn/src/main/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
index 8bcbfc2735..9d3860b863 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
@@ -165,7 +165,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
     Client.populateHadoopClasspath(yarnConf, env)
-    SparkHadoopUtil.setYarnMode(env)
+    env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_JAR_PATH") =
       localResources("spark.jar").getResource().getScheme.toString() + "://" +
       localResources("spark.jar").getResource().getFile().toString()
@@ -313,8 +313,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
 object Client {
   def main(argStrings: Array[String]) {
+    // Set an env variable indicating we are running in YARN mode.
+    // Note that anything with SPARK prefix gets propagated to all (remote) processes
+    System.setProperty("SPARK_YARN_MODE", "true")
+
     val args = new ClientArguments(argStrings)
-    SparkHadoopUtil.setYarnMode()
     new Client(args).run
   }
 
diff --git a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala
similarity index 71%
rename from yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala
rename to yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index a812bcf867..77c4ee7f3f 100644
--- a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package spark.deploy
+package spark.deploy.yarn
 
+import spark.deploy.SparkHadoopUtil
 import collection.mutable.HashMap
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
@@ -28,32 +29,17 @@ import java.security.PrivilegedExceptionAction
 /**
  * Contains util methods to interact with Hadoop from spark.
  */
-object SparkHadoopUtil {
-
-  val yarnConf = newConfiguration()
+class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
   // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
-  def isYarnMode(): Boolean = {
-    val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
-    java.lang.Boolean.valueOf(yarnMode)
-  }
-
-  // Set an env variable indicating we are running in YARN mode.
-  // Note that anything with SPARK prefix gets propagated to all (remote) processes
-  def setYarnMode() {
-    System.setProperty("SPARK_YARN_MODE", "true")
-  }
-
-  def setYarnMode(env: HashMap[String, String]) {
-    env("SPARK_YARN_MODE") = "true"
-  }
+  override def isYarnMode(): Boolean = { true }
 
   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
   // Always create a new config, dont reuse yarnConf.
-  def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+  override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
 
   // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
-  def addCredentials(conf: JobConf) {
+  override def addCredentials(conf: JobConf) {
     val jobCreds = conf.getCredentials();
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
--
GitLab
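
Note (after the signature, so not part of the applied patch): the core idea above is that SparkEnv now picks its SparkHadoopUtil implementation at runtime via Class.forName, gated by SPARK_YARN_MODE, so the core module keeps no compile-time dependency on the yarn module. Below is a minimal, self-contained Scala sketch of that pattern under stated assumptions: HadoopUtil, YarnHadoopUtil, and HadoopUtilLoader are illustrative stand-ins, not the real Spark classes.

// Base implementation used outside of YARN mode.
class HadoopUtil {
  def isYarnMode(): Boolean = false
}

// YARN-specific variant; in the real patch this lives in a separate
// build module (yarn/) that may be absent from the classpath.
class YarnHadoopUtil extends HadoopUtil {
  override def isYarnMode(): Boolean = true
}

object HadoopUtilLoader {
  def load(): HadoopUtil = {
    // Read the flag from the system property first, falling back to the
    // environment variable, mirroring how SparkEnv reads SPARK_YARN_MODE.
    // Boolean.valueOf(null) is false, so "unset" means non-YARN mode.
    val yarnMode = java.lang.Boolean.valueOf(
      System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
    if (yarnMode) {
      try {
        // Look the class up by name: this compiles even when the YARN
        // variant is missing, and only fails if YARN mode is requested.
        Class.forName("YarnHadoopUtil").newInstance.asInstanceOf[HadoopUtil]
      } catch {
        case th: Throwable =>
          throw new RuntimeException("Unable to load YARN support", th)
      }
    } else {
      new HadoopUtil
    }
  }
}

To exercise the sketch: System.setProperty("SPARK_YARN_MODE", "true") followed by HadoopUtilLoader.load().isYarnMode() should yield true. The double read (property, then env var) matches the two places the patch sets the flag: Client.main sets the system property in the submitting JVM, while env("SPARK_YARN_MODE") = "true" propagates it to remote containers through their environment.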