Commit b6cf1348 authored by Marcelo Vanzin, committed by Matei Zaharia

[SPARK-2889] Create Hadoop config objects consistently.

Different places in the code were instantiating Configuration / YarnConfiguration objects in different ways. This could confuse people who expected "spark.hadoop.*" options to end up in the configs used by Spark code, since that translation only happened for the SparkContext's config.

This change modifies most places to use SparkHadoopUtil to initialize configs, and makes that method do the translation that was previously done only inside SparkContext.

The places that were not changed fall into one of the following categories:
- Test code where this doesn't really matter
- Places deep in the code where plumbing SparkConf would be too difficult for very little gain
- Default values for arguments, since the caller can provide their own config in that case
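To make the translation concrete, here is a minimal sketch of what SparkHadoopUtil.newConfiguration(conf) is meant to do: copy every "spark.hadoop.*" entry from the SparkConf into a fresh Hadoop Configuration with the prefix stripped. This is an illustration of the idea, not the exact Spark code, and the helper name hadoopConfFromSparkConf is hypothetical.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf

    // Illustrative sketch (hypothetical helper name): translate "spark.hadoop.*"
    // entries from a SparkConf into a new Hadoop Configuration, dropping the prefix.
    def hadoopConfFromSparkConf(conf: SparkConf): Configuration = {
      val hadoopConf = new Configuration()
      conf.getAll.foreach { case (key, value) =>
        if (key.startsWith("spark.hadoop.")) {
          hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
        }
      }
      hadoopConf
    }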

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #1843 from vanzin/SPARK-2889 and squashes the following commits:

52daf35 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889
f179013 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889
51e71cf [Marcelo Vanzin] Add test to ensure that overriding Yarn configs works.
53f9506 [Marcelo Vanzin] Add DeveloperApi annotation.
3d345cb [Marcelo Vanzin] Restore old method for backwards compat.
fc45067 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889
0ac3fdf [Marcelo Vanzin] Merge branch 'master' into SPARK-2889
3f26760 [Marcelo Vanzin] Compilation fix.
f16cadd [Marcelo Vanzin] Initialize config in SparkHadoopUtil.
b8ab173 [Marcelo Vanzin] Update Utils API to take a Configuration argument.
1e7003f [Marcelo Vanzin] Replace explicit Configuration instantiation with SparkHadoopUtil.
parent d90434c0
@@ -25,7 +25,7 @@ import org.scalatest.FunSuite
 import com.google.common.io.Files
-import org.apache.spark.TestUtils
+import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.util.Utils

 class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
@@ -57,7 +57,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
   test("child first") {
     val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
     val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
     val fakeClassVersion = fakeClass.toString
     assert(fakeClassVersion === "1")
@@ -65,7 +65,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
   test("parent first") {
     val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, false)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, false)
     val fakeClass = classLoader.loadClass("ReplFakeClass1").newInstance()
     val fakeClassVersion = fakeClass.toString
     assert(fakeClassVersion === "2")
@@ -73,7 +73,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
   test("child first can fall back") {
     val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
     val fakeClass = classLoader.loadClass("ReplFakeClass3").newInstance()
     val fakeClassVersion = fakeClass.toString
     assert(fakeClassVersion === "2")
@@ -81,7 +81,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
   test("child first can fail") {
     val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
     intercept[java.lang.ClassNotFoundException] {
       classLoader.loadClass("ReplFakeClassDoesNotExist").newInstance()
     }
@@ -40,7 +40,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   extends YarnClientImpl with ClientBase with Logging {

   def this(clientArgs: ClientArguments, spConf: SparkConf) =
-    this(clientArgs, new Configuration(), spConf)
+    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)

   def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
@@ -48,7 +48,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   // optimal as more containers are available. Might need to handle this better.

   private val sparkConf = new SparkConf()
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
+  private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    .asInstanceOf[YarnConfiguration]
   private val isDriver = args.userClass != null

   // Default to numExecutors * 2, with minimum of 3
@@ -54,7 +54,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
   // Always create a new config, dont reuse yarnConf.
-  override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+  override def newConfiguration(conf: SparkConf): Configuration =
+    new YarnConfiguration(super.newConfiguration(conf))

   // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
   override def addCredentials(conf: JobConf) {
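With the YarnSparkHadoopUtil override above, call sites that go through SparkHadoopUtil (as the Client and ApplicationMaster changes in this commit do) pick up both the YarnConfiguration subclass and any "spark.hadoop.*" overrides. A hedged usage sketch, assuming a YARN deployment where SparkHadoopUtil.get resolves to YarnSparkHadoopUtil; the fs.defaultFS value is made up:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    val sparkConf = new SparkConf()
      .set("spark.hadoop.fs.defaultFS", "hdfs://example-namenode:8020") // hypothetical value
    // Expected to return a YarnConfiguration that already carries the spark.hadoop.* overrides.
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
    assert(hadoopConf.get("fs.defaultFS") == "hdfs://example-namenode:8020")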
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler.cluster

 import org.apache.spark._
-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
@@ -26,14 +25,11 @@ import org.apache.spark.util.Utils
 /**
  * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
  */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration)
-  extends TaskSchedulerImpl(sc) {
-
-  def this(sc: SparkContext) = this(sc, new Configuration())
+private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    Option(YarnSparkHadoopUtil.lookupRack(conf, host))
+    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
   }
 }
@@ -21,19 +21,15 @@ import org.apache.spark._
 import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
-import org.apache.hadoop.conf.Configuration

 /**
  * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
  * ApplicationMaster, etc is done
  */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
-  extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

   logInfo("Created YarnClusterScheduler")

-  def this(sc: SparkContext) = this(sc, new Configuration())
-
   // Nothing else for now ... initialize application master : which needs a SparkContext to
   // determine how to allocate.
   // Note that only the first creation of a SparkContext influences (and ideally, there must be
@@ -43,8 +39,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    val retval = YarnSparkHadoopUtil.lookupRack(conf, host)
-    if (retval != null) Some(retval) else None
+    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
   }

   override def postStartHook() {
@@ -20,9 +20,10 @@ package org.apache.spark.deploy.yarn
 import java.io.{File, IOException}

 import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.{FunSuite, Matchers}

-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}

 class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
@@ -61,4 +62,16 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
     }
   }

+  test("Yarn configuration override") {
+    val key = "yarn.nodemanager.hostname"
+    val default = new YarnConfiguration()
+
+    val sparkConf = new SparkConf()
+      .set("spark.hadoop." + key, "someHostName")
+    val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf)
+
+    yarnConf.getClass() should be (classOf[YarnConfiguration])
+    yarnConf.get(key) should not be default.get(key)
+  }
 }
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.Records

 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil

 /**
  * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
@@ -40,7 +40,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   val yarnClient = YarnClient.createYarnClient

   def this(clientArgs: ClientArguments, spConf: SparkConf) =
-    this(clientArgs, new Configuration(), spConf)
+    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)

   def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())