diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 923b4ed68839c2b8957f7fb18ad9763ac2adb32e..566472e5979585d349547fdc3b2ad06ec09a442c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -341,7 +341,7 @@ class SparkContext(
    */
   def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minSplits, cloneRecords = false).map(pair => pair._2.toString)
+      minSplits).map(pair => pair._2.toString)
   }
 
   /**
@@ -354,33 +354,37 @@ class SparkContext(
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
    * @param minSplits Minimum number of Hadoop Splits to generate.
-   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
-   *                     Most RecordReader implementations reuse wrapper objects across multiple
-   *                     records, and can cause problems in RDD collect or aggregation operations.
-   *                     By default the records are cloned in Spark. However, application
-   *                     programmers can explicitly disable the cloning for better performance.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopRDD[K: ClassTag, V: ClassTag](
+  def hadoopRDD[K, V](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K: ClassTag, V: ClassTag](
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
+  def hadoopFile[K, V](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -392,8 +396,7 @@ class SparkContext(
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   /**
@@ -403,16 +406,20 @@ class SparkContext(
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
-      (path: String, minSplits: Int, cloneRecords: Boolean = true)
+      (path: String, minSplits: Int)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
       vm.runtimeClass.asInstanceOf[Class[V]],
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   /**
@@ -422,68 +429,91 @@ class SparkContext(
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
    * }}}
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
+    hadoopFile[K, V, F](path, defaultMinSplits)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
-      (path: String, cloneRecords: Boolean = true)
+      (path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
       path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
-      vm.runtimeClass.asInstanceOf[Class[V]],
-      cloneRecords = cloneRecords)
+      vm.runtimeClass.asInstanceOf[Class[V]])
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration = hadoopConfiguration,
-      cloneRecords: Boolean = true): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
       conf: Configuration = hadoopConfiguration,
       fClass: Class[F],
       kClass: Class[K],
-      vClass: Class[V],
-      cloneRecords: Boolean = true): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
-  }
-
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String,
+      vClass: Class[V]): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+  }
+
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
+  def sequenceFile[K, V](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int,
-      cloneRecords: Boolean = true
+      minSplits: Int
       ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
-      cloneRecords: Boolean = true): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
+  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -500,9 +530,14 @@ class SparkContext(
    * have a parameterized singleton object). We use functions instead to create a new converter
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
    def sequenceFile[K, V]
-       (path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
+       (path: String, minSplits: Int = defaultMinSplits)
        (implicit km: ClassTag[K], vm: ClassTag[V],
         kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
@@ -511,7 +546,7 @@ class SparkContext(
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
-        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
+        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
     writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
   }
 
@@ -1024,7 +1059,7 @@ object SparkContext {
   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
 
   implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)]) =
+      rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)
 
   implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
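
Review note: with `cloneRecords` removed, the (K, V) pairs returned by these SparkContext methods now alias the Writable instances that the Hadoop RecordReader reuses. Below is a minimal Scala sketch of the map-copy idiom the new Scaladoc recommends, assuming an existing SparkContext `sc`; the input path and split count are hypothetical.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    // The returned pairs share one LongWritable/Text instance per partition, so
    // convert them to immutable Scala values before calling cache() or collect().
    val records = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/events", 4)
    val cached = records.map { case (offset, line) => (offset.get, line.toString) }.cache()
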
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 50ac700823fba11f2650bc6f0ee1986bd700c867..5a426b983519c86a956f4e5c2a50593d66dc68f1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -137,7 +137,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    */
   def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
   def sequenceFile[K, V](path: String,
     keyClass: Class[K],
     valueClass: Class[V],
@@ -148,19 +154,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
   }
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String,
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits, cloneRecords))
-  }
-
-  /** Get an RDD for a Hadoop SequenceFile. */
+  /** Get an RDD for a Hadoop SequenceFile.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
   JavaPairRDD[K, V] = {
     implicit val kcm: ClassTag[K] = ClassTag(keyClass)
@@ -168,15 +168,6 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
   }
 
-  /** Get an RDD for a Hadoop SequenceFile. */
-  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], 
-    cloneRecords: Boolean):
-  JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, cloneRecords))
-  }
-
   /**
    * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
    * BytesWritable values that contain a serialized partition. This is still an experimental storage
@@ -205,6 +196,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](
     conf: JobConf,
@@ -218,41 +214,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
-
-  /**
-   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
-   * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
-   * using the older MapReduce API (`org.apache.hadoop.mapred`).
-   *
-   * @param conf JobConf for setting up the dataset
-   * @param inputFormatClass Class of the [[InputFormat]]
-   * @param keyClass Class of the keys
-   * @param valueClass Class of the values
-   * @param minSplits Minimum number of Hadoop Splits to generate.
-   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
-   *                     Most RecordReader implementations reuse wrapper objects across multiple
-   *                     records, and can cause problems in RDD collect or aggregation operations.
-   *                     By default the records are cloned in Spark. However, application
-   *                     programmers can explicitly disable the cloning for better performance.
-   */
-  def hadoopRDD[K, V, F <: InputFormat[K, V]](
-    conf: JobConf,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits,
-      cloneRecords))
-  }
-
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](
     conf: JobConf,
@@ -265,7 +234,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
   def hadoopFile[K, V, F <: InputFormat[K, V]](
     path: String,
     inputFormatClass: Class[F],
@@ -278,22 +253,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](
-    path: String,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, 
-      minSplits, cloneRecords))
-  }
-
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
+   */
   def hadoopFile[K, V, F <: InputFormat[K, V]](
     path: String,
     inputFormatClass: Class[F],
@@ -306,23 +272,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       inputFormatClass, keyClass, valueClass))
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](
-    path: String,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path,
-      inputFormatClass, keyClass, valueClass, cloneRecords = cloneRecords))
-  }
-
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
     path: String,
@@ -338,22 +295,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
-   */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
-    path: String,
-    fClass: Class[F],
-    kClass: Class[K],
-    vClass: Class[V],
-    conf: Configuration,
-    cloneRecords: Boolean): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(kClass)
-    implicit val vcm: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf, cloneRecords))
-  }
-
-  /**
-   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
-   * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
     conf: Configuration,
@@ -365,21 +311,6 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
   }
 
-  /**
-   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
-   * and extra configuration options to pass to the input format.
-   */
-  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
-    conf: Configuration,
-    fClass: Class[F],
-    kClass: Class[K],
-    vClass: Class[V],
-    cloneRecords: Boolean): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(kClass)
-    implicit val vcm: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass, cloneRecords))
-  }
-
   /** Build the union of two or more RDDs. */
   override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
     val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
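
The same caveat now applies through JavaSparkContext, since its sequenceFile and hadoopFile variants delegate to the SparkContext methods above. A hedged Scala sketch of the SequenceFile case; the key/value classes and path are illustrative and must match what the file actually stores.

    import org.apache.hadoop.io.{IntWritable, Text}

    // The map step materializes an independent (Int, String) pair per record,
    // so the resulting RDD is safe to cache.
    val pairs = sc.sequenceFile("hdfs:///data/counts.seq", classOf[IntWritable], classOf[Text])
    val safeToCache = pairs.map { case (k, v) => (k.get, v.toString) }.cache()
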
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index dbe76f34316aece155d06607b3dfc87dd72ce39b..ad74d4636fb1b928b812e390b8d3af2261a4b470 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,10 +19,7 @@ package org.apache.spark.rdd
 
 import java.io.EOFException
 
-import scala.reflect.ClassTag
-
 import org.apache.hadoop.conf.{Configuration, Configurable}
-import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -34,7 +31,6 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
-import org.apache.spark.util.Utils.cloneWritables
 
 
 /**
@@ -64,21 +60,15 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
- * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
- *                     Most RecordReader implementations reuse wrapper objects across multiple
- *                     records, and can cause problems in RDD collect or aggregation operations.
- *                     By default the records are cloned in Spark. However, application
- *                     programmers can explicitly disable the cloning for better performance.
  */
-class HadoopRDD[K: ClassTag, V: ClassTag](
+class HadoopRDD[K, V](
     sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean = true)
+    minSplits: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   def this(
@@ -87,8 +77,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int,
-      cloneRecords: Boolean) = {
+      minSplits: Int) = {
     this(
       sc,
       sc.broadcast(new SerializableWritable(conf))
@@ -97,8 +86,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -170,9 +158,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback{ () => closeIfNeeded() }
       val key: K = reader.createKey()
-      val keyCloneFunc = cloneWritables[K](jobConf)
       val value: V = reader.createValue()
-      val valueCloneFunc = cloneWritables[V](jobConf)
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
@@ -180,11 +166,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
           case eof: EOFException =>
             finished = true
         }
-        if (cloneRecords) {
-          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
-        } else {
-          (key, value)
-        }
+        (key, value)
       }
 
       override def close() {
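
For context on why the added Scaladoc notes matter: HadoopRDD's iterator now returns the single `key`/`value` pair created per partition on every call to getNext(). A small illustrative sketch of the difference between caching raw Writables and caching copies (path hypothetical):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/events")

    // Risky: cache() stores object references, so cached pairs within a partition
    // can all point at the same reused Text instance.
    val aliased = raw.cache()

    // Safe: copy to immutable values first, as the Scaladoc now advises.
    val safe = raw.map { case (k, v) => (k.get, v.toString) }.cache()
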
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 992bd4aa0ad5dbc17283048a1f0613a3ea3144b6..d1fff296878c38b9308e6126e9348d021694d27d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,15 +20,11 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import scala.reflect.ClassTag
-
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
-import org.apache.spark.util.Utils.cloneWritables
-
 
 private[spark]
 class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
@@ -48,19 +44,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param conf The Hadoop configuration.
- * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
- *                     Most RecordReader implementations reuse wrapper objects across multiple
- *                     records, and can cause problems in RDD collect or aggregation operations.
- *                     By default the records are cloned in Spark. However, application
- *                     programmers can explicitly disable the cloning for better performance.
  */
-class NewHadoopRDD[K: ClassTag, V: ClassTag](
+class NewHadoopRDD[K, V](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration,
-    cloneRecords: Boolean)
+    @transient conf: Configuration)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -107,8 +97,6 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
 
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback(() => close())
-      val keyCloneFunc = cloneWritables[K](conf)
-      val valueCloneFunc = cloneWritables[V](conf)
       var havePair = false
       var finished = false
 
@@ -125,13 +113,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        val key = reader.getCurrentKey
-        val value = reader.getCurrentValue
-        if (cloneRecords) {
-          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
-        } else {
-          (key, value)
-        }
+        (reader.getCurrentKey, reader.getCurrentValue)
       }
 
       private def close() {
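
NewHadoopRDD gets the same treatment, returning `reader.getCurrentKey` and `reader.getCurrentValue` directly. The copy idiom looks the same through the new-API entry point; again the path is hypothetical.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // New MapReduce API variant of the same pattern: copy before persisting.
    val newApi = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/events")
    val materialized = newApi.map { case (k, v) => (k.get, v.toString) }.cache()
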
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index caa9bf4c9280eff3e6ed396b38c0e2d896218498..61d8ef5efeb000a162c4770408f5f67bac88bec6 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
-import scala.reflect.{classTag, ClassTag}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -46,27 +45,6 @@ import org.apache.spark.{SparkConf, SparkException, Logging}
  */
 private[spark] object Utils extends Logging {
 
-  /**
-   *  We try to clone for most common types of writables and we call WritableUtils.clone otherwise
-   *  intention is to optimize, for example for NullWritable there is no need and for Long, int and
-   *  String creating a new object with value set would be faster.
-   */
-  def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
-    val cloneFunc = classTag[T] match {
-      case ClassTag(_: Text) => 
-        (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
-      case ClassTag(_: LongWritable) => 
-        (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
-      case ClassTag(_: IntWritable) => 
-        (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
-      case ClassTag(_: NullWritable) => 
-        (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
-      case _ => 
-        (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
-    }
-    cloneFunc
-  }
-
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
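
With Utils.cloneWritables gone, applications that still need deep copies of arbitrary Writables (rather than the simple `.get`/`.toString` conversions shown above) can clone records themselves. A user-side sketch using Hadoop's WritableUtils.clone, reusing `raw` from the earlier sketch; the helper name and per-partition Configuration are illustrative choices, not part of this patch.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{Writable, WritableUtils}

    // Deep-copies a Writable via Hadoop's own cloning utility.
    def copyWritable[T <: Writable](w: T, conf: Configuration): T = WritableUtils.clone(w, conf)

    // Create the Configuration inside the task (it is not java.io.Serializable),
    // then clone each record so the resulting RDD is safe to cache.
    val cloned = raw.mapPartitions { iter =>
      val conf = new Configuration()
      iter.map { case (k, v) => (copyWritable(k, conf), copyWritable(v, conf)) }
    }.cache()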