diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index af443279a11f3ff937785dfc950924ba9e75a67a..ca0115f90e49eb48b8dbaea9529ad63f1bafe01d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -64,7 +64,7 @@ class CheckpointWriter(conf: SparkConf, checkpointDir: String, hadoopConf: Confi val file = new Path(checkpointDir, "graph") val MAX_ATTEMPTS = 3 val executor = Executors.newFixedThreadPool(1) - val compressionCodec = CompressionCodec.createCodec() + val compressionCodec = CompressionCodec.createCodec(conf) // The file to which we actually write - and then "move" to file val writeFile = new Path(file.getParent, file.getName + ".next") // The file to which existing checkpoint is backed up (i.e. "moved") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index e44821173200ae752a4971027f8309ee24a9e8f4..ab60a8166ea70bfea847dc1afb2896152422a1f9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -50,7 +50,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { val clock = { val clockClass = ssc.sc.conf.getOrElse( "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") - val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] + Class.forName(clockClass).newInstance().asInstanceOf[Clock] } val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))