diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index cdfc9dd54e06a82b01846066f686e70a858963a5..240f32efaf7d26d093860a7a5ae2683e75060d5b 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -46,6 +46,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging { if (server != null) { throw new ServerStateException("Server is already started") } else { + log.info("Starting HTTP Server") server = new Server() val connector = new SocketConnector connector.setMaxIdleTime(60*1000) diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala index b97697d587aada9d51a3e014659549fdf93d16b9..1fdbccdec65c54d9b31509199608dad155a96125 100644 --- a/core/src/main/scala/org/apache/spark/Logging.scala +++ b/core/src/main/scala/org/apache/spark/Logging.scala @@ -33,6 +33,7 @@ trait Logging { // Method to get or create the logger for this object protected def log: Logger = { if (log_ == null) { + initializeIfNecessary() var className = this.getClass.getName // Ignore trailing $'s in the class names for Scala objects if (className.endsWith("$")) { @@ -89,9 +90,15 @@ trait Logging { log.isTraceEnabled } - // Method for ensuring that logging is initialized, to avoid having multiple - // threads do it concurrently (as SLF4J initialization is not thread safe). - protected def initLogging() { + private def initializeIfNecessary() { + Logging.initLock.synchronized { + if (!Logging.initialized) { + initializeLogging() + } + } + } + + private def initializeLogging() { // If Log4j doesn't seem initialized, load a default properties file val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) { @@ -101,7 +108,17 @@ trait Logging { case Some(url) => PropertyConfigurator.configure(url) case None => System.err.println(s"Spark was unable to load $defaultLogProps") } + log.info(s"Using Spark's default log4j profile: $defaultLogProps") } + Logging.initialized = true + + // Force a call into slf4j to initialize it avoids this happening from mutliple threads + // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html log } } + +object Logging { + @transient @volatile private var initialized = false + @transient val initLock = new Object() +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ad3337d94c921621fc9e858d3d2066bfaf63717f..70fd499d719c2583af975fa2ce808cbee688783a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -88,9 +88,6 @@ class SparkContext( scala.collection.immutable.Map()) extends Logging { - // Ensure logging is initialized before we spawn any threads - initLogging() - // Set Spark driver host and port system properties if (System.getProperty("spark.driver.host") == null) { System.setProperty("spark.driver.host", Utils.localHostName()) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 0f19d7a96b0858a21b2aab65480f7616680543e2..782be9a429e72d98f0da2b1931fa41a7f6d6cece 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -48,8 +48,6 @@ private[spark] class Executor( private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0)) - initLogging() - // No ip or host:port - just hostname Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname") // must not have port specified. diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index caab748d602ee452703091c9f0663ed46a7328f5..6f9f29969eaec85d315c964092c6c815516e75e8 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -26,7 +26,6 @@ import scala.util.matching.Regex import org.apache.spark.Logging private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging { - initLogging() val DEFAULT_PREFIX = "*" val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index bec0c83be8bea24c4c69bc14ad07c72cbf685329..8e038ce98cf463e44977c1af859c6dc44526747e 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -63,7 +63,6 @@ import org.apache.spark.metrics.source.Source * [options] is the specific property of this source or sink. */ private[spark] class MetricsSystem private (val instance: String) extends Logging { - initLogging() val confFile = System.getProperty("spark.metrics.conf") val metricsConfig = new MetricsConfig(Option(confFile)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 21022e1cfb0866e30514b1d1db11888a356b3179..e0eb02ce815ca0e4c7da0ac1a8ef1fd2fe4ce0af 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -50,8 +50,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { private val akkaTimeout = AkkaUtils.askTimeout - initLogging() - val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs", "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala index 0c66addf9def6f78628bfff2cec9b7c9e9a8a3b4..21f003609b14de5f511ebca2006afe72faf53d0b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala @@ -30,7 +30,6 @@ import org.apache.spark.util.Utils * TODO: Use event model. */ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging { - initLogging() blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive) @@ -101,8 +100,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends private[spark] object BlockManagerWorker extends Logging { private var blockManagerWorker: BlockManagerWorker = null - initLogging() - def startBlockManagerWorker(manager: BlockManager) { blockManagerWorker = new BlockManagerWorker(manager) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala index 6ce9127c7450175d61ed66871678ea7b7fc62dc8..a06f50a0ac89cf6f3ada7fad21d8beb59245ab82 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala @@ -37,8 +37,6 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM def length = blockMessages.length - initLogging() - def set(bufferMessage: BufferMessage) { val startTime = System.currentTimeMillis val newBlockMessages = new ArrayBuffer[BlockMessage]() diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala index b2a181545938c2265b7196046441743950996fb0..523fd1222dd71dff1770f1488c21ceb73617b9f6 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -60,8 +60,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter, def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out, None) def this() = this(None, new JPrintWriter(Console.out, true), None) - initLogging() - var in: InteractiveReader = _ // the input stream from which commands come var settings: Settings = _ var intp: SparkIMain = _ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index a78d3965ee94e0446de016af7e9a309d9d210491..8ebe09da0dcb72ad16491894893776a2a4f5f080 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -56,8 +56,6 @@ abstract class DStream[T: ClassTag] ( @transient protected[streaming] var ssc: StreamingContext ) extends Serializable with Logging { - initLogging() - // ======================================================================= // Methods that should be implemented by subclasses of DStream // ======================================================================= diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index daed7ff7c3f1385489c8f691591f100087a39249..a09b891956efe2348043c85cf6f11c22596e3be4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -24,7 +24,6 @@ import org.apache.spark.Logging import org.apache.spark.streaming.scheduler.Job final private[streaming] class DStreamGraph extends Serializable with Logging { - initLogging() private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 41da028a3cf9f0dbfd1dbf68cad6ec39591ed294..c759b36f9432687f6b57b1ed2e368e22a730f777 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -95,8 +95,6 @@ class StreamingContext private ( */ def this(path: String) = this(null, CheckpointReader.read(path), null) - initLogging() - if (sc_ == null && cp_ == null) { throw new Exception("Spark Streaming cannot be initialized with " + "both SparkContext and checkpoint as null") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 5add20871e3fd68b83d63d649af903f2d13f4f51..8c7f42306d58a3e9b8644833388d2ca3cec7f6ce 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -88,8 +88,6 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe */ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging { - initLogging() - lazy protected val env = SparkEnv.get lazy protected val actor = env.actorSystem.actorOf( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 1cd0b9b0a4ab76c31ee50e156fcc775f255264f2..afe9316337a8923f137b7b55f7f28fb259e5f855 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -29,7 +29,6 @@ import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging { - initLogging() val ssc = jobScheduler.ssc val clockClass = System.getProperty( "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 9511ccfbeddd6132b455882bddf65c8bd82bb5e6..488cc2f401d25c1073a21fc4cd6b95aa014841d0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -30,8 +30,6 @@ import org.apache.spark.streaming._ private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging { - initLogging() - val jobSets = new ConcurrentHashMap[Time, JobSet] val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt val executor = Executors.newFixedThreadPool(numConcurrentJobs) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 4a3993e3e3effd0b38b214d4c36142c9e73db3f3..1559f7a9f7ac00a917cc742bbc6f9287270e9e20 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration private[streaming] object MasterFailureTest extends Logging { - initLogging() @volatile var killed = false @volatile var killCount = 0 @@ -331,7 +330,6 @@ class TestOutputStream[T: ClassTag]( */ private[streaming] class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging { - initLogging() override def run() { try { @@ -366,7 +364,6 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread private[streaming] class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) extends Thread with Logging { - initLogging() override def run() { val localTestDir = Files.createTempDir()