diff --git a/core/src/main/resources/org/apache/spark/default-log4j.properties b/core/src/main/resources/org/apache/spark/default-log4j.properties new file mode 100644 index 0000000000000000000000000000000000000000..d72dbadc3904f327effddf99594045067be2f529 --- /dev/null +++ b/core/src/main/resources/org/apache/spark/default-log4j.properties @@ -0,0 +1,8 @@ +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index cdfc9dd54e06a82b01846066f686e70a858963a5..69a738dc4446ac23965a8cbd3c7823f014a1fba2 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -46,6 +46,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging { if (server != null) { throw new ServerStateException("Server is already started") } else { + logInfo("Starting HTTP Server") server = new Server() val connector = new SocketConnector connector.setMaxIdleTime(60*1000) diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala index 6a973ea4951c3a389a202f6c5dfd77f355061648..d519fc5a2941904323d0b6c5c2b83a4231b30115 100644 --- a/core/src/main/scala/org/apache/spark/Logging.scala +++ b/core/src/main/scala/org/apache/spark/Logging.scala @@ -17,8 +17,8 @@ package org.apache.spark -import org.slf4j.Logger -import org.slf4j.LoggerFactory +import org.apache.log4j.{LogManager, PropertyConfigurator} +import org.slf4j.{Logger, LoggerFactory} /** * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows @@ -33,6 +33,7 @@ trait Logging { // Method to get or create the logger for this object protected def log: Logger = { if (log_ == null) { + initializeIfNecessary() var className = this.getClass.getName // Ignore trailing $'s in the class names for Scala objects if (className.endsWith("$")) { @@ -89,7 +90,37 @@ trait Logging { log.isTraceEnabled } - // Method for ensuring that logging is initialized, to avoid having multiple - // threads do it concurrently (as SLF4J initialization is not thread safe). - protected def initLogging() { log } + private def initializeIfNecessary() { + if (!Logging.initialized) { + Logging.initLock.synchronized { + if (!Logging.initialized) { + initializeLogging() + } + } + } + } + + private def initializeLogging() { + // If Log4j doesn't seem initialized, load a default properties file + val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements + if (!log4jInitialized) { + val defaultLogProps = "org/apache/spark/default-log4j.properties" + val classLoader = this.getClass.getClassLoader + Option(classLoader.getResource(defaultLogProps)) match { + case Some(url) => PropertyConfigurator.configure(url) + case None => System.err.println(s"Spark was unable to load $defaultLogProps") + } + log.info(s"Using Spark's default log4j profile: $defaultLogProps") + } + Logging.initialized = true + + // Force a call into slf4j to initialize it. Avoids this happening from mutliple threads + // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html + log + } +} + +object Logging { + @volatile private var initialized = false + val initLock = new Object() } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index fbc7a78bf546f1a64df53ee01960459d2d56f2dd..46874c41a23dd654eac65493f3f245eb01ee4f16 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -137,9 +137,6 @@ class SparkContext( val isLocal = (master == "local" || master.startsWith("local[")) - // Ensure logging is initialized before we spawn any threads - initLogging() - // Create the Spark execution environment (cache, map output tracker, etc) private[spark] val env = SparkEnv.create( conf, diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 240015464870bcc7a250efe330ec999bb9d53273..5b70165c35358b451137cf533b8924201324f472 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -48,8 +48,6 @@ private[spark] class Executor( private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0)) - initLogging() - // No ip or host:port - just hostname Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname") // must not have port specified. diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index caab748d602ee452703091c9f0663ed46a7328f5..6f9f29969eaec85d315c964092c6c815516e75e8 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -26,7 +26,6 @@ import scala.util.matching.Regex import org.apache.spark.Logging private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging { - initLogging() val DEFAULT_PREFIX = "*" val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index ac29816f19f5ab1c1195f6fd0d46c8392b16dd6f..0e41c73ce75a4bfc7586d3428df97faa787ffdc9 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -64,7 +64,6 @@ import org.apache.spark.metrics.source.Source */ private[spark] class MetricsSystem private (val instance: String, conf: SparkConf) extends Logging { - initLogging() val confFile = conf.getOrElse("spark.metrics.conf", null) val metricsConfig = new MetricsConfig(Option(confFile)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 73a1da2de6f669ae5ec9f8947a4b713bfa142fe3..dbbeeb39ebedcdeefcde6aab8994220a128d3e4b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -50,8 +50,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act private val akkaTimeout = AkkaUtils.askTimeout(conf) - initLogging() - val slaveTimeout = conf.getOrElse("spark.storage.blockManagerSlaveTimeoutMs", "" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala index 0c66addf9def6f78628bfff2cec9b7c9e9a8a3b4..21f003609b14de5f511ebca2006afe72faf53d0b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala @@ -30,7 +30,6 @@ import org.apache.spark.util.Utils * TODO: Use event model. */ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging { - initLogging() blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive) @@ -101,8 +100,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends private[spark] object BlockManagerWorker extends Logging { private var blockManagerWorker: BlockManagerWorker = null - initLogging() - def startBlockManagerWorker(manager: BlockManager) { blockManagerWorker = new BlockManagerWorker(manager) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala index 6ce9127c7450175d61ed66871678ea7b7fc62dc8..a06f50a0ac89cf6f3ada7fad21d8beb59245ab82 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala @@ -37,8 +37,6 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM def length = blockMessages.length - initLogging() - def set(bufferMessage: BufferMessage) { val startTime = System.currentTimeMillis val newBlockMessages = new ArrayBuffer[BlockMessage]() diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index f25d921d3f87faa004975500e56c8b96416c3a13..70bfb81661381e87003afd2c2f8aefbe59afd537 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -26,8 +26,6 @@ import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId} import org.apache.spark.util.Utils class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { - initLogging() - var checkpointDir: File = _ val partitioner = new HashPartitioner(2) diff --git a/spark-class b/spark-class index 802e4aa1045e483fc96f061b44640d53f781df9b..1858ea62476d9216aa5cf31563962408983ae279 100755 --- a/spark-class +++ b/spark-class @@ -129,11 +129,16 @@ fi # Compute classpath using external script CLASSPATH=`$FWDIR/bin/compute-classpath.sh` -CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR" + +if [ "$1" == "org.apache.spark.tools.JavaAPICompletenessChecker" ]; then + CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR" +fi if $cygwin; then CLASSPATH=`cygpath -wp $CLASSPATH` - export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR` + if [ "$1" == "org.apache.spark.tools.JavaAPICompletenessChecker" ]; then + export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR` + fi fi export CLASSPATH diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index ce2a9d414285598b978c8d39bbc30859b80e594f..00671ba5206f95382c021c220c081cea484656ab 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -56,8 +56,6 @@ abstract class DStream[T: ClassTag] ( @transient protected[streaming] var ssc: StreamingContext ) extends Serializable with Logging { - initLogging() - // ======================================================================= // Methods that should be implemented by subclasses of DStream // ======================================================================= diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index daed7ff7c3f1385489c8f691591f100087a39249..a09b891956efe2348043c85cf6f11c22596e3be4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -24,7 +24,6 @@ import org.apache.spark.Logging import org.apache.spark.streaming.scheduler.Job final private[streaming] class DStreamGraph extends Serializable with Logging { - initLogging() private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 0566704c948f152fac2f93b288cfdef9642abfa7..304986f1879368a341498364d1f54c526cc2a1a8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -103,8 +103,6 @@ class StreamingContext private ( */ def this(path: String) = this(null, CheckpointReader.read(new SparkConf(), path), null) - initLogging() - if (sc_ == null && cp_ == null) { throw new Exception("Spark Streaming cannot be initialized with " + "both SparkContext and checkpoint as null") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 1839ca35783e37fc3327451c1876cc317ba851d3..a230845b9271514646734f46dfef53e95da96dcf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -88,8 +88,6 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe */ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging { - initLogging() - lazy protected val env = SparkEnv.get lazy protected val actor = env.actorSystem.actorOf( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index ab60a8166ea70bfea847dc1afb2896152422a1f9..844180c81a65b0932d404bb552f1b1bccebe664d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -36,8 +36,6 @@ private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging { - initLogging() - val ssc = jobScheduler.ssc val graph = ssc.graph val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 7fd8d41c8cbfe05f04954905445fef6223ef9e03..651cdaaa6d0b217c91e914df57fd2243fc4e79c6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -30,8 +30,6 @@ import org.apache.spark.streaming._ private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging { - initLogging() - val jobSets = new ConcurrentHashMap[Time, JobSet] val numConcurrentJobs = ssc.conf.getOrElse("spark.streaming.concurrentJobs", "1").toInt val executor = Executors.newFixedThreadPool(numConcurrentJobs) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 4a3993e3e3effd0b38b214d4c36142c9e73db3f3..1559f7a9f7ac00a917cc742bbc6f9287270e9e20 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration private[streaming] object MasterFailureTest extends Logging { - initLogging() @volatile var killed = false @volatile var killCount = 0 @@ -331,7 +330,6 @@ class TestOutputStream[T: ClassTag]( */ private[streaming] class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging { - initLogging() override def run() { try { @@ -366,7 +364,6 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread private[streaming] class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) extends Thread with Logging { - initLogging() override def run() { val localTestDir = Files.createTempDir()