diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 0453e98e7d93996842e70852413f5b4cfb60882c..2052d057889a533e305fa4e4d6805c470183e578 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -512,7 +512,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag]( * supporting the key and value types K and V in this RDD. */ def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) { - saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -520,7 +520,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag]( * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD. */ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) { - saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -651,9 +651,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag]( */ def values: RDD[V] = self.map(_._2) - private[spark] def getKeyClass() = implicitly[ClassTag[K]].erasure + private[spark] def getKeyClass() = implicitly[ClassTag[K]].runtimeClass - private[spark] def getValueClass() = implicitly[ClassTag[V]].erasure + private[spark] def getValueClass() = implicitly[ClassTag[V]].runtimeClass } /** diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index e6e0997a59518cc662a52241eccb971d28a6cdd3..6ee075315a22b6790b95c221c7a7ecf2b37bc280 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -608,7 +608,7 @@ abstract class RDD[T: ClassTag]( * combine step happens locally on the master, equivalent to running a single reduce task. */ def countByValue(): Map[T, Long] = { - if (elementClassTag.erasure.isArray) { + if (elementClassTag.runtimeClass.isArray) { throw new SparkException("countByValue() does not support arrays") } // TODO: This should perhaps be distributed by default. @@ -639,7 +639,7 @@ abstract class RDD[T: ClassTag]( timeout: Long, confidence: Double = 0.95 ): PartialResult[Map[T, BoundedDouble]] = { - if (elementClassTag.erasure.isArray) { + if (elementClassTag.runtimeClass.isArray) { throw new SparkException("countByValueApprox() does not support arrays") } val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) => diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala index 900d73bf428d868b1a8b1fa8d114e62e846b003c..883a0152bbee994c340d64f6818d0e0c5b3273f8 100644 --- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala @@ -40,8 +40,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = { val c = { - if (classOf[Writable].isAssignableFrom(classTag[T].erasure)) { - classTag[T].erasure + if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) { + classTag[T].runtimeClass } else { // We get the type of the Writable class by looking at the apply method which converts // from T to Writable. Since we have two apply methods we filter out the one which diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 6bd87bf3ec3c5d4030d313cc5820178e65db1c56..7272a592a553ef55d8fef218299b7457f5f36593 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -277,9 +277,9 @@ class SparkContext( (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]) : RDD[(K, V)] = { hadoopFile(path, - fm.erasure.asInstanceOf[Class[F]], - km.erasure.asInstanceOf[Class[K]], - vm.erasure.asInstanceOf[Class[V]], + fm.runtimeClass.asInstanceOf[Class[F]], + km.runtimeClass.asInstanceOf[Class[K]], + vm.runtimeClass.asInstanceOf[Class[V]], minSplits) } @@ -300,9 +300,9 @@ class SparkContext( (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { newAPIHadoopFile( path, - fm.erasure.asInstanceOf[Class[F]], - km.erasure.asInstanceOf[Class[K]], - vm.erasure.asInstanceOf[Class[V]]) + fm.runtimeClass.asInstanceOf[Class[F]], + km.runtimeClass.asInstanceOf[Class[K]], + vm.runtimeClass.asInstanceOf[Class[V]]) } /** @@ -781,13 +781,13 @@ object SparkContext { private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]): ArrayWritable = { def anyToWritable[U <% Writable](u: U): Writable = u - new ArrayWritable(classTag[T].erasure.asInstanceOf[Class[Writable]], + new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]], arr.map(x => anyToWritable(x)).toArray) } // Helper objects for converting common types to Writable private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) = { - val wClass = classTag[W].erasure.asInstanceOf[Class[W]] + val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]] new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W])) } @@ -806,7 +806,7 @@ object SparkContext { implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString) implicit def writableWritableConverter[T <: Writable]() = - new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T]) + new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T]) /** * Find the JAR from which a given class was loaded, to make it easy for users to pass diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 962ba6619d40d07a89e0a218b742cff873f28e5c..aa1a8b6ba23b4c03e99e13c4cc1870393f19b375 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -457,7 +457,7 @@ extends Serializable { prefix: String, suffix: String )(implicit fm: ClassTag[F]) { - saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -487,7 +487,7 @@ extends Serializable { prefix: String, suffix: String )(implicit fm: ClassTag[F]) { - saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -509,7 +509,7 @@ extends Serializable { self.foreach(saveFunc) } - private def getKeyClass() = implicitly[ClassTag[K]].erasure + private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass - private def getValueClass() = implicitly[ClassTag[V]].erasure + private def getValueClass() = implicitly[ClassTag[V]].runtimeClass }