diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 73990f042379649718a4cd7a338327f4503a9772..5f71df33b681bd91fe8d330f75a267699ee034a6 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -61,9 +61,21 @@ class SparkEnv ( // If executorId is NOT found, return defaultHostPort var executorIdToHostPort: Option[(String, String) => String]) { - val hadoop = new SparkHadoopUtil private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() + val hadoop = { + val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) + if(yarnMode) { + try { + Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil] + } catch { + case th: Throwable => throw new SparkException("Unable to load YARN support", th) + } + } else { + new SparkHadoopUtil + } + } + def stop() { pythonWorkers.foreach { case(key, worker) => worker.stop() } httpFileServer.stop() diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala index 1b06169739c0ccacd1b77251f7079ba2fde3f2f7..d69a969d42fb94a0a6bab929c67a8cbc0aaf2276 100644 --- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -130,11 +130,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e try { val socket = new Socket(driverHost, driverPort.toInt) socket.close() - logInfo("Master now available: " + driverHost + ":" + driverPort) + logInfo("Driver now available: " + driverHost + ":" + driverPort) driverUp = true } catch { case e: Exception => - logError("Failed to connect to driver at " + driverHost + ":" + driverPort) + logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying") Thread.sleep(100) } } 
diff --git a/yarn/src/main/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala index 8bcbfc273517ce8ef54d1f4aec37cf01787fb591..9d3860b863b8fe9024bed65bde6cb8ef88c5c87a 100644 --- a/yarn/src/main/scala/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/Client.scala @@ -165,7 +165,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*") Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH") Client.populateHadoopClasspath(yarnConf, env) - SparkHadoopUtil.setYarnMode(env) + env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_JAR_PATH") = localResources("spark.jar").getResource().getScheme.toString() + "://" + localResources("spark.jar").getResource().getFile().toString() @@ -313,8 +313,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl object Client { def main(argStrings: Array[String]) { + // Set a system property indicating we are running in YARN mode. + // Note that anything with SPARK prefix gets propagated to all (remote) processes + System.setProperty("SPARK_YARN_MODE", "true") + val args = new ClientArguments(argStrings) - SparkHadoopUtil.setYarnMode() new Client(args).run } diff --git a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala similarity index 71% rename from yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala rename to yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala index a812bcf867261df315fd4907f3f920c07b179203..77c4ee7f3f67f9afec64bc3aa978c09ce43e35be 100644 --- a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -15,8 +15,9 @@ * limitations under the License. 
*/ -package spark.deploy +package spark.deploy.yarn +import spark.deploy.SparkHadoopUtil import collection.mutable.HashMap import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.UserGroupInformation @@ -28,32 +29,17 @@ import java.security.PrivilegedExceptionAction /** * Contains util methods to interact with Hadoop from spark. */ -object SparkHadoopUtil { - - val yarnConf = newConfiguration() +class YarnSparkHadoopUtil extends SparkHadoopUtil { // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true. - def isYarnMode(): Boolean = { - val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")) - java.lang.Boolean.valueOf(yarnMode) - } - - // Set an env variable indicating we are running in YARN mode. - // Note that anything with SPARK prefix gets propagated to all (remote) processes - def setYarnMode() { - System.setProperty("SPARK_YARN_MODE", "true") - } - - def setYarnMode(env: HashMap[String, String]) { - env("SPARK_YARN_MODE") = "true" - } + override def isYarnMode(): Boolean = { true } // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems // Always create a new config, dont reuse yarnConf. - def newConfiguration(): Configuration = new YarnConfiguration(new Configuration()) + override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration()) // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster - def addCredentials(conf: JobConf) { + override def addCredentials(conf: JobConf) { val jobCreds = conf.getCredentials(); jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) }