diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 9680c6f3e1475d7f1111a5461a5613aebbbc6177..4db7339e6716bc25e3035257f80854d30b9fc46e 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -88,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to all elements of this RDD. */ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { - def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType()) } @@ -119,7 +119,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala - def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType()) } diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 23ec6c3b311f0a78e58ed7f2b0a1387418a53f0f..8c573ac0d65e0847c4218880d8bdeaf6dbf7b7fc 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -387,18 +387,21 @@ public class JavaAPISuite implements Serializable { return 1.0 * x; } }).cache(); + doubles.collect(); JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer x) { return new Tuple2<Integer, Integer>(x, x); } }).cache(); + pairs.collect(); JavaRDD<String> strings = rdd.map(new Function<Integer, String>() { @Override public String call(Integer x) { return x.toString(); } }).cache(); + strings.collect(); } @Test @@ -962,4 +965,18 @@ public class JavaAPISuite implements Serializable { } } + + @Test + public void collectAsMapWithIntArrayValues() { + // Regression test for SPARK-1040 + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 })); + JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() { + @Override + public Tuple2<Integer, int[]> call(Integer x) throws Exception { + return new Tuple2<Integer, int[]>(x, new int[] { x }); + } + }); + pairRDD.collect(); // Works fine + Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException + } } diff --git a/pom.xml b/pom.xml index 54072b053cb5e8389eb8be5922206f7fbb2b9f16..1ac8f0fa079e0d756de2d450d6acce992129b5ea 100644 --- a/pom.xml +++ b/pom.xml @@ -389,7 +389,7 @@ <dependency> <groupId>com.novocode</groupId> <artifactId>junit-interface</artifactId> - <version>0.9</version> + <version>0.10</version> <scope>test</scope> </dependency> <dependency> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 46a1c640e30aed5ddf122886dc57a4eff6c12865..e33f230188fc7df67c6ff132a62db389be4cfd3f 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -212,12 +212,13 @@ object SparkBuild extends Build { "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"), "org.scalatest" %% "scalatest" % "1.9.1" % "test", "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", - "com.novocode" % "junit-interface" % "0.9" % "test", + "com.novocode" % "junit-interface" % "0.10" % "test", "org.easymock" % "easymock" % "3.1" % "test", "org.mockito" % "mockito-all" % "1.8.5" % "test", "commons-io" % "commons-io" % "2.4" % "test" ), + testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), parallelExecution := true, /* Workaround for issue #206 (fixed after SBT 0.11.0) */ watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index a493a8279f94218318be45b7bdc1ee372209dc64..64fe204cdf7a56a6afa273924f7f6f3ab0a297a6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -138,7 +138,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** Return a new DStream by applying a function to all elements of this DStream. */ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { - def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) } @@ -159,7 +159,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala - def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType()) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 79fa6a623d2908e52e20f35d75cc852899836488..62cfa0a229db14b9ceecb930b511fd92aa0f768f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -745,7 +745,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } override val classTag: ClassTag[(K, V)] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]] + implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]] } object JavaPairDStream {