diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e132955f0f850775423782af7c05afbed0c18812..a80b3cce6034dbb3ff99aad6340b30005e7b38bd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -224,26 +224,7 @@ class SparkContext(config: SparkConf) extends Logging { ui.bind() /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ - val hadoopConfiguration: Configuration = { - val hadoopConf = SparkHadoopUtil.get.newConfiguration() - // Explicitly check for S3 environment variables - if (System.getenv("AWS_ACCESS_KEY_ID") != null && - System.getenv("AWS_SECRET_ACCESS_KEY") != null) { - hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) - hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) - hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) - hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) - } - // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - conf.getAll.foreach { case (key, value) => - if (key.startsWith("spark.hadoop.")) { - hadoopConf.set(key.substring("spark.hadoop.".length), value) - } - } - val bufferSize = conf.get("spark.buffer.size", "65536") - hadoopConf.set("io.file.buffer.size", bufferSize) - hadoopConf - } + val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf) // Optionally log Spark events private[spark] val eventLogger: Option[EventLoggingListener] = { @@ -827,7 +808,8 @@ class SparkContext(config: SparkConf) extends Logging { addedFiles(key) = System.currentTimeMillis // Fetch the file locally in case a job is executed using DAGScheduler.runLocally(). - Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager) + Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, + hadoopConfiguration) logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) postEnvironmentUpdate() diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 148115d3ed351006ea2675678f5580324ceb1194..fe0ad9ebbca12b3baa38ee56b95b2aeaacba4bdb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -24,15 +24,18 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.{Logging, SparkContext, SparkException} +import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException} +import org.apache.spark.annotation.DeveloperApi import scala.collection.JavaConversions._ /** + * :: DeveloperApi :: * Contains util methods to interact with Hadoop from Spark. */ +@DeveloperApi class SparkHadoopUtil extends Logging { - val conf: Configuration = newConfiguration() + val conf: Configuration = newConfiguration(new SparkConf()) UserGroupInformation.setConfiguration(conf) /** @@ -64,11 +67,39 @@ class SparkHadoopUtil extends Logging { } } + @Deprecated + def newConfiguration(): Configuration = newConfiguration(null) + /** * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop * subsystems. 
*/ - def newConfiguration(): Configuration = new Configuration() + def newConfiguration(conf: SparkConf): Configuration = { + val hadoopConf = new Configuration() + + // Note: this null check is around more than just access to the "conf" object to maintain + // the behavior of the old implementation of this code, for backwards compatibility. + if (conf != null) { + // Explicitly check for S3 environment variables + if (System.getenv("AWS_ACCESS_KEY_ID") != null && + System.getenv("AWS_SECRET_ACCESS_KEY") != null) { + hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) + hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) + hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) + hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) + } + // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" + conf.getAll.foreach { case (key, value) => + if (key.startsWith("spark.hadoop.")) { + hadoopConf.set(key.substring("spark.hadoop.".length), value) + } + } + val bufferSize = conf.get("spark.buffer.size", "65536") + hadoopConf.set("io.file.buffer.size", bufferSize) + } + + hadoopConf + } /** * Add any user credentials to the job conf which are necessary for running on a secure Hadoop @@ -86,7 +117,7 @@ class SparkHadoopUtil extends Logging { def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null } - def loginUserFromKeytab(principalName: String, keytabFilename: String) { + def loginUserFromKeytab(principalName: String, keytabFilename: String) { UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index cc06540ee0647fa31275eb34a960caa61f0a8d9e..05c8a90782c749c9c57136af931a70e93148474e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -24,6 +24,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils @@ -40,7 +41,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis .map { d => Utils.resolveURI(d) } .getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") } - private val fs = Utils.getHadoopFileSystem(resolvedLogDir) + private val fs = Utils.getHadoopFileSystem(resolvedLogDir, + SparkHadoopUtil.get.newConfiguration(conf)) // A timestamp of when the disk was last accessed to check for log updates private var lastLogCheckTimeMs = -1L diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 5017273e87c07e5d0b049b855fdb2f41b7745921..2a66fcfe4801c634fd8f42e904d25f9852632e6b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -33,7 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} import akka.serialization.SerializationExtension import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} -import 
org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState} +import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState, + SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.master.DriverState.DriverState @@ -673,7 +674,8 @@ private[spark] class Master( app.desc.appUiUrl = notFoundBasePath return false } - val fileSystem = Utils.getHadoopFileSystem(eventLogDir) + val fileSystem = Utils.getHadoopFileSystem(eventLogDir, + SparkHadoopUtil.get.newConfiguration(conf)) val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem) val eventLogPaths = eventLogInfo.logPaths val compressionCodec = eventLogInfo.compressionCodec diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 5caaf6bea3575e1b8a4d9824630a7c77dba60842..9f9911762505aa3431a62a7d50cd97b000b914af 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -28,8 +28,8 @@ import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileUtil, Path} -import org.apache.spark.Logging -import org.apache.spark.deploy.{Command, DriverDescription} +import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages.DriverStateChanged import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.master.DriverState.DriverState @@ -39,6 +39,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState * This is currently only used in standalone cluster deploy mode. 
*/ private[spark] class DriverRunner( + val conf: SparkConf, val driverId: String, val workDir: File, val sparkHome: File, @@ -144,8 +145,8 @@ private[spark] class DriverRunner( val jarPath = new Path(driverDesc.jarUrl) - val emptyConf = new Configuration() - val jarFileSystem = jarPath.getFileSystem(emptyConf) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val jarFileSystem = jarPath.getFileSystem(hadoopConf) val destPath = new File(driverDir.getAbsolutePath, jarPath.getName) val jarFileName = jarPath.getName @@ -154,7 +155,7 @@ private[spark] class DriverRunner( if (!localJarFile.exists()) { // May already exist if running multiple workers on one node logInfo(s"Copying user jar $jarPath to $destPath") - FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf) + FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf) } if (!localJarFile.exists()) { // Verify copy succeeded diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 81400af22c0bfde968924dddcc5de4441d449df4..e475567db6a20eb09036edf26233153a57254e66 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -257,7 +257,7 @@ private[spark] class Worker( val fullId = appId + "/" + execId if (ExecutorState.isFinished(state)) { executors.get(fullId) match { - case Some(executor) => + case Some(executor) => logInfo("Executor " + fullId + " finished with state " + state + message.map(" message " + _).getOrElse("") + exitStatus.map(" exitStatus " + _).getOrElse("")) @@ -288,7 +288,7 @@ private[spark] class Worker( case LaunchDriver(driverId, driverDesc) => { logInfo(s"Asked to launch driver $driverId") - val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl) + val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl) drivers(driverId) = driver driver.start() diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 2f76e532aeb761ad886aa25e8fee91607a6fdf79..d7d19f6fa3b960ccf1692d0767e5e999b47d6d2a 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -26,6 +26,7 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler._ import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} @@ -294,9 +295,9 @@ private[spark] class Executor( try { val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader") .asInstanceOf[Class[_ <: ClassLoader]] - val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader], - classOf[Boolean]) - constructor.newInstance(classUri, parent, userClassPathFirst) + val constructor = klass.getConstructor(classOf[SparkConf], classOf[String], + classOf[ClassLoader], classOf[Boolean]) + constructor.newInstance(conf, classUri, parent, userClassPathFirst) } catch { case _: ClassNotFoundException => logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!") @@ -313,16 +314,19 @@ private[spark] class Executor( * SparkContext. Also adds any new JARs we fetched to the class loader. 
*/ private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) { + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) synchronized { // Fetch missing dependencies for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) { logInfo("Fetching " + name + " with timestamp " + timestamp) - Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager) + Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager, + hadoopConf) currentFiles(name) = timestamp } for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) { logInfo("Fetching " + name + " with timestamp " + timestamp) - Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager) + Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager, + hadoopConf) currentJars(name) = timestamp // Add it to our class loader val localName = name.split("/").last diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index 20938781ac6947aa72ca111434504d417b0bc152..7ba1182f0ed271e07fe7b2c6108a2016597e7989 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -157,7 +157,7 @@ private[spark] object CheckpointRDD extends Logging { val sc = new SparkContext(cluster, "CheckpointRDD Test") val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000) val path = new Path(hdfsPath, "temp") - val conf = SparkHadoopUtil.get.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf()) val fs = path.getFileSystem(conf) val broadcastedConf = sc.broadcast(new SerializableWritable(conf)) sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 370fcd85aa680baa42114c8bd726055ba557f4c8..4b99f630440ad0a73976d0bb568a39233f00332e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -44,11 +44,14 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils} private[spark] class EventLoggingListener( appName: String, sparkConf: SparkConf, - hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration()) + hadoopConf: Configuration) extends SparkListener with Logging { import EventLoggingListener._ + def this(appName: String, sparkConf: SparkConf) = + this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf)) + private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false) private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false) private val testing = sparkConf.getBoolean("spark.eventLog.testing", false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index 4f7133c4bc17c23c682166e95ee7d8e578739a9b..bc7670f4a804d99e40ad07bc32ba778e3d8aa379 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -21,6 +21,7 @@ import 
org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.spark.{Logging, SparkContext, SparkEnv} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.TaskSchedulerImpl private[spark] class SimrSchedulerBackend( @@ -44,7 +45,7 @@ private[spark] class SimrSchedulerBackend( sc.conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) - val conf = new Configuration() + val conf = SparkHadoopUtil.get.newConfiguration(sc.conf) val fs = FileSystem.get(conf) logInfo("Writing to HDFS file: " + driverFilePath) @@ -63,7 +64,7 @@ private[spark] class SimrSchedulerBackend( } override def stop() { - val conf = new Configuration() + val conf = SparkHadoopUtil.get.newConfiguration(sc.conf) val fs = FileSystem.get(conf) fs.delete(new Path(driverFilePath), false) super.stop() diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index ad8b79af877d848702a8872d2b4a41ed940512c7..6d1fc05a15d2ca0cddb930de25b8bd7e7de6a44b 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -41,13 +41,22 @@ import org.apache.spark.io.CompressionCodec private[spark] class FileLogger( logDir: String, sparkConf: SparkConf, - hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(), + hadoopConf: Configuration, outputBufferSize: Int = 8 * 1024, // 8 KB compress: Boolean = false, overwrite: Boolean = true, dirPermissions: Option[FsPermission] = None) extends Logging { + def this( + logDir: String, + sparkConf: SparkConf, + compress: Boolean = false, + overwrite: Boolean = true) = { + this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = compress, + overwrite = overwrite) + } + private val dateFormat = new ThreadLocal[SimpleDateFormat]() { override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") } @@ -57,7 +66,7 @@ private[spark] class FileLogger( * create unique FileSystem instance only for FileLogger */ private val fileSystem = { - val conf = SparkHadoopUtil.get.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration(sparkConf) val logUri = new URI(logDir) val scheme = logUri.getScheme if (scheme == "hdfs") { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 86f646d2af18118fc9b470362057c54f465f62f8..0ae28f911e302e10fb9cdf376b2b6143fe2154de 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -34,6 +34,7 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.commons.lang3.SystemUtils +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.json4s._ import tachyon.client.{TachyonFile,TachyonFS} @@ -318,7 +319,8 @@ private[spark] object Utils extends Logging { * Throws SparkException if the target file already exists and has different contents than * the requested file. 
*/ - def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager) { + def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager, + hadoopConf: Configuration) { val filename = url.split("/").last val tempDir = getLocalDir(conf) val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir)) @@ -390,7 +392,7 @@ private[spark] object Utils extends Logging { } case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others - val fs = getHadoopFileSystem(uri) + val fs = getHadoopFileSystem(uri, hadoopConf) val in = fs.open(new Path(uri)) val out = new FileOutputStream(tempFile) Utils.copyStream(in, out, true) @@ -862,8 +864,8 @@ private[spark] object Utils extends Logging { */ def getCallSite: CallSite = { val trace = Thread.currentThread.getStackTrace() - .filterNot { ste:StackTraceElement => - // When running under some profilers, the current stack trace might contain some bogus + .filterNot { ste:StackTraceElement => + // When running under some profilers, the current stack trace might contain some bogus // frames. This is intended to ensure that we don't crash in these situations by // ignoring any frames that we can't examine. (ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace")) @@ -1179,15 +1181,15 @@ private[spark] object Utils extends Logging { /** * Return a Hadoop FileSystem with the scheme encoded in the given path. */ - def getHadoopFileSystem(path: URI): FileSystem = { - FileSystem.get(path, SparkHadoopUtil.get.newConfiguration()) + def getHadoopFileSystem(path: URI, conf: Configuration): FileSystem = { + FileSystem.get(path, conf) } /** * Return a Hadoop FileSystem with the scheme encoded in the given path. */ - def getHadoopFileSystem(path: String): FileSystem = { - getHadoopFileSystem(new URI(path)) + def getHadoopFileSystem(path: String, conf: Configuration): FileSystem = { + getHadoopFileSystem(new URI(path), conf) } /** @@ -1264,7 +1266,7 @@ private[spark] object Utils extends Logging { } } - /** + /** * Execute the given block, logging and re-throwing any uncaught exception. * This is particularly useful for wrapping code that runs in a thread, to ensure * that exceptions are printed, and to avoid having to catch Throwable. 
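The core of the change in these core/ files: SparkHadoopUtil.newConfiguration now takes a SparkConf, so the AWS credential environment variables, any spark.hadoop.* properties, and spark.buffer.size are applied to every Hadoop Configuration Spark builds rather than only the one owned by SparkContext, and Utils.fetchFile / Utils.getHadoopFileSystem accept an explicit Configuration instead of constructing a default one. The following is a minimal standalone sketch of how the new overload behaves, written against the logic added in this patch; the property values are illustrative, and FileSystem.get is plain Hadoop API rather than anything introduced here.

import java.net.URI

import org.apache.hadoop.fs.FileSystem

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

object NewConfigurationSketch {
  def main(args: Array[String]): Unit = {
    // Any "spark.hadoop.foo=bar" entry in SparkConf is copied into the Hadoop conf as "foo=bar",
    // and spark.buffer.size (default 65536) becomes io.file.buffer.size.
    val sparkConf = new SparkConf()
      .set("spark.hadoop.fs.s3n.awsAccessKeyId", "MY_ACCESS_KEY") // illustrative value
      .set("spark.buffer.size", "131072")

    // The SparkConf is now passed explicitly; the zero-arg overload is deprecated.
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
    assert(hadoopConf.get("fs.s3n.awsAccessKeyId") == "MY_ACCESS_KEY")
    assert(hadoopConf.get("io.file.buffer.size") == "131072")

    // Internal callers such as Utils.fetchFile and Utils.getHadoopFileSystem now have this
    // Configuration threaded through; user code can simply hand it to Hadoop directly.
    val fs = FileSystem.get(new URI("file:///tmp"), hadoopConf)
    println(fs.getUri)
  }
}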
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 31aa7ec837f430e783f0df87fc5db404cb14fcff..2a58c6a40d8e497e3922c2db0b9845dd5d9816d3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -121,8 +121,8 @@ class JsonProtocolSuite extends FunSuite { new SparkConf, ExecutorState.RUNNING) } def createDriverRunner(): DriverRunner = { - new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(), - null, "akka://worker") + new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"), + createDriverDesc(), null, "akka://worker") } def assertValidJson(json: JValue) { diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala index c930839b47f1107cc1016194c21d44eeb73607ba..b6f4411e0587a47a966c107eb4b051f091a5501c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala @@ -25,14 +25,15 @@ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.FunSuite +import org.apache.spark.SparkConf import org.apache.spark.deploy.{Command, DriverDescription} class DriverRunnerTest extends FunSuite { private def createDriverRunner() = { val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq()) val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command) - new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription, - null, "akka://1.2.3.4/worker/") + new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"), + driverDescription, null, "akka://1.2.3.4/worker/") } private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 10d8b299317eaa1c44ed47ba03ff1a762f3fedb2..41e58a008c533039ce65a0f52b6723ea5e3eabd0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} @@ -39,7 +40,8 @@ import java.io.File * read and deserialized into actual SparkListenerEvents. 
*/ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { - private val fileSystem = Utils.getHadoopFileSystem("/") + private val fileSystem = Utils.getHadoopFileSystem("/", + SparkHadoopUtil.get.newConfiguration(new SparkConf())) private val allCompressionCodecs = Seq[String]( "org.apache.spark.io.LZFCompressionCodec", "org.apache.spark.io.SnappyCompressionCodec" diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 6b6e0104e5467f696146d278fde18a980e907837..8f0ee9f4dbafd7bb8063d2f25ed6e2ea1db4a491 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.SparkContext._ import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} @@ -32,7 +33,8 @@ import org.apache.spark.util.{JsonProtocol, Utils} * Test whether ReplayListenerBus replays events from logs correctly. */ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { - private val fileSystem = Utils.getHadoopFileSystem("/") + private val fileSystem = Utils.getHadoopFileSystem("/", + SparkHadoopUtil.get.newConfiguration(new SparkConf())) private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS private var testDir: File = _ diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala index 44332fc8dbc234318f76d71b19496e2c51a61feb..c3dd156b405142852b398f36d4d07fbedf9433a9 100644 --- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala @@ -26,13 +26,15 @@ import org.apache.hadoop.fs.Path import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec /** * Test writing files through the FileLogger. 
*/ class FileLoggerSuite extends FunSuite with BeforeAndAfter { - private val fileSystem = Utils.getHadoopFileSystem("/") + private val fileSystem = Utils.getHadoopFileSystem("/", + SparkHadoopUtil.get.newConfiguration(new SparkConf())) private val allCompressionCodecs = Seq[String]( "org.apache.spark.io.LZFCompressionCodec", "org.apache.spark.io.SnappyCompressionCodec" diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index d583cf421ed2392c5c3455d2f7118ed41324ec45..3258510894372b3c45ec9b45f83b980405dd6ba4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -22,9 +22,9 @@ import java.util.Random import scala.math.exp import breeze.linalg.{Vector, DenseVector} +import org.apache.hadoop.conf.Configuration import org.apache.spark._ -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.InputFormatInfo @@ -70,7 +70,7 @@ object SparkHdfsLR { val sparkConf = new SparkConf().setAppName("SparkHdfsLR") val inputPath = args(0) - val conf = SparkHadoopUtil.get.newConfiguration() + val conf = new Configuration() val sc = new SparkContext(sparkConf, InputFormatInfo.computePreferredLocations( Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala index 22127621867e153670fcc34d5f4a7d6305d4561f..96d13612e46dd084bfa3592180b163f94f1d80de 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala @@ -22,9 +22,9 @@ import java.util.Random import scala.math.exp import breeze.linalg.{Vector, DenseVector} +import org.apache.hadoop.conf.Configuration import org.apache.spark._ -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.InputFormatInfo import org.apache.spark.storage.StorageLevel @@ -52,8 +52,8 @@ object SparkTachyonHdfsLR { def main(args: Array[String]) { val inputPath = args(0) - val conf = SparkHadoopUtil.get.newConfiguration() val sparkConf = new SparkConf().setAppName("SparkTachyonHdfsLR") + val conf = new Configuration() val sc = new SparkContext(sparkConf, InputFormatInfo.computePreferredLocations( Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index 687e85ca94d3ca30e3036df92d24b1175ca29b45..5ee325008a5cd9fc82f83d2fab2f73581f2e28de 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -21,10 +21,10 @@ import java.io.{ByteArrayOutputStream, InputStream} import java.net.{URI, URL, URLEncoder} import java.util.concurrent.{Executors, ExecutorService} -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.SparkEnv +import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils import org.apache.spark.util.ParentClassLoader @@ -36,7 +36,7 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ * 
used to load classes defined by the interpreter when the REPL is used. * Allows the user to specify if user class path should be first */ -class ExecutorClassLoader(classUri: String, parent: ClassLoader, +class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader, userClassPathFirst: Boolean) extends ClassLoader { val uri = new URI(classUri) val directory = uri.getPath @@ -48,7 +48,7 @@ class ExecutorClassLoader(classUri: String, parent: ClassLoader, if (uri.getScheme() == "http") { null } else { - FileSystem.get(uri, new Configuration()) + FileSystem.get(uri, SparkHadoopUtil.get.newConfiguration(conf)) } } diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala index c0af7ceb6d3ef48cb068f805b435ccb7ffdde9a2..3e2ee7541f40d6bae1c58b7cb85e90e1b4b9475f 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala @@ -25,7 +25,7 @@ import org.scalatest.FunSuite import com.google.common.io.Files -import org.apache.spark.TestUtils +import org.apache.spark.{SparkConf, TestUtils} import org.apache.spark.util.Utils class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll { @@ -57,7 +57,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll { test("child first") { val parentLoader = new URLClassLoader(urls2, null) - val classLoader = new ExecutorClassLoader(url1, parentLoader, true) + val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true) val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance() val fakeClassVersion = fakeClass.toString assert(fakeClassVersion === "1") @@ -65,7 +65,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll { test("parent first") { val parentLoader = new URLClassLoader(urls2, null) - val classLoader = new ExecutorClassLoader(url1, parentLoader, false) + val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, false) val fakeClass = classLoader.loadClass("ReplFakeClass1").newInstance() val fakeClassVersion = fakeClass.toString assert(fakeClassVersion === "2") @@ -73,7 +73,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll { test("child first can fall back") { val parentLoader = new URLClassLoader(urls2, null) - val classLoader = new ExecutorClassLoader(url1, parentLoader, true) + val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true) val fakeClass = classLoader.loadClass("ReplFakeClass3").newInstance() val fakeClassVersion = fakeClass.toString assert(fakeClassVersion === "2") @@ -81,7 +81,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll { test("child first can fail") { val parentLoader = new URLClassLoader(urls2, null) - val classLoader = new ExecutorClassLoader(url1, parentLoader, true) + val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true) intercept[java.lang.ClassNotFoundException] { classLoader.loadClass("ReplFakeClassDoesNotExist").newInstance() } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 9be78546c1091230c61ab2ecd071c6726045fb4f..12f1cd3813a05c5e704942534f663c2f33ae4757 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,7 +40,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa extends YarnClientImpl with ClientBase with Logging { def this(clientArgs: ClientArguments, spConf: SparkConf) = - this(clientArgs, new Configuration(), spConf) + this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf()) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 5f66a98e7528b773605b9087fb1ce790c766601f..8c548409719daeafda0ea854fad009e5b661a304 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -48,7 +48,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // optimal as more containers are available. Might need to handle this better. private val sparkConf = new SparkConf() - private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration()) + private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf) + .asInstanceOf[YarnConfiguration] private val isDriver = args.userClass != null // Default to numExecutors * 2, with minimum of 3 diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 2aa27a19085828e14389c2d8ed382606104c71b4..ffe2731ca1d17812c097065a4513cf9164306c35 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -54,7 +54,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems // Always create a new config, dont reuse yarnConf. - override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration()) + override def newConfiguration(conf: SparkConf): Configuration = + new YarnConfiguration(super.newConfiguration(conf)) // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster override def addCredentials(conf: JobConf) { diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index d162b4c433f46a8529d021965fcd5f1cf576ad3f..254774a6b839e6cc3b66309a10f49527b3cafde1 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -18,7 +18,6 @@ package org.apache.spark.scheduler.cluster import org.apache.spark._ -import org.apache.hadoop.conf.Configuration import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.Utils @@ -26,14 +25,11 @@ import org.apache.spark.util.Utils /** * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM. 
*/ -private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) - extends TaskSchedulerImpl(sc) { - - def this(sc: SparkContext) = this(sc, new Configuration()) +private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { // By default, rack is unknown override def getRackForHost(hostPort: String): Option[String] = { val host = Utils.parseHostPort(hostPort)._1 - Option(YarnSparkHadoopUtil.lookupRack(conf, host)) + Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host)) } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 69f40225a21f544b2651f742e68d1f8eb6d7b404..4157ff95c2794780312e5bcb1812539561d2e373 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -21,19 +21,15 @@ import org.apache.spark._ import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.Utils -import org.apache.hadoop.conf.Configuration /** * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of * ApplicationMaster, etc is done */ -private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) - extends TaskSchedulerImpl(sc) { +private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { logInfo("Created YarnClusterScheduler") - def this(sc: SparkContext) = this(sc, new Configuration()) - // Nothing else for now ... initialize application master : which needs a SparkContext to // determine how to allocate. // Note that only the first creation of a SparkContext influences (and ideally, there must be @@ -43,8 +39,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) // By default, rack is unknown override def getRackForHost(hostPort: String): Option[String] = { val host = Utils.parseHostPort(hostPort)._1 - val retval = YarnSparkHadoopUtil.lookupRack(conf, host) - if (retval != null) Some(retval) else None + Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host)) } override def postStartHook() { diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 7650bd4396c126e319f1f9ddc8de0297bbee1639..75db8ee6d468f7adddead91c815160682cb564a0 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -20,9 +20,10 @@ package org.apache.spark.deploy.yarn import java.io.{File, IOException} import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.yarn.conf.YarnConfiguration import org.scalatest.{FunSuite, Matchers} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { @@ -61,4 +62,16 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { } } + test("Yarn configuration override") { + val key = "yarn.nodemanager.hostname" + val default = new YarnConfiguration() + + val sparkConf = new SparkConf() + .set("spark.hadoop." 
+ key, "someHostName") + val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf) + + yarnConf.getClass() should be (classOf[YarnConfiguration]) + yarnConf.get(key) should not be default.get(key) + } + } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1f9a4bf209eb981c4ab6616c4fb84418f3117d24..313a0d21ce181a8b472ad14587a36d7fff93f896 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, SparkConf} - +import org.apache.spark.deploy.SparkHadoopUtil /** * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API. @@ -40,7 +40,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val yarnClient = YarnClient.createYarnClient def this(clientArgs: ClientArguments, spConf: SparkConf) = - this(clientArgs, new Configuration(), spConf) + this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
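On the YARN side, YarnSparkHadoopUtil.newConfiguration now wraps super.newConfiguration(conf) in a YarnConfiguration, so the same spark.hadoop.* overrides reach the configuration used by the ApplicationMaster and both YARN clients; the new test added to YarnSparkHadoopUtilSuite covers exactly this behavior. The standalone sketch below condenses that test; it assumes the spark-yarn module is on the classpath.

import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

object YarnConfOverrideSketch {
  def main(args: Array[String]): Unit = {
    val key = "yarn.nodemanager.hostname"
    val sparkConf = new SparkConf().set("spark.hadoop." + key, "someHostName")

    // newConfiguration(conf) applies the spark.hadoop.* overrides from SparkConf
    // and still returns a YarnConfiguration instance.
    val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf)
    assert(yarnConf.isInstanceOf[YarnConfiguration])
    assert(yarnConf.get(key) == "someHostName")
  }
}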