From 1f4718c4805082cb6d6fa5af7c3529c6a79ae4e0 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Tue, 14 Jan 2014 22:20:14 -0800
Subject: [PATCH] Changed SparkConf to not be serializable. And also fixed
 unit-test log paths in log4j.properties of external modules.

---
 .../scala/org/apache/spark/Aggregator.scala   |  3 +-
 .../scala/org/apache/spark/SparkConf.scala    |  3 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala   |  3 +-
 .../flume/src/test/resources/log4j.properties |  2 +-
 .../kafka/src/test/resources/log4j.properties |  2 +-
 .../streaming/kafka/KafkaStreamSuite.scala    |  1 +
 .../mqtt/src/test/resources/log4j.properties  |  2 +-
 .../streaming/mqtt/MQTTStreamSuite.scala      |  1 +
 .../src/test/resources/log4j.properties       |  2 +-
 .../twitter/TwitterStreamSuite.scala          |  1 +
 .../src/test/resources/log4j.properties       |  2 +-
 .../streaming/zeromq/ZeroMQStreamSuite.scala  |  1 +
 .../apache/spark/streaming/Checkpoint.scala   | 10 ++++---
 .../apache/spark/streaming/DStreamGraph.scala |  2 ++
 .../dstream/DStreamCheckpointData.scala       | 26 ++++++++++++++++-
 .../spark/streaming/CheckpointSuite.scala     | 28 +++++++++++++------
 16 files changed, 66 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 6d439fdc68..bcf8ae5fb6 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -31,8 +31,7 @@ case class Aggregator[K, V, C] (
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C) {
 
-  private val sparkConf = SparkEnv.get.conf
-  private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
+  private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext)
       : Iterator[(K, C)] = {
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 2de32231e8..fc60cf6f71 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -4,6 +4,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import com.typesafe.config.ConfigFactory
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
 
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -24,7 +25,7 @@ import com.typesafe.config.ConfigFactory
  *
  * @param loadDefaults whether to load values from the system properties and classpath
  */
-class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with Logging {
+class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 9c6b308804..f2feb406f7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -66,7 +66,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   private type CoGroupValue = (Any, Int) // Int is dependency number
   private type CoGroupCombiner = Seq[CoGroup]
 
-  private val sparkConf = SparkEnv.get.conf
   private var serializerClass: String = null
 
   def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -106,7 +105,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
-
+    val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
index 063529a9cb..d1bd73a843 100644
--- a/external/flume/src/test/resources/log4j.properties
+++ b/external/flume/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/flume/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 
diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties
index 063529a9cb..38910d1130 100644
--- a/external/kafka/src/test/resources/log4j.properties
+++ b/external/kafka/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/kafka/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 9c81f23c19..d9809f6409 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -35,5 +35,6 @@ class KafkaStreamSuite extends TestSuiteBase {
       ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // TODO: Actually test receiving data
+    ssc.stop()
   }
 }
diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties
index 063529a9cb..d0462c7336 100644
--- a/external/mqtt/src/test/resources/log4j.properties
+++ b/external/mqtt/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/mqtt/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 73e7ce6e96..89c40ad461 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -32,5 +32,6 @@ class MQTTStreamSuite extends TestSuiteBase {
     val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // TODO: Actually test receiving data
+    ssc.stop()
   }
 }
diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties
index 063529a9cb..c918335fcd 100644
--- a/external/twitter/src/test/resources/log4j.properties
+++ b/external/twitter/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/twitter/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index ccc38784ef..06ab0cdaf3 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -39,5 +39,6 @@ class TwitterStreamSuite extends TestSuiteBase {
 
     // Note that actually testing the data receiving is hard as authentication keys are
     // necessary for accessing Twitter live stream
+    ssc.stop()
   }
 }
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
index 063529a9cb..304683dd0b 100644
--- a/external/zeromq/src/test/resources/log4j.properties
+++ b/external/zeromq/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/zeromq/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 4193b8a02f..92d55a7a7b 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -40,5 +40,6 @@ class ZeroMQStreamSuite extends TestSuiteBase {
       StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
 
     // TODO: Actually test data receiving
+    ssc.stop()
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5046a1d53f..4d778dc4d4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -42,11 +42,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
-  val sparkConf = ssc.conf
+  val sparkConfPairs = ssc.conf.getAll
 
-  // These should be unset when a checkpoint is deserialized,
-  // otherwise the SparkContext won't initialize correctly.
-  sparkConf.remove("spark.driver.host").remove("spark.driver.port")
+  def sparkConf = {
+    new SparkConf(false).setAll(sparkConfPairs)
+      .remove("spark.driver.host")
+      .remove("spark.driver.port")
+  }
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 8faa79f8c7..0683113bd0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -163,8 +163,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
     logDebug("DStreamGraph.writeObject used")
     this.synchronized {
       checkpointInProgress = true
+      logDebug("Enabled checkpoint mode")
       oos.defaultWriteObject()
       checkpointInProgress = false
+      logDebug("Disabled checkpoint mode")
     }
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 38bad5ac80..906a16e508 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream
 
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
-import java.io.{ObjectInputStream, IOException}
+import java.io.{ObjectOutputStream, ObjectInputStream, IOException}
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.Logging
@@ -117,8 +117,32 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
     "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]"
   }
 
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    logDebug(this.getClass().getSimpleName + ".writeObject used")
+    if (dstream.context.graph != null) {
+      dstream.context.graph.synchronized {
+        if (dstream.context.graph.checkpointInProgress) {
+          oos.defaultWriteObject()
+        } else {
+          val msg = "Object of " + this.getClass.getName + " is being serialized " +
+            " possibly as a part of closure of an RDD operation. This is because " +
+            " the DStream object is being referred to from within the closure. " +
+            " Please rewrite the RDD operation inside this DStream to avoid this. " +
+            " This has been enforced to avoid bloating of Spark tasks " +
+            " with unnecessary objects."
+          throw new java.io.NotSerializableException(msg)
+        }
+      }
+    } else {
+      throw new java.io.NotSerializableException(
+        "Graph is unexpectedly null when DStream is being serialized.")
+    }
+  }
+
   @throws(classOf[IOException])
   private def readObject(ois: ObjectInputStream) {
+    logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     timeToOldestCheckpointFileTime = new HashMap[Time, Time]
     timeToCheckpointFile = new HashMap[Time, String]
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 89daf47586..831e7c1471 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -151,17 +151,29 @@ class CheckpointSuite extends TestSuiteBase {
     val value = "myvalue"
     System.setProperty(key, value)
     ssc = new StreamingContext(master, framework, batchDuration)
+    val originalConf = ssc.conf
+
     val cp = new Checkpoint(ssc, Time(1000))
-    assert(!cp.sparkConf.contains("spark.driver.host"))
-    assert(!cp.sparkConf.contains("spark.driver.port"))
-    assert(!cp.sparkConf.contains("spark.hostPort"))
-    assert(cp.sparkConf.get(key) === value)
+    val cpConf = cp.sparkConf
+    assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(cpConf.get(key) === value)
     ssc.stop()
+
+    // Serialize/deserialize to simulate write to storage and reading it back
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(!newCp.sparkConf.contains("spark.driver.host"))
-    assert(!newCp.sparkConf.contains("spark.driver.port"))
-    assert(!newCp.sparkConf.contains("spark.hostPort"))
-    assert(newCp.sparkConf.get(key) === value)
+
+    val newCpConf = newCp.sparkConf
+    assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(newCpConf.get(key) === value)
+    assert(!newCpConf.contains("spark.driver.host"))
+    assert(!newCpConf.contains("spark.driver.port"))
+
+    // Check if all the parameters have been restored
+    ssc = new StreamingContext(null, newCp, null)
+    val restoredConf = ssc.conf
+    assert(restoredConf.get(key) === value)
   }
 
-- 
GitLab