diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 25ca650a3a37e58ac5ded2a389fe7edf978141d2..c14dce8273bc122fa20bc33759807311ba09f87a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -216,10 +216,33 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] val ui = new SparkUI(this) ui.bind() + /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ + val hadoopConfiguration: Configuration = { + val env = SparkEnv.get + val hadoopConf = SparkHadoopUtil.get.newConfiguration() + // Explicitly check for S3 environment variables + if (System.getenv("AWS_ACCESS_KEY_ID") != null && + System.getenv("AWS_SECRET_ACCESS_KEY") != null) { + hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) + hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) + hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) + hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) + } + // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" + conf.getAll.foreach { case (key, value) => + if (key.startsWith("spark.hadoop.")) { + hadoopConf.set(key.substring("spark.hadoop.".length), value) + } + } + val bufferSize = conf.get("spark.buffer.size", "65536") + hadoopConf.set("io.file.buffer.size", bufferSize) + hadoopConf + } + // Optionally log Spark events private[spark] val eventLogger: Option[EventLoggingListener] = { if (conf.getBoolean("spark.eventLog.enabled", false)) { - val logger = new EventLoggingListener(appName, conf) + val logger = new EventLoggingListener(appName, conf, hadoopConfiguration) logger.start() listenerBus.addListener(logger) Some(logger) @@ -294,29 +317,6 @@ class SparkContext(config: SparkConf) extends Logging { postEnvironmentUpdate() postApplicationStart() - /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ - val hadoopConfiguration: Configuration = { - val env = SparkEnv.get - val hadoopConf = SparkHadoopUtil.get.newConfiguration() - // Explicitly check for S3 environment variables - if (System.getenv("AWS_ACCESS_KEY_ID") != null && - System.getenv("AWS_SECRET_ACCESS_KEY") != null) { - hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) - hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) - hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) - hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) - } - // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - conf.getAll.foreach { case (key, value) => - if (key.startsWith("spark.hadoop.")) { - hadoopConf.set(key.substring("spark.hadoop.".length), value) - } - } - val bufferSize = conf.get("spark.buffer.size", "65536") - hadoopConf.set("io.file.buffer.size", bufferSize) - hadoopConf - } - private[spark] var checkpointDir: Option[String] = None // Thread Local variable that can be used by users to pass information down the stack diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index b983c16af14f423184442471115a01860141632f..2fe65cd944b67eda2b7fe374dbb74da66ccc9b57 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -19,6 +19,7 @@ package org.apache.spark.scheduler import scala.collection.mutable +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.json4s.jackson.JsonMethods._ @@ -36,7 +37,10 @@ import org.apache.spark.util.{FileLogger, JsonProtocol} * spark.eventLog.dir - Path to the directory in which events are logged. * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams */ -private[spark] class EventLoggingListener(appName: String, conf: SparkConf) +private[spark] class EventLoggingListener( + appName: String, + conf: SparkConf, + hadoopConfiguration: Configuration) extends SparkListener with Logging { import EventLoggingListener._ @@ -49,7 +53,8 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf) val logDir = logBaseDir + "/" + name private val logger = - new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite) + new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress, + shouldOverwrite) /** * Begin logging events. diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 7d58d1c7651807838fb5cf12768c658379c15605..7d47b2a72aff74327d2b421eecd0a23e89d4bc39 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -22,7 +22,8 @@ import java.net.URI import java.text.SimpleDateFormat import java.util.Date -import org.apache.hadoop.fs.{FSDataOutputStream, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.spark.{Logging, SparkConf} import org.apache.spark.io.CompressionCodec @@ -37,7 +38,8 @@ import org.apache.spark.io.CompressionCodec */ private[spark] class FileLogger( logDir: String, - conf: SparkConf = new SparkConf, + conf: SparkConf, + hadoopConfiguration: Configuration, outputBufferSize: Int = 8 * 1024, // 8 KB compress: Boolean = false, overwrite: Boolean = true) @@ -85,19 +87,20 @@ private[spark] class FileLogger( private def createWriter(fileName: String): PrintWriter = { val logPath = logDir + "/" + fileName val uri = new URI(logPath) + val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme + val isDefaultLocal = (defaultFs == null || defaultFs == "file") /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). * Therefore, for local files, use FileOutputStream instead. */ - val dstream = uri.getScheme match { - case "file" | null => + val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { // Second parameter is whether to append new FileOutputStream(uri.getPath, !overwrite) - - case _ => + } else { val path = new Path(logPath) hadoopDataStream = Some(fileSystem.create(path, overwrite)) hadoopDataStream.get - } + } val bstream = new BufferedOutputStream(dstream, outputBufferSize) val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream