From 3e63d98f09065386901d78c141b0da93cdce0f76 Mon Sep 17 00:00:00 2001 From: NirmalReddy <nirmal_reddy2000@yahoo.com> Date: Wed, 26 Mar 2014 18:24:55 -0700 Subject: [PATCH] Spark 1095 : Adding explicit return types to all public methods Excluded those that are self-evident and the cases that are discussed in the mailing list. Author: NirmalReddy <nirmal_reddy2000@yahoo.com> Author: NirmalReddy <nirmal.reddy@imaginea.com> Closes #168 from NirmalReddy/Spark-1095 and squashes the following commits: ac54b29 [NirmalReddy] import misplaced 8c5ff3e [NirmalReddy] Changed syntax of unit returning methods 02d0778 [NirmalReddy] fixed explicit types in all the other packages 1c17773 [NirmalReddy] fixed explicit types in core package --- .../scala/org/apache/spark/SparkContext.scala | 31 ++++++++++++------- .../apache/spark/api/java/JavaRDDLike.scala | 15 ++++++--- .../spark/api/java/JavaSparkContext.scala | 2 +- .../apache/spark/deploy/ClientArguments.scala | 2 +- .../apache/spark/deploy/SparkHadoopUtil.scala | 2 +- .../master/ZooKeeperPersistenceEngine.scala | 3 +- .../spark/metrics/sink/ConsoleSink.scala | 2 +- .../apache/spark/metrics/sink/CsvSink.scala | 2 +- .../spark/metrics/sink/GraphiteSink.scala | 4 +-- .../org/apache/spark/rdd/CoGroupedRDD.scala | 2 +- .../org/apache/spark/rdd/HadoopRDD.scala | 2 +- .../scala/org/apache/spark/rdd/JdbcRDD.scala | 2 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 2 +- .../main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../apache/spark/storage/StorageLevel.scala | 7 +++-- .../org/apache/spark/util/Distribution.scala | 7 +++-- .../spark/metrics/sink/GangliaSink.scala | 12 ++++--- .../scala/org/apache/spark/graphx/Graph.scala | 3 +- .../apache/spark/graphx/impl/GraphImpl.scala | 2 +- .../apache/spark/graphx/lib/Analytics.scala | 2 +- .../spark/streaming/StreamingContext.scala | 4 +-- .../streaming/api/java/JavaDStreamLike.scala | 6 ++-- .../api/java/JavaStreamingContext.scala | 22 +++++++++---- .../spark/streaming/dstream/DStream.scala | 8 +++-- .../spark/streaming/scheduler/BatchInfo.scala | 8 +++-- 25 files changed, 97 insertions(+), 57 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4dd298177f..b23accbbb9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.mesos.MesosNativeLibrary +import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ @@ -230,7 +231,7 @@ class SparkContext( postEnvironmentUpdate() /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ - val hadoopConfiguration = { + val hadoopConfiguration: Configuration = { val env = SparkEnv.get val hadoopConf = SparkHadoopUtil.get.newConfiguration() // Explicitly check for S3 environment variables @@ -630,7 +631,7 @@ class SparkContext( * standard mutable collections. So you can use this with mutable Map, Set, etc. */ def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T] - (initialValue: R) = { + (initialValue: R): Accumulable[R, T] = { val param = new GrowableAccumulableParam[R,T] new Accumulable(initialValue, param) } @@ -640,7 +641,7 @@ class SparkContext( * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. * The variable will be sent to each cluster only once. */ - def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal) + def broadcast[T](value: T): Broadcast[T] = env.broadcastManager.newBroadcast[T](value, isLocal) /** * Add a file to be downloaded with this Spark job on every node. @@ -1126,7 +1127,7 @@ object SparkContext extends Logging { implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( - rdd: RDD[(K, V)]) = + rdd: RDD[(K, V)]) = new SequenceFileRDDFunctions(rdd) implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag]( @@ -1163,27 +1164,33 @@ object SparkContext extends Logging { } // Helper objects for converting common types to Writable - private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) = { + private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) + : WritableConverter[T] = { val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]] new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W])) } - implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get) + implicit def intWritableConverter(): WritableConverter[Int] = + simpleWritableConverter[Int, IntWritable](_.get) - implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get) + implicit def longWritableConverter(): WritableConverter[Long] = + simpleWritableConverter[Long, LongWritable](_.get) - implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get) + implicit def doubleWritableConverter(): WritableConverter[Double] = + simpleWritableConverter[Double, DoubleWritable](_.get) - implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get) + implicit def floatWritableConverter(): WritableConverter[Float] = + simpleWritableConverter[Float, FloatWritable](_.get) - implicit def booleanWritableConverter() = + implicit def booleanWritableConverter(): WritableConverter[Boolean] = simpleWritableConverter[Boolean, BooleanWritable](_.get) - implicit def bytesWritableConverter() = { + implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = { simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes) } - implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString) + implicit def stringWritableConverter(): WritableConverter[String] = + simpleWritableConverter[String, Text](_.toString) implicit def writableWritableConverter[T <: Writable]() = new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T]) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index ddac553304..e03b8e78d5 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -391,19 +391,24 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Save this RDD as a text file, using string representations of elements. */ - def saveAsTextFile(path: String) = rdd.saveAsTextFile(path) + def saveAsTextFile(path: String): Unit = { + rdd.saveAsTextFile(path) + } /** * Save this RDD as a compressed text file, using string representations of elements. */ - def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) = + def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = { rdd.saveAsTextFile(path, codec) + } /** * Save this RDD as a SequenceFile of serialized objects. */ - def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path) + def saveAsObjectFile(path: String): Unit = { + rdd.saveAsObjectFile(path) + } /** * Creates tuples of the elements in this RDD by applying `f`. @@ -420,7 +425,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * executed on this RDD. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. */ - def checkpoint() = rdd.checkpoint() + def checkpoint(): Unit = { + rdd.checkpoint() + } /** * Return whether this RDD has been checkpointed or not diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 35508b6e5a..e531a57ace 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -463,7 +463,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork sc.setCheckpointDir(dir) } - def getCheckpointDir = JavaUtils.optionToOptional(sc.getCheckpointDir) + def getCheckpointDir: Optional[String] = JavaUtils.optionToOptional(sc.getCheckpointDir) protected def checkpointFile[T](path: String): JavaRDD[T] = { implicit val ctag: ClassTag[T] = fakeClassTag diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index 00f5cd54ad..c07838f798 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -112,5 +112,5 @@ private[spark] class ClientArguments(args: Array[String]) { } object ClientArguments { - def isValidJarUrl(s: String) = s.matches("(.+):(.+)jar") + def isValidJarUrl(s: String): Boolean = s.matches("(.+):(.+)jar") } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index d2d8d6d662..9bdbfb33bf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -32,7 +32,7 @@ import scala.collection.JavaConversions._ * Contains util methods to interact with Hadoop from Spark. */ class SparkHadoopUtil { - val conf = newConfiguration() + val conf: Configuration = newConfiguration() UserGroupInformation.setConfiguration(conf) def runAsUser(user: String)(func: () => Unit) { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala index 5413ff671a..834dfedee5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.master import scala.collection.JavaConversions._ import akka.serialization.Serialization +import org.apache.curator.framework.CuratorFramework import org.apache.zookeeper.CreateMode import org.apache.spark.{Logging, SparkConf} @@ -29,7 +30,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) with Logging { val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status" - val zk = SparkCuratorUtil.newClient(conf) + val zk: CuratorFramework = SparkCuratorUtil.newClient(conf) SparkCuratorUtil.mkdir(zk, WORKING_DIR) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index 4d2ffc54d8..64eac73605 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -38,7 +38,7 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry, case None => CONSOLE_DEFAULT_PERIOD } - val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match { + val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match { case Some(s) => TimeUnit.valueOf(s.toUpperCase()) case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT) } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index 319f40815d..544848d415 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -41,7 +41,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry, case None => CSV_DEFAULT_PERIOD } - val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match { + val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match { case Some(s) => TimeUnit.valueOf(s.toUpperCase()) case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT) } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index 0ffdf3846d..7f0a2fd16f 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -39,7 +39,7 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry, val GRAPHITE_KEY_UNIT = "unit" val GRAPHITE_KEY_PREFIX = "prefix" - def propertyToOption(prop: String) = Option(property.getProperty(prop)) + def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop)) if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) { throw new Exception("Graphite sink requires 'host' property.") @@ -57,7 +57,7 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry, case None => GRAPHITE_DEFAULT_PERIOD } - val pollUnit = propertyToOption(GRAPHITE_KEY_UNIT) match { + val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match { case Some(s) => TimeUnit.valueOf(s.toUpperCase()) case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT) } diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 8561711931..9aa454a5c8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -103,7 +103,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: array } - override val partitioner = Some(part) + override val partitioner: Some[Partitioner] = Some(part) override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = { val sparkConf = SparkEnv.get.conf diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 932ff5bf36..3af008bd72 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -171,7 +171,7 @@ class HadoopRDD[K, V]( array } - override def compute(theSplit: Partition, context: TaskContext) = { + override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new NextIterator[(K, V)] { val split = theSplit.asInstanceOf[HadoopPartition] diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala index 8df8718f3b..1b503743ac 100644 --- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala @@ -116,7 +116,7 @@ class JdbcRDD[T: ClassTag]( } object JdbcRDD { - def resultSetToObjectArray(rs: ResultSet) = { + def resultSetToObjectArray(rs: ResultSet): Array[Object] = { Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1)) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index d1fff29687..461a749eac 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -80,7 +80,7 @@ class NewHadoopRDD[K, V]( result } - override def compute(theSplit: Partition, context: TaskContext) = { + override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[NewHadoopPartition] logInfo("Input split: " + split.serializableHadoopSplit) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 6af42248a5..ce2b8ac272 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -121,7 +121,7 @@ abstract class RDD[T: ClassTag]( @transient var name: String = null /** Assign a name to this RDD */ - def setName(_name: String) = { + def setName(_name: String): RDD[T] = { name = _name this } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 1b7934d59f..4212a539da 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -126,15 +126,16 @@ object StorageLevel { val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2) /** Create a new StorageLevel object */ - def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, replication: Int = 1) = + def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, + replication: Int = 1): StorageLevel = getCachedStorageLevel(new StorageLevel(useDisk, useMemory, deserialized, replication)) /** Create a new StorageLevel object from its integer representation */ - def apply(flags: Int, replication: Int) = + def apply(flags: Int, replication: Int): StorageLevel = getCachedStorageLevel(new StorageLevel(flags, replication)) /** Read StorageLevel object from ObjectInput stream */ - def apply(in: ObjectInput) = { + def apply(in: ObjectInput): StorageLevel = { val obj = new StorageLevel() obj.readExternal(in) getCachedStorageLevel(obj) diff --git a/core/src/main/scala/org/apache/spark/util/Distribution.scala b/core/src/main/scala/org/apache/spark/util/Distribution.scala index ab738c4b86..5b347555fe 100644 --- a/core/src/main/scala/org/apache/spark/util/Distribution.scala +++ b/core/src/main/scala/org/apache/spark/util/Distribution.scala @@ -19,6 +19,8 @@ package org.apache.spark.util import java.io.PrintStream +import scala.collection.immutable.IndexedSeq + /** * Util for getting some stats from a small sample of numeric values, with some handy * summary functions. @@ -40,7 +42,8 @@ class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) * given from 0 to 1 * @param probabilities */ - def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities) = { + def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities) + : IndexedSeq[Double] = { probabilities.toIndexedSeq.map{p:Double => data(closestIndex(p))} } @@ -48,7 +51,7 @@ class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) math.min((p * length).toInt + startIdx, endIdx - 1) } - def showQuantiles(out: PrintStream = System.out) = { + def showQuantiles(out: PrintStream = System.out): Unit = { out.println("min\t25%\t50%\t75%\tmax") getQuantiles(defaultProbabilities).foreach{q => out.print(q + "\t")} out.println diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index cd37317da7..d03d7774e8 100644 --- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics.MetricRegistry import com.codahale.metrics.ganglia.GangliaReporter import info.ganglia.gmetric4j.gmetric.GMetric +import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode import org.apache.spark.SecurityManager import org.apache.spark.metrics.MetricsSystem @@ -33,10 +34,10 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry, val GANGLIA_DEFAULT_PERIOD = 10 val GANGLIA_KEY_UNIT = "unit" - val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS + val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS val GANGLIA_KEY_MODE = "mode" - val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST + val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST // TTL for multicast messages. If listeners are X hops away in network, must be at least X. val GANGLIA_KEY_TTL = "ttl" @@ -45,7 +46,7 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry, val GANGLIA_KEY_HOST = "host" val GANGLIA_KEY_PORT = "port" - def propertyToOption(prop: String) = Option(property.getProperty(prop)) + def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop)) if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) { throw new Exception("Ganglia sink requires 'host' property.") @@ -58,11 +59,12 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry, val host = propertyToOption(GANGLIA_KEY_HOST).get val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) - val mode = propertyToOption(GANGLIA_KEY_MODE) + val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) .getOrElse(GANGLIA_DEFAULT_PERIOD) - val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase)) + val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT) + .map(u => TimeUnit.valueOf(u.toUpperCase)) .getOrElse(GANGLIA_DEFAULT_UNIT) MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 65a1a8c68f..ef05623d7a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -419,5 +419,6 @@ object Graph { * All the convenience operations are defined in the [[GraphOps]] class which may be * shared across multiple graph implementations. */ - implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops + implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag] + (g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops } // end of Graph object diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 5e9be18990..43ac11d895 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -197,7 +197,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def mapReduceTriplets[A: ClassTag]( mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], reduceFunc: (A, A) => A, - activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = { + activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = { ClosureCleaner.clean(mapFunc) ClosureCleaner.clean(reduceFunc) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index 24699dfdd3..fa533a512d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -26,7 +26,7 @@ import org.apache.spark.graphx.PartitionStrategy._ */ object Analytics extends Logging { - def main(args: Array[String]) = { + def main(args: Array[String]): Unit = { val host = args(0) val taskType = args(1) val fname = args(2) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 062b888e80..e198c69470 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -431,7 +431,7 @@ class StreamingContext private[streaming] ( * Stop the execution of the streams. * @param stopSparkContext Stop the associated SparkContext or not */ - def stop(stopSparkContext: Boolean = true) = synchronized { + def stop(stopSparkContext: Boolean = true): Unit = synchronized { scheduler.stop() logInfo("StreamingContext stopped successfully") waiter.notifyStop() @@ -489,7 +489,7 @@ object StreamingContext extends Logging { * Find the JAR from which a given class was loaded, to make it easy for users to pass * their JARs to StreamingContext. */ - def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls) + def jarOfClass(cls: Class[_]): Seq[String] = SparkContext.jarOfClass(cls) private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = { // Set the default cleaner delay to an hour if not already set. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index a85cd04c93..bb2f492d06 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -49,7 +49,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Print the first ten elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ - def print() = dstream.print() + def print(): Unit = { + dstream.print() + } /** * Return a new DStream in which each RDD has a single element generated by counting each RDD @@ -401,7 +403,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Enable periodic checkpointing of RDDs of this DStream. * @param interval Time interval after which generated RDD will be checkpointed */ - def checkpoint(interval: Duration) = { + def checkpoint(interval: Duration): DStream[T] = { dstream.checkpoint(interval) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index c48d754e43..b705d2ec9a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -477,31 +477,41 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Start the execution of the streams. */ - def start() = ssc.start() + def start(): Unit = { + ssc.start() + } /** * Wait for the execution to stop. Any exceptions that occurs during the execution * will be thrown in this thread. */ - def awaitTermination() = ssc.awaitTermination() + def awaitTermination(): Unit = { + ssc.awaitTermination() + } /** * Wait for the execution to stop. Any exceptions that occurs during the execution * will be thrown in this thread. * @param timeout time to wait in milliseconds */ - def awaitTermination(timeout: Long) = ssc.awaitTermination(timeout) + def awaitTermination(timeout: Long): Unit = { + ssc.awaitTermination(timeout) + } /** * Stop the execution of the streams. Will stop the associated JavaSparkContext as well. */ - def stop() = ssc.stop() + def stop(): Unit = { + ssc.stop() + } /** * Stop the execution of the streams. * @param stopSparkContext Stop the associated SparkContext or not */ - def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext) + def stop(stopSparkContext: Boolean): Unit = { + ssc.stop(stopSparkContext) + } } /** @@ -579,7 +589,7 @@ object JavaStreamingContext { * Find the JAR from which a given class was loaded, to make it easy for users to pass * their JARs to StreamingContext. */ - def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray + def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 6bff56a9d3..d48b51aa69 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -503,14 +503,18 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ @deprecated("use foreachRDD", "0.9.0") - def foreach(foreachFunc: RDD[T] => Unit) = this.foreachRDD(foreachFunc) + def foreach(foreachFunc: RDD[T] => Unit): Unit = { + this.foreachRDD(foreachFunc) + } /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ @deprecated("use foreachRDD", "0.9.0") - def foreach(foreachFunc: (RDD[T], Time) => Unit) = this.foreachRDD(foreachFunc) + def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = { + this.foreachRDD(foreachFunc) + } /** * Apply a function to each RDD in this DStream. This is an output operator, so diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 4e8d07fe92..7f3cd2f8eb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -39,17 +39,19 @@ case class BatchInfo( * was submitted to the streaming scheduler. Essentially, it is * `processingStartTime` - `submissionTime`. */ - def schedulingDelay = processingStartTime.map(_ - submissionTime) + def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime) /** * Time taken for the all jobs of this batch to finish processing from the time they started * processing. Essentially, it is `processingEndTime` - `processingStartTime`. */ - def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption + def processingDelay: Option[Long] = processingEndTime.zip(processingStartTime) + .map(x => x._1 - x._2).headOption /** * Time taken for all the jobs of this batch to finish processing from the time they * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`. */ - def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption + def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay) + .map(x => x._1 + x._2).headOption } -- GitLab