diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dcb6b6824b0a61a69eb7f09ed188a88c402bd7bb..e9d2f5757963d46a97764e400c04b9b14ff18c2e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1281,8 +1281,10 @@ object SparkContext extends Logging {
 
   // TODO: Add AccumulatorParams for other types, e.g. lists and strings
 
-  implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
+  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
     new PairRDDFunctions(rdd)
+  }
 
   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index d250bef6aad0fdcee1bb9521d198231710bb7b6e..d2b9ee427656bab9897be3629da21d6dd592abea 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -52,11 +52,12 @@ import org.apache.spark.util.SerializableHyperLogLog
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
  * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
  */
-class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
+class PairRDDFunctions[K, V](self: RDD[(K, V)])
+    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
   extends Logging
   with SparkHadoopMapReduceUtil
-  with Serializable {
-
+  with Serializable
+{
   /**
    * Generic function to combine the elements for each key using a custom set of aggregation
    * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
@@ -77,7 +78,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       mapSideCombine: Boolean = true,
       serializer: Serializer = null): RDD[(K, C)] = {
     require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
-    if (getKeyClass().isArray) {
+    if (keyClass.isArray) {
       if (mapSideCombine) {
         throw new SparkException("Cannot use map-side combining with array keys.")
       }
@@ -170,7 +171,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
-    if (getKeyClass().isArray) {
+    if (keyClass.isArray) {
       throw new SparkException("reduceByKeyLocally() does not support array keys")
     }
 
@@ -288,7 +289,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * Return a copy of the RDD partitioned using the specified partitioner.
    */
   def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
-    if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
+    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     if (self.partitioner == partitioner) self else new ShuffledRDD[K, V, (K, V)](self, partitioner)
@@ -458,7 +459,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Iterable[V], Iterable[W]))] = {
-    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
@@ -473,7 +474,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
       : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
-    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
@@ -573,7 +574,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * supporting the key and value types K and V in this RDD.
    */
   def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
-    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+    saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
   /**
@@ -584,7 +585,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   def saveAsHadoopFile[F <: OutputFormat[K, V]](
       path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
     val runtimeClass = fm.runtimeClass
-    saveAsHadoopFile(path, getKeyClass, getValueClass, runtimeClass.asInstanceOf[Class[F]], codec)
+    saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
   }
 
   /**
@@ -592,7 +593,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
    */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
-    saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+    saveAsNewAPIHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
   /**
@@ -782,7 +783,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def values: RDD[V] = self.map(_._2)
 
-  private[spark] def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
+  private[spark] def keyClass: Class[_] = kt.runtimeClass
+
+  private[spark] def valueClass: Class[_] = vt.runtimeClass
 
-  private[spark] def getValueClass() = implicitly[ClassTag[V]].runtimeClass
+  private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 596dcb84db7bf53968275e60212b14a57fd08e26..6c897cc03b64113399ad312a0e7363d70ebf1c72 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -284,7 +284,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
-  def distinct(numPartitions: Int): RDD[T] =
+  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
 
   /**
@@ -301,7 +301,7 @@ abstract class RDD[T: ClassTag](
    * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
    * which can avoid performing a shuffle.
    */
-  def repartition(numPartitions: Int): RDD[T] = {
+  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = {
     coalesce(numPartitions, shuffle = true)
   }
 
@@ -325,7 +325,8 @@ abstract class RDD[T: ClassTag](
    * coalesce(1000, shuffle = true) will result in 1000 partitions with the
    * data distributed using a hash partitioner.
    */
-  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
+  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
+      : RDD[T] = {
     if (shuffle) {
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
@@ -424,10 +425,11 @@ abstract class RDD[T: ClassTag](
    *
    * Note that this method performs a shuffle internally.
    */
-  def intersection(other: RDD[T]): RDD[T] =
+  def intersection(other: RDD[T]): RDD[T] = {
     this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
         .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
         .keys
+  }
 
   /**
    * Return the intersection of this RDD and another one. The output will not contain any duplicate
@@ -437,10 +439,12 @@ abstract class RDD[T: ClassTag](
    *
    * @param partitioner Partitioner to use for the resulting RDD
    */
-  def intersection(other: RDD[T], partitioner: Partitioner): RDD[T] =
+  def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null)
+      : RDD[T] = {
     this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
         .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
         .keys
+  }
 
   /**
    * Return the intersection of this RDD and another one. The output will not contain any duplicate
@@ -450,10 +454,11 @@ abstract class RDD[T: ClassTag](
    *
    * @param numPartitions  How many partitions to use in the resulting RDD
    */
-  def intersection(other: RDD[T], numPartitions: Int): RDD[T] =
+  def intersection(other: RDD[T], numPartitions: Int): RDD[T] = {
     this.map(v => (v, null)).cogroup(other.map(v => (v, null)), new HashPartitioner(numPartitions))
         .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
         .keys
+  }
 
   /**
    * Return an RDD created by coalescing all elements within each partition into an array.
@@ -467,22 +472,25 @@ abstract class RDD[T: ClassTag](
   def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
 
   /**
-   * Return an RDD of grouped items.
+   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
+   * mapping to that key.
    */
-  def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] =
+  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
     groupBy[K](f, defaultPartitioner(this))
 
   /**
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] =
+  def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
     groupBy(f, new HashPartitioner(numPartitions))
 
   /**
-   * Return an RDD of grouped items.
+   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
+   * mapping to that key.
    */
-  def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = {
+  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
+      : RDD[(K, Iterable[T])] = {
     val cleanF = sc.clean(f)
     this.map(t => (cleanF(t), t)).groupByKey(p)
   }
@@ -739,7 +747,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
   */
-  def subtract(other: RDD[T], p: Partitioner): RDD[T] = {
+  def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {
     if (partitioner == Some(p)) {
       // Our partitioner knows how to handle T (which, since we have a partitioner, is
       // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
@@ -847,7 +855,7 @@ abstract class RDD[T: ClassTag](
    * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
    * combine step happens locally on the master, equivalent to running a single reduce task.
    */
-  def countByValue(): Map[T, Long] = {
+  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = {
     if (elementClassTag.runtimeClass.isArray) {
       throw new SparkException("countByValue() does not support arrays")
     }
@@ -877,10 +885,10 @@ abstract class RDD[T: ClassTag](
    * Approximate version of countByValue().
    */
   @Experimental
-  def countByValueApprox(
-      timeout: Long,
-      confidence: Double = 0.95
-      ): PartialResult[Map[T, BoundedDouble]] = {
+  def countByValueApprox(timeout: Long, confidence: Double = 0.95)
+      (implicit ord: Ordering[T] = null)
+      : PartialResult[Map[T, BoundedDouble]] =
+  {
     if (elementClassTag.runtimeClass.isArray) {
       throw new SparkException("countByValueApprox() does not support arrays")
     }
@@ -1030,13 +1038,13 @@ abstract class RDD[T: ClassTag](
   * Returns the max of this RDD as defined by the implicit Ordering[T].
   * @return the maximum element of the RDD
   * */
-  def max()(implicit ord: Ordering[T]):T = this.reduce(ord.max)
+  def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)
 
   /**
   * Returns the min of this RDD as defined by the implicit Ordering[T].
   * @return the minimum element of the RDD
   * */
-  def min()(implicit ord: Ordering[T]):T = this.reduce(ord.min)
+  def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
 
   /**
    * Save this RDD as a text file, using string representations of elements.
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 7df9a2960d8a59aa7a46f271c3db808596d35e95..9a1efc83cbe6af45c7d8884afdc941880830daa3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -68,8 +68,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
     val keyClass = getWritableClass[K]
     val valueClass = getWritableClass[V]
 
-    val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
-    val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
+    val convertKey = !classOf[Writable].isAssignableFrom(self.keyClass)
+    val convertValue = !classOf[Writable].isAssignableFrom(self.valueClass)
 
     logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
diff --git a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4bd889135631b305c3c0424ec30780f7cf500770
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+
+class ImplicitOrderingSuite extends FunSuite with LocalSparkContext {
+  class NonOrderedClass {}
+
+  class ComparableClass extends Comparable[ComparableClass] {
+    override def compareTo(o: ComparableClass): Int = ???
+  }
+
+  class OrderedClass extends Ordered[OrderedClass] {
+    override def compare(o: OrderedClass): Int = ???
+  }
+
+  // Tests that PairRDDFunctions grabs an implicit Ordering in various cases where it should.
+ test("basic inference of Orderings"){ + sc = new SparkContext("local", "test") + val rdd = sc.parallelize(1 to 10) + + // Infer orderings after basic maps to particular types + assert(rdd.map(x => (x, x)).keyOrdering.isDefined) + assert(rdd.map(x => (1, x)).keyOrdering.isDefined) + assert(rdd.map(x => (x.toString, x)).keyOrdering.isDefined) + assert(rdd.map(x => (null, x)).keyOrdering.isDefined) + assert(rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty) + assert(rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined) + assert(rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined) + + // Infer orderings for other RDD methods + assert(rdd.groupBy(x => x).keyOrdering.isDefined) + assert(rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty) + assert(rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined) + assert(rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined) + assert(rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined) + assert(rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 9ba6e02229aaa8467aae23ab6e4cd9e457d6b00a..1c895430582110baa70bb3ba82d53b7178be79e3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -495,7 +495,10 @@ class StreamingContext private[streaming] ( object StreamingContext extends Logging { - implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = { + private[streaming] val DEFAULT_CLEANER_TTL = 3600 + + implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)]) + (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = { new PairDStreamFunctions[K, V](stream) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index a7e5215437e54bb0393263dadd9e817295f5762f..d393cc03cb33e960ec88ce1aeb76ecbdbd7e0c51 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -488,7 +488,8 @@ abstract class DStream[T: ClassTag] ( * the RDDs with `numPartitions` partitions (Spark's default number of partitions if * `numPartitions` not specified). 
   */
-  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
+  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
+      : DStream[(T, Long)] =
     this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
 
   /**
@@ -686,9 +687,10 @@ abstract class DStream[T: ClassTag] (
   def countByValueAndWindow(
       windowDuration: Duration,
       slideDuration: Duration,
-      numPartitions: Int = ssc.sc.defaultParallelism
-    ): DStream[(T, Long)] = {
-
+      numPartitions: Int = ssc.sc.defaultParallelism)
+      (implicit ord: Ordering[T] = null)
+      : DStream[(T, Long)] =
+  {
     this.map(x => (x, 1L)).reduceByKeyAndWindow(
       (x: Long, y: Long) => x + y,
       (x: Long, y: Long) => x - y,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 354bc132dcdc0f752be77efa89248140f19ba2b6..826bf39e860e17f19fb9f053e3416662b99c988e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -37,13 +37,13 @@ import org.apache.spark.streaming.{Time, Duration}
 * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
 * these functions.
 */
-class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
-  extends Serializable {
-
+class PairDStreamFunctions[K, V](self: DStream[(K,V)])
+    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
+  extends Serializable
+{
   private[streaming] def ssc = self.ssc
 
-  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism)
-  = {
+  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
     new HashPartitioner(numPartitions)
   }
 
@@ -576,7 +576,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
       prefix: String,
       suffix: String
     )(implicit fm: ClassTag[F]) {
-    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+    saveAsHadoopFiles(prefix, suffix, keyClass, valueClass,
       fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
@@ -607,7 +607,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
       prefix: String,
       suffix: String
     )(implicit fm: ClassTag[F]) {
-    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+    saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass,
      fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
@@ -630,7 +630,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
     self.foreachRDD(saveFunc)
   }
 
-  private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
+  private def keyClass: Class[_] = kt.runtimeClass
 
-  private def getValueClass() = implicitly[ClassTag[V]].runtimeClass
+  private def valueClass: Class[_] = vt.runtimeClass
 }
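
For reference, a minimal usage sketch (not part of the patch) of what the new implicit plumbing does from calling code: the conversion to PairRDDFunctions now also captures an Ordering for the key type when one is in scope, and surfaces it through the private[spark] keyOrdering accessor exercised by the new ImplicitOrderingSuite. The object name and the "local" master URL are illustrative assumptions; like the suite, the sketch sits in package org.apache.spark so it can see keyOrdering.

package org.apache.spark

import org.apache.spark.SparkContext._  // brings rddToPairRDDFunctions into implicit scope

// Hypothetical driver object, for illustration only.
object OrderingSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "ordering-sketch")
    // RDD[(String, Int)]: Ordering[String] is available, so the implicit `ord`
    // parameter of rddToPairRDDFunctions is filled in rather than left null.
    val pairs = sc.parallelize(1 to 10).map(x => (x.toString, x))
    println(pairs.keyOrdering.isDefined)  // expected: true
    sc.stop()
  }
}

Because the new parameter defaults to null, Option(ord) inside keyOrdering yields None for key types with no Ordering in scope, which is exactly what the keyOrdering.isEmpty assertions in the suite verify.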