diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 6d439fdc684afaae595ad34e6d521f923633c9e7..bcf8ae5fb681965aad1852563ec21fbb3c60dcaf 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -31,8 +31,7 @@ case class Aggregator[K, V, C] (
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C) {
 
-  private val sparkConf = SparkEnv.get.conf
-  private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
+  private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext)
       : Iterator[(K, C)] = {
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 2de32231e87142c085fbab5de40bc9587d5fcfb2..fc60cf6f71e26fc39ec7ffe376deae60a0b709c4 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -4,6 +4,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import com.typesafe.config.ConfigFactory
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
 
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -24,7 +25,7 @@ import com.typesafe.config.ConfigFactory
  *
  * @param loadDefaults whether to load values from the system properties and classpath
  */
-class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with Logging {
+class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 9c6b308804c77da8f48b3bbc9241b491079d6c46..f2feb406f7783d53ff31e3b3306a02725743a38f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -66,7 +66,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   private type CoGroupValue = (Any, Int)  // Int is dependency number
   private type CoGroupCombiner = Seq[CoGroup]
 
-  private val sparkConf = SparkEnv.get.conf
   private var serializerClass: String = null
 
   def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -106,7 +105,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
-
+    val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
index 063529a9cbc67fe9cbc58c63cad51c43be18ad8a..d1bd73a8430e15aa86c3d3f933410c1d0d62b276 100644
--- a/external/flume/src/test/resources/log4j.properties
+++ b/external/flume/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/flume/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 
diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties
index 063529a9cbc67fe9cbc58c63cad51c43be18ad8a..38910d113050aba826293039764001f90b0d1c5e 100644
--- a/external/kafka/src/test/resources/log4j.properties
+++ b/external/kafka/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/kafka/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 9c81f23c191180c3f11262965c4eea1d21ae9747..d9809f6409d444c480ffdb311fcd40e496961c3e 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -35,5 +35,6 @@ class KafkaStreamSuite extends TestSuiteBase {
       ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // TODO: Actually test receiving data
+    ssc.stop()
   }
 }
diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties
index 063529a9cbc67fe9cbc58c63cad51c43be18ad8a..d0462c7336df57731bea2c7abd4e7eb5c0d54d9b 100644
--- a/external/mqtt/src/test/resources/log4j.properties
+++ b/external/mqtt/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/mqtt/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 73e7ce6e968c6fdc1575562beb969a3a5b616b79..89c40ad4619c9ef95340ea8bc3e5bbfa33f0e580 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -32,5 +32,6 @@ class MQTTStreamSuite extends TestSuiteBase {
     val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // TODO: Actually test receiving data
+    ssc.stop()
   }
 }
diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties
index 063529a9cbc67fe9cbc58c63cad51c43be18ad8a..c918335fcdc70cb377e07f26ea1b9237faad9411 100644
--- a/external/twitter/src/test/resources/log4j.properties
+++ b/external/twitter/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/twitter/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index ccc38784ef671185cc564ea9354e9c11af2f8407..06ab0cdaf3b4e1fa676f2d43fce8d5b7242a6e66 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -39,5 +39,6 @@ class TwitterStreamSuite extends TestSuiteBase {
 
     // Note that actually testing the data receiving is hard as authentication keys are
     // necessary for accessing Twitter live stream
+    ssc.stop()
   }
 }
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
index 063529a9cbc67fe9cbc58c63cad51c43be18ad8a..304683dd0bac3a168997188ea94bb91b070cdd37 100644
--- a/external/zeromq/src/test/resources/log4j.properties
+++ b/external/zeromq/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/zeromq/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 4193b8a02f14abefb1b17634e2f03709128588e2..92d55a7a7b6e4c81b7668c1ef101e26a0c2d3ce9 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -40,5 +40,6 @@ class ZeroMQStreamSuite extends TestSuiteBase {
       StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
 
     // TODO: Actually test data receiving
+    ssc.stop()
  }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5046a1d53fa41a992a43c4605b7be84f1c66cc6d..4d778dc4d43b4e7bf180bc3b55fb3e1341d3f947 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -42,11 +42,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
-  val sparkConf = ssc.conf
+  val sparkConfPairs = ssc.conf.getAll
 
-  // These should be unset when a checkpoint is deserialized,
-  // otherwise the SparkContext won't initialize correctly.
-  sparkConf.remove("spark.driver.host").remove("spark.driver.port")
+  def sparkConf = {
+    new SparkConf(false).setAll(sparkConfPairs)
+      .remove("spark.driver.host")
+      .remove("spark.driver.port")
+  }
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 8faa79f8c7e9d8b48bff46497cfb0a2550e15ee5..0683113bd0b51ad62c18f6a3f66a5a818a20c7a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -163,8 +163,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
     logDebug("DStreamGraph.writeObject used")
     this.synchronized {
       checkpointInProgress = true
+      logDebug("Enabled checkpoint mode")
       oos.defaultWriteObject()
       checkpointInProgress = false
+      logDebug("Disabled checkpoint mode")
     }
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 38bad5ac8042ade004d3b2d11a755ba670de80ef..906a16e508cd83490c986bcab67d5ec9b1edfe21 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
-import java.io.{ObjectInputStream, IOException}
+import java.io.{ObjectOutputStream, ObjectInputStream, IOException}
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.Logging
@@ -117,8 +117,32 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
     "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]"
   }
 
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    logDebug(this.getClass().getSimpleName + ".writeObject used")
+    if (dstream.context.graph != null) {
+      dstream.context.graph.synchronized {
+        if (dstream.context.graph.checkpointInProgress) {
+          oos.defaultWriteObject()
+        } else {
+          val msg = "Object of " + this.getClass.getName + " is being serialized " +
+            " possibly as a part of closure of an RDD operation. This is because " +
+            " the DStream object is being referred to from within the closure. " +
+            " Please rewrite the RDD operation inside this DStream to avoid this. " +
+            " This has been enforced to avoid bloating of Spark tasks " +
+            " with unnecessary objects."
+          throw new java.io.NotSerializableException(msg)
+        }
+      }
+    } else {
+      throw new java.io.NotSerializableException(
+        "Graph is unexpectedly null when DStream is being serialized.")
+    }
+  }
+
   @throws(classOf[IOException])
   private def readObject(ois: ObjectInputStream) {
+    logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     timeToOldestCheckpointFileTime = new HashMap[Time, Time]
     timeToCheckpointFile = new HashMap[Time, String]
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 89daf4758661ba2875611147bb3890ad545ad201..831e7c1471a09faed354e67840b6598e0014505f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -151,17 +151,29 @@ class CheckpointSuite extends TestSuiteBase {
     val value = "myvalue"
     System.setProperty(key, value)
     ssc = new StreamingContext(master, framework, batchDuration)
+    val originalConf = ssc.conf
+
     val cp = new Checkpoint(ssc, Time(1000))
-    assert(!cp.sparkConf.contains("spark.driver.host"))
-    assert(!cp.sparkConf.contains("spark.driver.port"))
-    assert(!cp.sparkConf.contains("spark.hostPort"))
-    assert(cp.sparkConf.get(key) === value)
+    val cpConf = cp.sparkConf
+    assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(cpConf.get(key) === value)
     ssc.stop()
+
+    // Serialize/deserialize to simulate write to storage and reading it back
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(!newCp.sparkConf.contains("spark.driver.host"))
-    assert(!newCp.sparkConf.contains("spark.driver.port"))
-    assert(!newCp.sparkConf.contains("spark.hostPort"))
-    assert(newCp.sparkConf.get(key) === value)
+
+    val newCpConf = newCp.sparkConf
+    assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(newCpConf.get(key) === value)
+    assert(!newCpConf.contains("spark.driver.host"))
+    assert(!newCpConf.contains("spark.driver.port"))
+
+    // Check if all the parameters have been restored
+    ssc = new StreamingContext(null, newCp, null)
+    val restoredConf = ssc.conf
+    assert(restoredConf.get(key) === value)
  }
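
Note on the Checkpoint.scala change above: since SparkConf is no longer Serializable, the checkpoint now stores only the conf's key-value pairs (`ssc.conf.getAll`) and rebuilds a fresh SparkConf on each access, dropping the driver-specific settings so a recovered driver can set them anew. A minimal sketch of that pattern, using a hypothetical `ConfSnapshot` class in place of `Checkpoint`:

```scala
import org.apache.spark.SparkConf

// Hypothetical stand-in for Checkpoint, shown only to illustrate the pattern.
class ConfSnapshot(conf: SparkConf) extends Serializable {
  // Plain key-value pairs serialize fine even though SparkConf itself does not.
  private val sparkConfPairs: Array[(String, String)] = conf.getAll

  // Rebuild a fresh conf on every access; spark.driver.host/port are removed
  // so that a recovered driver can re-initialize them for itself.
  def sparkConf: SparkConf = {
    new SparkConf(false)
      .setAll(sparkConfPairs)
      .remove("spark.driver.host")
      .remove("spark.driver.port")
  }
}
```

Round-tripping such an object through Java serialization (as the updated CheckpointSuite does with `Utils.serialize`/`Utils.deserialize` on the real `Checkpoint`) carries the settings along without ever putting a SparkConf instance into the serialized form.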