From dd1b7a61d9193c93ab95ab550622259f4bc26f53 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin <vanzin@cloudera.com>
Date: Wed, 23 Apr 2014 14:47:38 -0700
Subject: [PATCH] Honor default fs name when initializing event logger.

This is related to SPARK-1459 / PR #375. Without this fix,
FileLogger.createLogDir() may try to create the log dir on
HDFS, while createWriter() will try to open the log file on
the local file system, leading to interesting errors and
confusion.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #450 from vanzin/event-file-2 and squashes the following commits:

592cdb3 [Marcelo Vanzin] Honor default fs name when initializing event logger.
---
 .../scala/org/apache/spark/SparkContext.scala | 48 +++++++++----------
 .../scheduler/EventLoggingListener.scala      |  9 +++-
 .../org/apache/spark/util/FileLogger.scala    | 17 ++++---
 3 files changed, 41 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 25ca650a3a..c14dce8273 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -216,10 +216,33 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val ui = new SparkUI(this)
   ui.bind()
 
+  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+  val hadoopConfiguration: Configuration = {
+    val env = SparkEnv.get
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
+    // Explicitly check for S3 environment variables
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+    }
+    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+    conf.getAll.foreach { case (key, value) =>
+      if (key.startsWith("spark.hadoop.")) {
+        hadoopConf.set(key.substring("spark.hadoop.".length), value)
+      }
+    }
+    val bufferSize = conf.get("spark.buffer.size", "65536")
+    hadoopConf.set("io.file.buffer.size", bufferSize)
+    hadoopConf
+  }
+
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf)
+      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
       logger.start()
       listenerBus.addListener(logger)
       Some(logger)
@@ -294,29 +317,6 @@ class SparkContext(config: SparkConf) extends Logging {
   postEnvironmentUpdate()
   postApplicationStart()
 
-  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration: Configuration = {
-    val env = SparkEnv.get
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
-    // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-    }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    conf.getAll.foreach { case (key, value) =>
-      if (key.startsWith("spark.hadoop.")) {
-        hadoopConf.set(key.substring("spark.hadoop.".length), value)
-      }
-    }
-    val bufferSize = conf.get("spark.buffer.size", "65536")
-    hadoopConf.set("io.file.buffer.size", bufferSize)
-    hadoopConf
-  }
-
   private[spark] var checkpointDir: Option[String] = None
 
   // Thread Local variable that can be used by users to pass information down the stack
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index b983c16af1..2fe65cd944 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.json4s.jackson.JsonMethods._
 
@@ -36,7 +37,10 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  *   spark.eventLog.dir - Path to the directory in which events are logged.
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
  */
-private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
+private[spark] class EventLoggingListener(
+    appName: String,
+    conf: SparkConf,
+    hadoopConfiguration: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
@@ -49,7 +53,8 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
   val logDir = logBaseDir + "/" + name
 
   private val logger =
-    new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
+    new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
+      shouldOverwrite)
 
   /**
    * Begin logging events.
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 7d58d1c765..7d47b2a72a 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -22,7 +22,8 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import org.apache.hadoop.fs.{FSDataOutputStream, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.io.CompressionCodec
@@ -37,7 +38,8 @@ import org.apache.spark.io.CompressionCodec
  */
 private[spark] class FileLogger(
     logDir: String,
-    conf: SparkConf = new SparkConf,
+    conf: SparkConf,
+    hadoopConfiguration: Configuration,
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
     overwrite: Boolean = true)
@@ -85,19 +87,20 @@ private[spark] class FileLogger(
   private def createWriter(fileName: String): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
+    val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
+    val isDefaultLocal = (defaultFs == null || defaultFs == "file")
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */
-    val dstream = uri.getScheme match {
-      case "file" | null =>
+    val dstream =
+      if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
         // Second parameter is whether to append
         new FileOutputStream(uri.getPath, !overwrite)
-
-      case _ =>
+      } else {
         val path = new Path(logPath)
         hadoopDataStream = Some(fileSystem.create(path, overwrite))
         hadoopDataStream.get
-    }
+      }
 
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
-- 
GitLab