diff --git a/core/src/main/resources/org/apache/spark/default-log4j.properties b/core/src/main/resources/org/apache/spark/default-log4j.properties
new file mode 100644
index 0000000000000000000000000000000000000000..d72dbadc3904f327effddf99594045067be2f529
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/default-log4j.properties
@@ -0,0 +1,8 @@
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index cdfc9dd54e06a82b01846066f686e70a858963a5..69a738dc4446ac23965a8cbd3c7823f014a1fba2 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -46,6 +46,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
     if (server != null) {
       throw new ServerStateException("Server is already started")
     } else {
+      logInfo("Starting HTTP Server")
       server = new Server()
       val connector = new SocketConnector
       connector.setMaxIdleTime(60*1000)
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 6a973ea4951c3a389a202f6c5dfd77f355061648..d519fc5a2941904323d0b6c5c2b83a4231b30115 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark
 
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
+import org.apache.log4j.{LogManager, PropertyConfigurator}
+import org.slf4j.{Logger, LoggerFactory}
 
 /**
  * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
@@ -33,6 +33,7 @@ trait Logging {
   // Method to get or create the logger for this object
   protected def log: Logger = {
     if (log_ == null) {
+      initializeIfNecessary()
       var className = this.getClass.getName
       // Ignore trailing $'s in the class names for Scala objects
       if (className.endsWith("$")) {
@@ -89,7 +90,37 @@ trait Logging {
     log.isTraceEnabled
   }
 
-  // Method for ensuring that logging is initialized, to avoid having multiple
-  // threads do it concurrently (as SLF4J initialization is not thread safe).
-  protected def initLogging() { log }
+  private def initializeIfNecessary() {
+    if (!Logging.initialized) {
+      Logging.initLock.synchronized {
+        if (!Logging.initialized) {
+          initializeLogging()
+        }
+      }
+    }
+  }
+
+  private def initializeLogging() {
+    // If Log4j doesn't seem initialized, load a default properties file
+    val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
+    if (!log4jInitialized) {
+      val defaultLogProps = "org/apache/spark/default-log4j.properties"
+      val classLoader = this.getClass.getClassLoader
+      Option(classLoader.getResource(defaultLogProps)) match {
+        case Some(url) => PropertyConfigurator.configure(url)
+        case None => System.err.println(s"Spark was unable to load $defaultLogProps")
+      }
+      log.info(s"Using Spark's default log4j profile: $defaultLogProps")
+    }
+    Logging.initialized = true
+
+    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
+    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
+    log
+  }
+}
+
+object Logging {
+  @volatile private var initialized = false
+  val initLock = new Object()
 }
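With the Logging changes above, a class gets working log output just by mixing in the trait: the first call that touches `log` runs initializeIfNecessary(), which is why the explicit initLogging() calls are dropped from the files that follow. A minimal sketch of that usage (MyJob is a hypothetical example class, not part of this patch; it assumes spark-core and log4j 1.2 on the classpath):

import org.apache.spark.Logging

class MyJob extends Logging {
  def run() {
    // The first log call triggers the one-time setup: if log4j has no appenders yet,
    // Spark's default-log4j.properties is loaded, then the message goes through SLF4J.
    logInfo("Starting job")
  }
}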
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7514ce58fba4ad984997c13f2ebf6ace55bed328..d3903ce9e336dedde50700d00776205de82184c9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -88,9 +88,6 @@ class SparkContext(
     scala.collection.immutable.Map())
   extends Logging {
 
-  // Ensure logging is initialized before we spawn any threads
-  initLogging()
-
   // Set Spark driver host and port system properties
   if (System.getProperty("spark.driver.host") == null) {
     System.setProperty("spark.driver.host", Utils.localHostName())
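initializeLogging() only installs Spark's default profile when the root logger has no appenders, so a driver program that configures log4j before creating a SparkContext keeps its own settings. A rough sketch of that precedence (MyApp is hypothetical; it assumes the log4j 1.2 API that Spark already depends on):

import org.apache.log4j.{ConsoleAppender, Level, LogManager, PatternLayout}
import org.apache.spark.SparkContext

object MyApp {
  def main(args: Array[String]) {
    // Attach an appender before any Spark class logs; the hasMoreElements check in
    // initializeLogging() then sees an initialized log4j and leaves it untouched.
    val root = LogManager.getRootLogger
    root.addAppender(new ConsoleAppender(new PatternLayout("%d %p %c: %m%n")))
    root.setLevel(Level.WARN)

    val sc = new SparkContext("local", "MyApp")
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}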
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0f19d7a96b0858a21b2aab65480f7616680543e2..782be9a429e72d98f0da2b1931fa41a7f6d6cece 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -48,8 +48,6 @@ private[spark] class Executor(
 
   private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
 
-  initLogging()
-
   // No ip or host:port - just hostname
   Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
   // must not have port specified.
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index caab748d602ee452703091c9f0663ed46a7328f5..6f9f29969eaec85d315c964092c6c815516e75e8 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -26,7 +26,6 @@ import scala.util.matching.Regex
 import org.apache.spark.Logging
 
 private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
-  initLogging()
 
   val DEFAULT_PREFIX = "*"
   val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index bec0c83be8bea24c4c69bc14ad07c72cbf685329..8e038ce98cf463e44977c1af859c6dc44526747e 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -63,7 +63,6 @@ import org.apache.spark.metrics.source.Source
  * [options] is the specific property of this source or sink.
  */
 private[spark] class MetricsSystem private (val instance: String) extends Logging {
-  initLogging()
 
   val confFile = System.getProperty("spark.metrics.conf")
   val metricsConfig = new MetricsConfig(Option(confFile))
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 21022e1cfb0866e30514b1d1db11888a356b3179..e0eb02ce815ca0e4c7da0ac1a8ef1fd2fe4ce0af 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -50,8 +50,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
 
   private val akkaTimeout = AkkaUtils.askTimeout
 
-  initLogging()
-
   val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
     "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index 0c66addf9def6f78628bfff2cec9b7c9e9a8a3b4..21f003609b14de5f511ebca2006afe72faf53d0b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -30,7 +30,6 @@ import org.apache.spark.util.Utils
  * TODO: Use event model.
  */
 private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
-  initLogging()
 
   blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
 
@@ -101,8 +100,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
 private[spark] object BlockManagerWorker extends Logging {
   private var blockManagerWorker: BlockManagerWorker = null
 
-  initLogging()
-
   def startBlockManagerWorker(manager: BlockManager) {
     blockManagerWorker = new BlockManagerWorker(manager)
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index 6ce9127c7450175d61ed66871678ea7b7fc62dc8..a06f50a0ac89cf6f3ada7fad21d8beb59245ab82 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -37,8 +37,6 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
 
   def length = blockMessages.length
 
-  initLogging()
-
   def set(bufferMessage: BufferMessage) {
     val startTime = System.currentTimeMillis
     val newBlockMessages = new ArrayBuffer[BlockMessage]()
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index f25d921d3f87faa004975500e56c8b96416c3a13..70bfb81661381e87003afd2c2f8aefbe59afd537 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -26,8 +26,6 @@ import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
 import org.apache.spark.util.Utils
 
 class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
-  initLogging()
-
   var checkpointDir: File = _
   val partitioner = new HashPartitioner(2)
 
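The initLogging() calls removed above forced logging to be set up before any worker threads started; with the @volatile flag and the synchronized re-check in Logging, a concurrent first use is safe instead. A small illustrative sketch (hypothetical classes, not part of the patch): several threads hit their first log call at once, but the log4j configuration still runs exactly once.

import org.apache.spark.Logging

class LoggingWorker(id: Int) extends Logging with Runnable {
  def run() {
    // Many threads may reach this first log call simultaneously; the double-checked
    // lock in Logging.initializeIfNecessary() initializes log4j only once.
    logInfo("worker " + id + " started")
  }
}

object ConcurrentLoggingDemo {
  def main(args: Array[String]) {
    val threads = (1 to 4).map(i => new Thread(new LoggingWorker(i)))
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}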
diff --git a/spark-class b/spark-class
index 802e4aa1045e483fc96f061b44640d53f781df9b..1858ea62476d9216aa5cf31563962408983ae279 100755
--- a/spark-class
+++ b/spark-class
@@ -129,11 +129,16 @@ fi
 
 # Compute classpath using external script
 CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
-CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
+
+if [ "$1" == "org.apache.spark.tools.JavaAPICompletenessChecker" ]; then
+  CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
+fi
 
 if $cygwin; then
   CLASSPATH=`cygpath -wp $CLASSPATH`
-  export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR`
+  if [ "$1" == "org.apache.spark.tools.JavaAPICompletenessChecker" ]; then
+    export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR`
+  fi
 fi
 export CLASSPATH
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index a78d3965ee94e0446de016af7e9a309d9d210491..8ebe09da0dcb72ad16491894893776a2a4f5f080 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -56,8 +56,6 @@ abstract class DStream[T: ClassTag] (
     @transient protected[streaming] var ssc: StreamingContext
   ) extends Serializable with Logging {
 
-  initLogging()
-
   // =======================================================================
   // Methods that should be implemented by subclasses of DStream
   // =======================================================================
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index daed7ff7c3f1385489c8f691591f100087a39249..a09b891956efe2348043c85cf6f11c22596e3be4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -24,7 +24,6 @@ import org.apache.spark.Logging
 import org.apache.spark.streaming.scheduler.Job
 
 final private[streaming] class DStreamGraph extends Serializable with Logging {
-  initLogging()
 
   private val inputStreams = new ArrayBuffer[InputDStream[_]]()
   private val outputStreams = new ArrayBuffer[DStream[_]]()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8898fdcb7f58b88eda0516c180fc174be7cb94bb..659da2665ac6dfe4cd5676178b37cc8d64051686 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -94,8 +94,6 @@ class StreamingContext private (
    */
   def this(path: String) = this(null, CheckpointReader.read(path), null)
 
-  initLogging()
-
   if (sc_ == null && cp_ == null) {
     throw new Exception("Spark Streaming cannot be initialized with " +
       "both SparkContext and checkpoint as null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 5add20871e3fd68b83d63d649af903f2d13f4f51..8c7f42306d58a3e9b8644833388d2ca3cec7f6ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -88,8 +88,6 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe
  */
 abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {
 
-  initLogging()
-
   lazy protected val env = SparkEnv.get
 
   lazy protected val actor = env.actorSystem.actorOf(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 921a33a4cbf3b9172282489be0617e019d42d82a..2eb3b3989e2dba0834528dab31244631890256c1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -36,8 +36,6 @@ private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
 private[streaming]
 class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
-  initLogging()
-
   val ssc = jobScheduler.ssc
   val graph = ssc.graph
   val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9511ccfbeddd6132b455882bddf65c8bd82bb5e6..488cc2f401d25c1073a21fc4cd6b95aa014841d0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -30,8 +30,6 @@ import org.apache.spark.streaming._
 private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {
 
-  initLogging()
-
   val jobSets = new ConcurrentHashMap[Time, JobSet]
   val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
   val executor = Executors.newFixedThreadPool(numConcurrentJobs)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 4a3993e3e3effd0b38b214d4c36142c9e73db3f3..1559f7a9f7ac00a917cc742bbc6f9287270e9e20 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration
 
 private[streaming]
 object MasterFailureTest extends Logging {
-  initLogging()
 
   @volatile var killed = false
   @volatile var killCount = 0
@@ -331,7 +330,6 @@ class TestOutputStream[T: ClassTag](
  */
 private[streaming]
 class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
-  initLogging()
 
   override def run() {
     try {
@@ -366,7 +364,6 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread
 private[streaming]
 class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
   extends Thread with Logging {
-  initLogging()
 
   override def run() {
     val localTestDir = Files.createTempDir()