diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index adaa1ef6cf9ff634f022fee9a0d79ac5c23a69fe..f3b05e12430454c4944dc05b97e519532d95c2de 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.api.python
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SerializableWritable, SparkException}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
 import scala.util.{Failure, Success, Try}
@@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental
  * transformation code by overriding the convert method.
  */
 @Experimental
-trait Converter[T, U] extends Serializable {
+trait Converter[T, + U] extends Serializable {
   def convert(obj: T): U
 }
 
 private[python] object Converter extends Logging {
 
-  def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
+  def getInstance(converterClass: Option[String],
+                  defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
     converterClass.map { cc =>
       Try {
         val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
@@ -49,7 +51,7 @@ private[python] object Converter extends Logging {
           logError(s"Failed to load converter: $cc")
           throw err
       }
-    }.getOrElse { new DefaultConverter }
+    }.getOrElse { defaultConverter }
   }
 }
 
@@ -57,7 +59,9 @@ private[python] object Converter extends Logging {
  * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
  * Other objects are passed through without conversion.
  */
-private[python] class DefaultConverter extends Converter[Any, Any] {
+private[python] class WritableToJavaConverter(
+    conf: Broadcast[SerializableWritable[Configuration]],
+    batchSize: Int) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -72,17 +76,30 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
       case fw: FloatWritable => fw.get()
       case t: Text => t.toString
       case bw: BooleanWritable => bw.get()
-      case byw: BytesWritable => byw.getBytes
+      case byw: BytesWritable =>
+        val bytes = new Array[Byte](byw.getLength)
+        System.arraycopy(byw.getBytes(), 0, bytes, 0, byw.getLength)
+        bytes
       case n: NullWritable => null
-      case aw: ArrayWritable => aw.get().map(convertWritable(_))
-      case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
-        (convertWritable(k), convertWritable(v))
-      }.toMap)
+      case aw: ArrayWritable =>
+        // Due to erasure, all arrays appear as Object[] and they get pickled to Python tuples.
+        // Since we can't determine element types for empty arrays, we will not attempt to
+        // convert to primitive arrays (which get pickled to Python arrays). Users may want to
+        // write custom converters for arrays if they know the element types a priori.
+        aw.get().map(convertWritable(_))
+      case mw: MapWritable =>
+        val map = new java.util.HashMap[Any, Any]()
+        mw.foreach { case (k, v) =>
+          map.put(convertWritable(k), convertWritable(v))
+        }
+        map
+      case w: Writable =>
+        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
       case other => other
     }
   }
 
-  def convert(obj: Any): Any = {
+  override def convert(obj: Any): Any = {
     obj match {
       case writable: Writable =>
         convertWritable(writable)
@@ -92,6 +109,47 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
   }
 }
 
+/**
+ * A converter that converts common types to [[org.apache.hadoop.io.Writable]]. Note that array
+ * types are not supported since the user needs to subclass [[org.apache.hadoop.io.ArrayWritable]]
+ * to set the type properly. See [[org.apache.spark.api.python.DoubleArrayWritable]] and
+ * [[org.apache.spark.api.python.DoubleArrayToWritableConverter]] for an example. They are used in
+ * the PySpark RDD `saveAsNewAPIHadoopFile` doctest.
+ */
+private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
+
+  /**
+   * Converts common data types to [[org.apache.hadoop.io.Writable]]. Note that array types are not
+   * supported out-of-the-box.
+   */
+  private def convertToWritable(obj: Any): Writable = {
+    import collection.JavaConversions._
+    obj match {
+      case i: java.lang.Integer => new IntWritable(i)
+      case d: java.lang.Double => new DoubleWritable(d)
+      case l: java.lang.Long => new LongWritable(l)
+      case f: java.lang.Float => new FloatWritable(f)
+      case s: java.lang.String => new Text(s)
+      case b: java.lang.Boolean => new BooleanWritable(b)
+      case aob: Array[Byte] => new BytesWritable(aob)
+      case null => NullWritable.get()
+      case map: java.util.Map[_, _] =>
+        val mapWritable = new MapWritable()
+        map.foreach { case (k, v) =>
+          mapWritable.put(convertToWritable(k), convertToWritable(v))
+        }
+        mapWritable
+      case other => throw new SparkException(
+        s"Data of type ${other.getClass.getName} cannot be used")
+    }
+  }
+
+  override def convert(obj: Any): Writable = obj match {
+    case writable: Writable => writable
+    case other => convertToWritable(other)
+  }
+}
+
 /** Utilities for working with Python objects <-> Hadoop-related objects */
 private[python] object PythonHadoopUtil {
 
@@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil {
 
   /**
    * Converts an RDD of key-value pairs, where key and/or value could be instances of
-   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
+   * [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
    */
   def convertRDD[K, V](rdd: RDD[(K, V)],
                        keyConverter: Converter[Any, Any],
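For context on how the converter plumbing above is exercised from Python: `Converter.getInstance` now takes the read/write default (`WritableToJavaConverter` when loading, `JavaToWritableConverter` when saving) and only swaps in a custom class when a fully qualified class name is supplied. A minimal sketch, assuming an existing SparkContext `sc` and using the `DoubleArrayWritable` helpers added later in this patch (paths are placeholders):

```python
# No converter given: the default WritableToJavaConverter maps common Writables
# (IntWritable, Text, MapWritable, ...) to plain Python types.
plain = sc.sequenceFile("path/to/sequencefile")

# A custom converter is selected by fully qualified class name; this one turns
# DoubleArrayWritable values into double[] (pickled to Python array('d', ...)).
arrays = sc.sequenceFile(
    "path/to/array/sequencefile",
    keyClass="org.apache.hadoop.io.IntWritable",
    valueClass="org.apache.spark.api.python.DoubleArrayWritable",
    valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter")
```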
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f551a59ee3fe8d1a6ddbe12ca59bacc835bd36a1..a9d758bf998c31b82e63258999b8375d885e3fce 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -23,15 +23,18 @@ import java.nio.charset.Charset
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
+import scala.language.existentials
 import scala.reflect.ClassTag
 import scala.util.Try
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.{InputFormat, JobConf}
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
 import org.apache.spark._
+import org.apache.spark.SparkContext._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -365,19 +368,17 @@ private[spark] object PythonRDD extends Logging {
       valueClassMaybeNull: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      minSplits: Int) = {
+      minSplits: Int,
+      batchSize: Int) = {
     val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
-    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
-    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
-    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
-    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-
+    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   /**
@@ -394,17 +395,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
-    val baseConf = sc.hadoopConfiguration()
-    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
+    val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   /**
@@ -421,15 +421,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
@@ -439,18 +440,14 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
-    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
-    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
-    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
-    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
-    val rdd = if (path.isDefined) {
+    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
       sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
     }
-    rdd
   }
 
   /**
@@ -467,17 +464,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
-    val baseConf = sc.hadoopConfiguration()
-    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
+    val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   /**
@@ -494,15 +490,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
@@ -512,18 +509,14 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
-    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
-    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
-    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
-    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
-    val rdd = if (path.isDefined) {
+    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
       sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
     }
-    rdd
   }
 
   def writeUTF(str: String, dataOut: DataOutputStream) {
@@ -562,6 +555,152 @@ private[spark] object PythonRDD extends Logging {
     }
   }
 
+  private def getMergedConf(confAsMap: java.util.HashMap[String, String],
+      baseConf: Configuration): Configuration = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    PythonHadoopUtil.mergeConfs(baseConf, conf)
+  }
+
+  private def inferKeyValueTypes[K, V](rdd: RDD[(K, V)], keyConverterClass: String = null,
+      valueConverterClass: String = null): (Class[_], Class[_]) = {
+    // Peek at an element to figure out key/value types. Since Writables are not serializable,
+    // we cannot call first() on the converted RDD. Instead, we call first() on the original RDD
+    // and then convert locally.
+    val (key, value) = rdd.first()
+    val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+      new JavaToWritableConverter)
+    (kc.convert(key).getClass, vc.convert(value).getClass)
+  }
+
+  private def getKeyValueTypes(keyClass: String, valueClass: String):
+      Option[(Class[_], Class[_])] = {
+    for {
+      k <- Option(keyClass)
+      v <- Option(valueClass)
+    } yield (Class.forName(k), Class.forName(v))
+  }
+
+  private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
+      defaultConverter: Converter[Any, Any]): (Converter[Any, Any], Converter[Any, Any]) = {
+    val keyConverter = Converter.getInstance(Option(keyConverterClass), defaultConverter)
+    val valueConverter = Converter.getInstance(Option(valueConverterClass), defaultConverter)
+    (keyConverter, valueConverter)
+  }
+
+  /**
+   * Convert an RDD of key-value pairs from internal types to serializable types suitable for
+   * output, or vice versa.
+   */
+  private def convertRDD[K, V](rdd: RDD[(K, V)],
+                               keyConverterClass: String,
+                               valueConverterClass: String,
+                               defaultConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
+    val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+      defaultConverter)
+    PythonHadoopUtil.convertRDD(rdd, kc, vc)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs as a Hadoop SequenceFile using the Writable types
+   * we convert from the RDD's key and value types. Note that keys and values cannot already
+   * be [[org.apache.hadoop.io.Writable]] types, since Writables are not Java `Serializable`
+   * and we cannot peek at them to infer the types. The `path` can be on any Hadoop file system.
+   */
+  def saveAsSequenceFile[K, V, C <: CompressionCodec](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      path: String,
+      compressionCodecClass: String) = {
+    saveAsHadoopFile(
+      pyRDD, batchSerialized, path, "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+      null, null, null, null, new java.util.HashMap(), compressionCodecClass)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs to any Hadoop file system, using the old Hadoop
+   * `OutputFormat` API (mapred package). Keys and values are converted to suitable output
+   * types using either user-specified converters or, if not specified,
+   * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+   * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
+   * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
+   * this RDD.
+   */
+  def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      path: String,
+      outputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String],
+      compressionCodecClass: String) = {
+    val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+    val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+      inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+    val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
+    val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new JavaToWritableConverter)
+    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs to any Hadoop file system, using the new Hadoop
+   * `OutputFormat` API (mapreduce package). Keys and values are converted to suitable output
+   * types using either user-specified converters or, if not specified,
+   * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+   * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
+   * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
+   * this RDD.
+   */
+  def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      path: String,
+      outputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+    val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+      inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+    val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new JavaToWritableConverter)
+    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs to any Hadoop file system, using a Hadoop conf
+   * converted from the passed-in `confAsMap`. The conf should set the relevant output params
+   * (e.g. output path, output format, etc.), in the same way as it would be configured for
+   * a Hadoop MapReduce job. Both the old and new Hadoop OutputFormat APIs are supported
+   * (mapred vs. mapreduce). Keys/values are converted for output using either user-specified
+   * converters or, by default, [[org.apache.spark.api.python.JavaToWritableConverter]].
+   */
+  def saveAsHadoopDataset[K, V](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      confAsMap: java.util.HashMap[String, String],
+      keyConverterClass: String,
+      valueConverterClass: String,
+      useNewAPI: Boolean) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized),
+      keyConverterClass, valueConverterClass, new JavaToWritableConverter)
+    if (useNewAPI) {
+      converted.saveAsNewAPIHadoopDataset(conf)
+    } else {
+      converted.saveAsHadoopDataset(new JobConf(conf))
+    }
+  }
+
   /**
    * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
    * PySpark.
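To make the new save entry points concrete, here is a hedged sketch of how they are reached from PySpark, based on the method names above and the examples elsewhere in this patch (output paths and classes are placeholders; the conf keys follow the Hadoop 2 naming used in the HBase/Cassandra examples below, and the output-directory key may differ across Hadoop versions):

```python
# Pair RDD -> SequenceFile; Writable key/value classes are inferred on the JVM
# side by peeking at the first record (see inferKeyValueTypes above).
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
rdd.saveAsSequenceFile("path/to/output")

# Conf-driven save via saveAsNewAPIHadoopDataset: the Hadoop conf carries the
# output format and key/value classes, as it would for a MapReduce job.
conf = {
    "mapreduce.outputformat.class":
        "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
    "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
    "mapreduce.job.output.value.class": "org.apache.hadoop.io.Text",
    "mapreduce.output.fileoutputformat.outputdir": "path/to/output2",
}
rdd.saveAsNewAPIHadoopDataset(conf=conf)
```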
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 9a012e72549012b900d5e5b3bc6f952515c909ff..efc9009c088a82a5f0468fcd2401737064a50bb7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.api.python
 
-import scala.util.Try
-import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
-import scala.util.Success
+import scala.collection.JavaConversions._
 import scala.util.Failure
-import net.razorvine.pickle.Pickler
+import scala.util.Try
 
+import net.razorvine.pickle.{Unpickler, Pickler}
+
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.rdd.RDD
 
 /** Utilities for serialization / deserialization between Python and Java, using Pickle. */
 private[python] object SerDeUtil extends Logging {
@@ -65,20 +66,52 @@ private[python] object SerDeUtil extends Logging {
    * by PySpark. By default, if serialization fails, toString is called and the string
    * representation is serialized
    */
-  def rddToPython(rdd: RDD[(Any, Any)]): RDD[Array[Byte]] = {
+  def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
     val (keyFailed, valueFailed) = checkPickle(rdd.first())
     rdd.mapPartitions { iter =>
       val pickle = new Pickler
-      iter.map { case (k, v) =>
-        if (keyFailed && valueFailed) {
-          pickle.dumps(Array(k.toString, v.toString))
-        } else if (keyFailed) {
-          pickle.dumps(Array(k.toString, v))
-        } else if (!keyFailed && valueFailed) {
-          pickle.dumps(Array(k, v.toString))
+      val cleaned = iter.map { case (k, v) =>
+        val key = if (keyFailed) k.toString else k
+        val value = if (valueFailed) v.toString else v
+        Array[Any](key, value)
+      }
+      if (batchSize > 1) {
+        cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched)))
+      } else {
+        cleaned.map(pickle.dumps(_))
+      }
+    }
+  }
+
+  /**
+   * Convert an RDD of serialized Python tuples (K, V) to RDD[(K, V)].
+   */
+  def pythonToPairRDD[K, V](pyRDD: RDD[Array[Byte]], batchSerialized: Boolean): RDD[(K, V)] = {
+    def isPair(obj: Any): Boolean = {
+      Option(obj.getClass.getComponentType).map(!_.isPrimitive).getOrElse(false) &&
+        obj.asInstanceOf[Array[_]].length == 2
+    }
+    pyRDD.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      val unpickled =
+        if (batchSerialized) {
+          iter.flatMap { batch =>
+            unpickle.loads(batch) match {
+              case objs: java.util.List[_] => collectionAsScalaIterable(objs)
+              case other => throw new SparkException(
+                s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD")
+            }
+          }
         } else {
-          pickle.dumps(Array(k, v))
+          iter.map(unpickle.loads(_))
         }
+      unpickled.map {
+        case obj if isPair(obj) =>
+          // we only accept (K, V)
+          val arr = obj.asInstanceOf[Array[_]]
+          (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
+        case other => throw new SparkException(
+          s"RDD element of type ${other.getClass.getName} cannot be used")
       }
     }
   }
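In short, `pairRDDToPython` pickles each record as a two-element array (grouped into batches when `batchSize > 1`), and `pythonToPairRDD` only accepts such pairs when unpickling. A small sketch of the resulting contract on the Python side, mirroring the programming-guide example further down (paths are placeholders):

```python
# Loads come back as (key, value) tuples...
pairs = sorted(sc.sequenceFile("path/to/file").collect())

# ...and saves require an RDD of key-value pairs; anything that does not
# unpickle to a two-element record is rejected on the JVM side with
# "RDD element of type ... cannot be used".
sc.parallelize([(1, "a"), (2, "aa")]).saveAsSequenceFile("path/to/out")
```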
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index f0e3fb9aff5a04d79837e0c2f357c3828f953a0c..d11db978b842e846a762d7641420496c58f6330d 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -17,15 +17,16 @@
 
 package org.apache.spark.api.python
 
-import org.apache.spark.SparkContext
-import org.apache.hadoop.io._
-import scala.Array
 import java.io.{DataOutput, DataInput}
+import java.nio.charset.Charset
+
+import org.apache.hadoop.io._
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
 import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
- * A class to test MsgPack serialization on the Scala side, that will be deserialized
+ * A class to test Pyrolite serialization on the Scala side, which will be deserialized
  * in Python
  * @param str
  * @param int
@@ -54,7 +55,13 @@ case class TestWritable(var str: String, var int: Int, var double: Double) exten
   }
 }
 
-class TestConverter extends Converter[Any, Any] {
+private[python] class TestInputKeyConverter extends Converter[Any, Any] {
+  override def convert(obj: Any) = {
+    obj.asInstanceOf[IntWritable].get().toChar
+  }
+}
+
+private[python] class TestInputValueConverter extends Converter[Any, Any] {
   import collection.JavaConversions._
   override def convert(obj: Any) = {
     val m = obj.asInstanceOf[MapWritable]
@@ -62,6 +69,38 @@ class TestConverter extends Converter[Any, Any] {
   }
 }
 
+private[python] class TestOutputKeyConverter extends Converter[Any, Any] {
+  override def convert(obj: Any) = {
+    new Text(obj.asInstanceOf[Int].toString)
+  }
+}
+
+private[python] class TestOutputValueConverter extends Converter[Any, Any] {
+  import collection.JavaConversions._
+  override def convert(obj: Any) = {
+    new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, _]].keySet().head)
+  }
+}
+
+private[python] class DoubleArrayWritable extends ArrayWritable(classOf[DoubleWritable])
+
+private[python] class DoubleArrayToWritableConverter extends Converter[Any, Writable] {
+  override def convert(obj: Any) = obj match {
+    case arr if arr.getClass.isArray && arr.getClass.getComponentType == classOf[Double] =>
+      val daw = new DoubleArrayWritable
+      daw.set(arr.asInstanceOf[Array[Double]].map(new DoubleWritable(_)))
+      daw
+    case other => throw new SparkException(s"Data of type $other is not supported")
+  }
+}
+
+private[python] class WritableToDoubleArrayConverter extends Converter[Any, Array[Double]] {
+  override def convert(obj: Any): Array[Double] = obj match {
+    case daw : DoubleArrayWritable => daw.get().map(_.asInstanceOf[DoubleWritable].get())
+    case other => throw new SparkException(s"Data of type $other is not supported")
+  }
+}
+
 /**
  * This object contains method to generate SequenceFile test data and write it to a
  * given directory (probably a temp directory)
@@ -97,7 +136,8 @@ object WriteInputFormatTestDataGenerator {
     sc.parallelize(intKeys).saveAsSequenceFile(intPath)
     sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
     sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
-    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
+    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
+      ).saveAsSequenceFile(bytesPath)
     val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
     sc.parallelize(bools).saveAsSequenceFile(boolPath)
     sc.parallelize(intKeys).map{ case (k, v) =>
@@ -106,19 +146,20 @@ object WriteInputFormatTestDataGenerator {
 
     // Create test data for ArrayWritable
     val data = Seq(
-      (1, Array(1.0, 2.0, 3.0)),
+      (1, Array()),
       (2, Array(3.0, 4.0, 5.0)),
       (3, Array(4.0, 5.0, 6.0))
     )
     sc.parallelize(data, numSlices = 2)
       .map{ case (k, v) =>
-      (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
-    }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+        val va = new DoubleArrayWritable
+        va.set(v.map(new DoubleWritable(_)))
+        (new IntWritable(k), va)
+    }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, DoubleArrayWritable]](arrPath)
 
     // Create test data for MapWritable, with keys DoubleWritable and values Text
     val mapData = Seq(
-      (1, Map(2.0 -> "aa")),
-      (2, Map(3.0 -> "bb")),
+      (1, Map()),
       (2, Map(1.0 -> "cc")),
       (3, Map(2.0 -> "dd")),
       (2, Map(1.0 -> "aa")),
@@ -126,9 +167,9 @@ object WriteInputFormatTestDataGenerator {
     )
     sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
       val mw = new MapWritable()
-      val k = m.keys.head
-      val v = m.values.head
-      mw.put(new DoubleWritable(k), new Text(v))
+      m.foreach { case (k, v) =>
+        mw.put(new DoubleWritable(k), new Text(v))
+      }
       (new IntWritable(i), mw)
     }.saveAsSequenceFile(mapPath)
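The array helpers above back the PySpark `saveAsNewAPIHadoopFile` doctest mentioned earlier. That doctest is not part of this diff, so the keyword argument names below are assumptions that mirror the JVM-side parameters; only the Writable and converter class names come from this patch:

```python
from array import array

# Write: the custom converter turns array('d', ...) values (unpickled to
# double[] by Pyrolite) into DoubleArrayWritable.
sc.parallelize([(1, array('d', [1.0, 2.0]))]).saveAsNewAPIHadoopFile(
    "path/to/array/output",
    "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
    keyClass="org.apache.hadoop.io.IntWritable",
    valueClass="org.apache.spark.api.python.DoubleArrayWritable",
    valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")

# Read back with the matching converter to get array('d', ...) instead of
# the tuple produced by the default converter.
sc.sequenceFile(
    "path/to/array/output",
    valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect()
```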
 
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 90c69713019f2d0132c0d143606d30e1658d1d06..a88bf27add883a9faf5fdf07b086e8e954b3b3da 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -383,16 +383,16 @@ Apart from text files, Spark's Python API also supports several other data forma
 
 * `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.
 
-* Details on reading `SequenceFile` and arbitrary Hadoop `InputFormat` are given below.
-
-### SequenceFile and Hadoop InputFormats
+* SequenceFile and Hadoop Input/Output Formats
 
 **Note** this feature is currently marked ```Experimental``` and is intended for advanced users. It may be replaced in future with read/write support based on SparkSQL, in which case SparkSQL is the preferred approach.
 
-#### Writable Support
+**Writable Support**
 
-PySpark SequenceFile support loads an RDD within Java, and pickles the resulting Java objects using
-[Pyrolite](https://github.com/irmen/Pyrolite/). The following Writables are automatically converted:
+PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the 
+resulting Java objects using [Pyrolite](https://github.com/irmen/Pyrolite/). When saving an RDD of key-value pairs to SequenceFile, 
+PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following 
+Writables are automatically converted:
 
 <table class="table">
 <tr><th>Writable Type</th><th>Python Type</th></tr>
@@ -403,32 +403,30 @@ PySpark SequenceFile support loads an RDD within Java, and pickles the resulting
 <tr><td>BooleanWritable</td><td>bool</td></tr>
 <tr><td>BytesWritable</td><td>bytearray</td></tr>
 <tr><td>NullWritable</td><td>None</td></tr>
-<tr><td>ArrayWritable</td><td>list of primitives, or tuple of objects</td></tr>
 <tr><td>MapWritable</td><td>dict</td></tr>
-<tr><td>Custom Class conforming to Java Bean conventions</td>
-    <td>dict of public properties (via JavaBean getters and setters) + __class__ for the class type</td></tr>
 </table>
 
-#### Loading SequenceFiles
+Arrays are not handled out-of-the-box. Users need to specify custom `ArrayWritable` subtypes when reading or writing. When writing, 
+users also need to specify custom converters that convert arrays to custom `ArrayWritable` subtypes. When reading, the default 
+converter will convert custom `ArrayWritable` subtypes to Java `Object[]`, which then get pickled to Python tuples. To get 
+Python `array.array` for arrays of primitive types, users need to specify custom converters.
+
+**Saving and Loading SequenceFiles**
 
-Similarly to text files, SequenceFiles can be loaded by specifying the path. The key and value
+Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value
 classes can be specified, but for standard Writables this is not required.
 
 {% highlight python %}
->>> rdd = sc.sequenceFile("path/to/sequencefile/of/doubles")
->>> rdd.collect()         # this example has DoubleWritable keys and Text values
-[(1.0, u'aa'),
- (2.0, u'bb'),
- (2.0, u'aa'),
- (3.0, u'cc'),
- (2.0, u'bb'),
- (1.0, u'aa')]
+>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
+>>> rdd.saveAsSequenceFile("path/to/file")
+>>> sorted(sc.sequenceFile("path/to/file").collect())
+[(1, u'a'), (2, u'aa'), (3, u'aaa')]
 {% endhighlight %}
 
-#### Loading Other Hadoop InputFormats
+**Saving and Loading Other Hadoop Input/Output Formats**
 
-PySpark can also read any Hadoop InputFormat, for both 'new' and 'old' Hadoop APIs. If required,
-a Hadoop configuration can be passed in as a Python dict. Here is an example using the
+PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both 'new' and 'old' Hadoop MapReduce APIs. 
+If required, a Hadoop configuration can be passed in as a Python dict. Here is an example using the
 Elasticsearch ESInputFormat:
 
 {% highlight python %}
@@ -447,8 +445,7 @@ Note that, if the InputFormat simply depends on a Hadoop configuration and/or in
 the key and value classes can easily be converted according to the above table,
 then this approach should work well for such cases.
 
-If you have custom serialized binary data (such as loading data from Cassandra / HBase) or custom
-classes that don't conform to the JavaBean requirements, then you will first need to 
+If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to 
 transform that data on the Scala/Java side to something which can be handled by Pyrolite's pickler.
 A [Converter](api/scala/index.html#org.apache.spark.api.python.Converter) trait is provided 
 for this. Simply extend this trait and implement your transformation code in the ```convert``` 
@@ -456,11 +453,8 @@ method. Remember to ensure that this class, along with any dependencies required
 classpath.
 
 See the [Python examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python) and 
-the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/pythonconverters) 
-for examples of using HBase and Cassandra ```InputFormat```.
-
-Future support for writing data out as ```SequenceFileOutputFormat``` and other ```OutputFormats```, 
-is forthcoming.
+the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters) 
+for examples of using Cassandra / HBase ```InputFormat``` and ```OutputFormat``` with custom converters.
 
 </div>
 
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
new file mode 100644
index 0000000000000000000000000000000000000000..1dfbf986044257d738a5fc9ef6844614675ba298
--- /dev/null
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create data in Cassandra first
+(following: https://wiki.apache.org/cassandra/GettingStarted)
+
+cqlsh> CREATE KEYSPACE test
+   ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
+cqlsh> use test;
+cqlsh:test> CREATE TABLE users (
+        ...   user_id int PRIMARY KEY,
+        ...   fname text,
+        ...   lname text
+        ... );
+
+> cassandra_outputformat <host> test users 1745 john smith
+> cassandra_outputformat <host> test users 1744 john doe
+> cassandra_outputformat <host> test users 1746 john smith
+
+cqlsh:test> SELECT * FROM users;
+
+ user_id | fname | lname
+---------+-------+-------
+    1745 |  john | smith
+    1744 |  john |   doe
+    1746 |  john | smith
+"""
+if __name__ == "__main__":
+    if len(sys.argv) != 7:
+        print >> sys.stderr, """
+        Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> <lname>
+
+        Run with example jar:
+        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_outputformat.py <args>
+        Assumes you have created the following table <cf> in Cassandra already,
+        running on <host>, in <keyspace>.
+
+        cqlsh:<keyspace>> CREATE TABLE <cf> (
+           ...   user_id int PRIMARY KEY,
+           ...   fname text,
+           ...   lname text
+           ... );
+        """
+        exit(-1)
+
+    host = sys.argv[1]
+    keyspace = sys.argv[2]
+    cf = sys.argv[3]
+    sc = SparkContext(appName="CassandraOutputFormat")
+
+    conf = {"cassandra.output.thrift.address":host,
+            "cassandra.output.thrift.port":"9160",
+            "cassandra.output.keyspace":keyspace,
+            "cassandra.output.partitioner.class":"Murmur3Partitioner",
+            "cassandra.output.cql":"UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?",
+            "mapreduce.output.basename":cf,
+            "mapreduce.outputformat.class":"org.apache.cassandra.hadoop.cql3.CqlOutputFormat",
+            "mapreduce.job.output.key.class":"java.util.Map",
+            "mapreduce.job.output.value.class":"java.util.List"}
+    key = {"user_id" : int(sys.argv[4])}
+    sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset(
+        conf=conf,
+        keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
+        valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
index 3289d9880a0f56043c638ff648685b79247d5c8b..c9fa8e171c2a118254e3f6e93db47a93f816f5af 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -65,7 +65,8 @@ if __name__ == "__main__":
         "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
         "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
         "org.apache.hadoop.hbase.client.Result",
-        valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter",
+        keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
+        valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
         conf=conf)
     output = hbase_rdd.collect()
     for (k, v) in output:
diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py
new file mode 100644
index 0000000000000000000000000000000000000000..5e11548fd13f774aae47c7b2b8bc81ae654f9934
--- /dev/null
+++ b/examples/src/main/python/hbase_outputformat.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create test table in HBase first:
+
+hbase(main):001:0> create 'test', 'f1'
+0 row(s) in 0.7840 seconds
+
+> hbase_outputformat <host> test row1 f1 q1 value1
+> hbase_outputformat <host> test row2 f1 q1 value2
+> hbase_outputformat <host> test row3 f1 q1 value3
+> hbase_outputformat <host> test row4 f1 q1 value4
+
+hbase(main):002:0> scan 'test'
+ROW                   COLUMN+CELL
+ row1                 column=f1:q1, timestamp=1405659615726, value=value1
+ row2                 column=f1:q1, timestamp=1405659626803, value=value2
+ row3                 column=f1:q1, timestamp=1405659640106, value=value3
+ row4                 column=f1:q1, timestamp=1405659650292, value=value4
+4 row(s) in 0.0780 seconds
+"""
+if __name__ == "__main__":
+    if len(sys.argv) != 7:
+        print >> sys.stderr, """
+        Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value>
+
+        Run with example jar:
+        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_outputformat.py <args>
+        Assumes you have created <table> with column family <family> in HBase running on <host> already
+        """
+        exit(-1)
+
+    host = sys.argv[1]
+    table = sys.argv[2]
+    sc = SparkContext(appName="HBaseOutputFormat")
+
+    conf = {"hbase.zookeeper.quorum": host,
+            "hbase.mapred.outputtable": table,
+            "mapreduce.outputformat.class" : "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
+            "mapreduce.job.output.key.class" : "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
+            "mapreduce.job.output.value.class" : "org.apache.hadoop.io.Writable"}
+
+    sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
+        conf=conf,
+        keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
+        valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
index 29a65c7a5f295952eecb777bc811358ef796bbd1..83feb5703b908fdeacf0dcf79538fdcc5656a031 100644
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
@@ -20,7 +20,7 @@ package org.apache.spark.examples.pythonconverters
 import org.apache.spark.api.python.Converter
 import java.nio.ByteBuffer
 import org.apache.cassandra.utils.ByteBufferUtil
-import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap}
+import collection.JavaConversions._
 
 
 /**
@@ -44,3 +44,25 @@ class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, St
     mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
   }
 }
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * Map[String, Int] to a Cassandra key
+ */
+class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, ByteBuffer]] {
+  override def convert(obj: Any): java.util.Map[String, ByteBuffer] = {
+    val input = obj.asInstanceOf[java.util.Map[String, Int]]
+    mapAsJavaMap(input.mapValues(i => ByteBufferUtil.bytes(i)))
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * List[String] to a Cassandra value
+ */
+class ToCassandraCQLValueConverter extends Converter[Any, java.util.List[ByteBuffer]] {
+  override def convert(obj: Any): java.util.List[ByteBuffer] = {
+    val input = obj.asInstanceOf[java.util.List[String]]
+    seqAsJavaList(input.map(s => ByteBufferUtil.bytes(s)))
+  }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
deleted file mode 100644
index 42ae960bd64a1aea03d99a05c6547f41ff8c9294..0000000000000000000000000000000000000000
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.pythonconverters
-
-import org.apache.spark.api.python.Converter
-import org.apache.hadoop.hbase.client.Result
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result
- * to a String
- */
-class HBaseConverter extends Converter[Any, String] {
-  override def convert(obj: Any): String = {
-    val result = obj.asInstanceOf[Result]
-    Bytes.toStringBinary(result.value())
-  }
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
new file mode 100644
index 0000000000000000000000000000000000000000..273bee0a8b30fd47a72555b3dd8c8ce431490c2a
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.pythonconverters
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.api.python.Converter
+import org.apache.hadoop.hbase.client.{Put, Result}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
+ * HBase Result to a String
+ */
+class HBaseResultToStringConverter extends Converter[Any, String] {
+  override def convert(obj: Any): String = {
+    val result = obj.asInstanceOf[Result]
+    Bytes.toStringBinary(result.value())
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
+ * ImmutableBytesWritable to a String
+ */
+class ImmutableBytesWritableToStringConverter extends Converter[Any, String] {
+  override def convert(obj: Any): String = {
+    val key = obj.asInstanceOf[ImmutableBytesWritable]
+    Bytes.toStringBinary(key.get())
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * String to an ImmutableBytesWritable
+ */
+class StringToImmutableBytesWritableConverter extends Converter[Any, ImmutableBytesWritable] {
+  override def convert(obj: Any): ImmutableBytesWritable = {
+    val bytes = Bytes.toBytes(obj.asInstanceOf[String])
+    new ImmutableBytesWritable(bytes)
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * list of Strings to an HBase Put
+ */
+class StringListToPutConverter extends Converter[Any, Put] {
+  override def convert(obj: Any): Put = {
+    val output = obj.asInstanceOf[java.util.ArrayList[String]].map(Bytes.toBytes(_)).toArray
+    val put = new Put(output(0))
+    put.add(output(1), output(2), output(3))
+  }
+}
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 830a6ee03f2a6ec8a4866fb5c16af575c6886297..7b0f8d83aedc5ab487d87dc9b6615d1d4e299505 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -60,6 +60,7 @@ class SparkContext(object):
     _active_spark_context = None
     _lock = Lock()
     _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
+    _default_batch_size_for_serialized_input = 10
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
@@ -378,7 +379,7 @@ class SparkContext(object):
         return jm
 
     def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
-                     valueConverter=None, minSplits=None):
+                     valueConverter=None, minSplits=None, batchSize=None):
         """
         Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -398,14 +399,18 @@ class SparkContext(object):
         @param valueConverter:
         @param minSplits: minimum splits in dataset
                (default min(2, sc.defaultParallelism))
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
-                                                keyConverter, valueConverter, minSplits)
-        return RDD(jrdd, self, PickleSerializer())
+                    keyConverter, valueConverter, minSplits, batchSize)
+        return RDD(jrdd, self, ser)
 
     def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                         valueConverter=None, conf=None):
+                         valueConverter=None, conf=None, batchSize=None):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -425,14 +430,18 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
-                                                    valueClass, keyConverter, valueConverter, jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
 
     def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                        valueConverter=None, conf=None):
+                        valueConverter=None, conf=None, batchSize=None):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
         Hadoop configuration, which is passed in as a Python dict.
@@ -449,14 +458,18 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
-                                                   valueClass, keyConverter, valueConverter, jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
 
     def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                   valueConverter=None, conf=None):
+                   valueConverter=None, conf=None, batchSize=None):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -476,14 +489,18 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
-                                              valueClass, keyConverter, valueConverter, jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
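A minimal sketch of the old-API file reader (placeholder path; with TextInputFormat the keys are byte offsets and the values are lines):

    # placeholder path; yields (long offset, unicode line) pairs
    rdd = sc.hadoopFile("/path/to/hello.txt",
                        "org.apache.hadoop.mapred.TextInputFormat",
                        "org.apache.hadoop.io.LongWritable",
                        "org.apache.hadoop.io.Text")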
 
     def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                  valueConverter=None, conf=None):
+                  valueConverter=None, conf=None, batchSize=None):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
         Hadoop configuration, which is passed in as a Python dict.
@@ -500,11 +517,15 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
-                                             keyConverter, valueConverter, jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
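A minimal sketch of hadoopRDD with the input path passed through mapred.input.dir, as in the updated test_oldhadoop test (placeholder path):

    oldconf = {"mapred.input.dir": "/path/to/hello.txt"}  # placeholder path
    hello = sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
                         "org.apache.hadoop.io.LongWritable",
                         "org.apache.hadoop.io.Text",
                         conf=oldconf,
                         batchSize=1)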
 
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b84d976114f0dc5fc5d8c8a0de67049156bfbd2b..e8fcc900efb2479f88f1fffec18a5fe0218643c3 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -231,6 +231,13 @@ class RDD(object):
         self._jrdd_deserializer = jrdd_deserializer
         self._id = jrdd.id()
 
+    def _toPickleSerialization(self):
+        if (self._jrdd_deserializer == PickleSerializer() or
+            self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
+            return self
+        else:
+            return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
+
     def id(self):
         """
         A unique ID for this RDD (within its SparkContext).
@@ -1030,6 +1037,113 @@ class RDD(object):
         """
         return self.take(1)[0]
 
+    def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+        system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
+        converted for output using either user-specified converters or, by default,
+        L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+        @param conf: Hadoop job configuration, passed in as a dict
+        @param keyConverter: fully qualified classname of key converter (None by default)
+        @param valueConverter: fully qualified classname of value converter (None by default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+                                                    keyConverter, valueConverter, True)
+
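A minimal usage sketch, with a placeholder output directory and sample data; the conf keys mirror those exercised in test_reserialization later in this patch, and sc is assumed to be an existing SparkContext:

    conf = {"mapreduce.outputformat.class":
                "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
            "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
            "mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
            "mapred.output.dir": "/path/to/newdataset"}  # placeholder output directory
    sc.parallelize([(1, 101), (2, 102)]).saveAsNewAPIHadoopDataset(conf)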
+    def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
+                               keyConverter=None, valueConverter=None, conf=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+        system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
+        will be inferred if not specified. Keys and values are converted for output using either
+        user-specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
+        C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+        of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
+
+        @param path: path to Hadoop file
+        @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
+               (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+        @param keyClass: fully qualified classname of key Writable class
+               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+        @param valueClass: fully qualified classname of value Writable class
+               (e.g. "org.apache.hadoop.io.Text", None by default)
+        @param keyConverter: fully qualified classname of key converter (None by default)
+        @param valueConverter: fully qualified classname of value converter (None by default)
+        @param conf: Hadoop job configuration, passed in as a dict (None by default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
+            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
+
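A minimal usage sketch (placeholder path and sample data); the key and value classes are omitted and left to be inferred:

    # writes the pairs out as a new-API SequenceFile under a placeholder path
    rdd = sc.parallelize([(1, u"aa"), (2, u"bb")])
    rdd.saveAsNewAPIHadoopFile(
        "/path/to/output",
        "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")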
+    def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+        system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
+        converted for output using either user-specified converters or, by default,
+        L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+        @param conf: Hadoop job configuration, passed in as a dict
+        @param keyConverter: fully qualified classname of key converter (None by default)
+        @param valueConverter: fully qualified classname of value converter (None by default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+                                                    keyConverter, valueConverter, False)
+
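A minimal usage sketch for the old-API variant (placeholder output directory and sample data), following the conf used in test_reserialization later in this patch:

    conf = {"mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
            "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
            "mapred.output.value.class": "org.apache.hadoop.io.IntWritable",
            "mapred.output.dir": "/path/to/olddataset"}  # placeholder output directory
    sc.parallelize([(1, 101), (2, 102)]).saveAsHadoopDataset(conf)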
+    def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
+                         keyConverter=None, valueConverter=None, conf=None,
+                         compressionCodecClass=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+        system, using the old Hadoop OutputFormat API (mapred package). Key and value types
+        will be inferred if not specified. Keys and values are converted for output using either
+        user-specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
+        C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+        of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
+
+        @param path: path to Hadoop file
+        @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
+               (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+        @param keyClass: fully qualified classname of key Writable class
+               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+        @param valueClass: fully qualified classname of value Writable class
+               (e.g. "org.apache.hadoop.io.Text", None by default)
+        @param keyConverter: fully qualified classname of key converter (None by default)
+        @param valueConverter: fully qualified classname of value converter (None by default)
+        @param conf: Hadoop job configuration, passed in as a dict (None by default)
+        @param compressionCodecClass: fully qualified classname of the compression codec class
+               (e.g. "org.apache.hadoop.io.compress.GzipCodec", None by default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
+            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
+            jconf, compressionCodecClass)
+
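A minimal usage sketch (placeholder path and sample data); GzipCodec is a standard Hadoop codec and is used here purely to illustrate compressionCodecClass:

    # writes a gzip-compressed old-API SequenceFile under a placeholder path
    rdd = sc.parallelize([(1, u"aa"), (2, u"bb")])
    rdd.saveAsHadoopFile(
        "/path/to/output",
        "org.apache.hadoop.mapred.SequenceFileOutputFormat",
        compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")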
+    def saveAsSequenceFile(self, path, compressionCodecClass=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+        system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
+        RDD's key and value types. The mechanism is as follows:
+            1. Pyrolite is used to convert the pickled Python RDD into an RDD of Java objects.
+            2. Keys and values of this Java RDD are converted to Writables and written out.
+
+        @param path: path to sequence file
+        @param compressionCodecClass: fully qualified classname of the compression codec class
+               (e.g. "org.apache.hadoop.io.compress.GzipCodec", None by default)
+        """
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
+                                                   path, compressionCodecClass)
+
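A minimal round-trip sketch (placeholder path), matching the pattern exercised by test_reserialization later in this patch:

    rdd = sc.parallelize([(1, u"aa"), (2, u"bb"), (3, u"cc")])
    rdd.saveAsSequenceFile("/path/to/sfout")  # placeholder path
    read_back = sorted(sc.sequenceFile("/path/to/sfout").collect())
    # read_back == [(1, u'aa'), (2, u'bb'), (3, u'cc')]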
     def saveAsPickleFile(self, path, batchSize=10):
         """
         Save this RDD as a SequenceFile of serialized objects. The serializer
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8486c8595b5a4283d2ee02ca62a960b6088fc731..c29deb9574ea28650af70902a74d798d3921581a 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -19,6 +19,7 @@
 Unit tests for PySpark; additional tests are implemented as doctests in
 individual modules.
 """
+from array import array
 from fileinput import input
 from glob import glob
 import os
@@ -327,6 +328,17 @@ class TestInputFormat(PySparkTestCase):
         ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
         self.assertEqual(doubles, ed)
 
+        bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/",
+                                              "org.apache.hadoop.io.IntWritable",
+                                              "org.apache.hadoop.io.BytesWritable").collect())
+        ebs = [(1, bytearray('aa', 'utf-8')),
+               (1, bytearray('aa', 'utf-8')),
+               (2, bytearray('aa', 'utf-8')),
+               (2, bytearray('bb', 'utf-8')),
+               (2, bytearray('bb', 'utf-8')),
+               (3, bytearray('cc', 'utf-8'))]
+        self.assertEqual(bytes, ebs)
+
         text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/",
                                            "org.apache.hadoop.io.Text",
                                            "org.apache.hadoop.io.Text").collect())
@@ -353,14 +365,34 @@ class TestInputFormat(PySparkTestCase):
         maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/",
                                            "org.apache.hadoop.io.IntWritable",
                                            "org.apache.hadoop.io.MapWritable").collect())
-        em = [(1, {2.0: u'aa'}),
+        em = [(1, {}),
               (1, {3.0: u'bb'}),
               (2, {1.0: u'aa'}),
               (2, {1.0: u'cc'}),
-              (2, {3.0: u'bb'}),
               (3, {2.0: u'dd'})]
         self.assertEqual(maps, em)
 
+        # arrays get pickled to tuples by default
+        tuples = sorted(self.sc.sequenceFile(
+            basepath + "/sftestdata/sfarray/",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.spark.api.python.DoubleArrayWritable").collect())
+        et = [(1, ()),
+              (2, (3.0, 4.0, 5.0)),
+              (3, (4.0, 5.0, 6.0))]
+        self.assertEqual(tuples, et)
+
+        # with custom converters, primitive arrays can stay as arrays
+        arrays = sorted(self.sc.sequenceFile(
+            basepath + "/sftestdata/sfarray/",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.spark.api.python.DoubleArrayWritable",
+            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+        ea = [(1, array('d')),
+              (2, array('d', [3.0, 4.0, 5.0])),
+              (3, array('d', [4.0, 5.0, 6.0]))]
+        self.assertEqual(arrays, ea)
+
         clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
                                             "org.apache.hadoop.io.Text",
                                             "org.apache.spark.api.python.TestWritable").collect())
@@ -369,6 +401,12 @@ class TestInputFormat(PySparkTestCase):
                u'double': 54.0, u'int': 123, u'str': u'test1'})
         self.assertEqual(clazz[0], ec)
 
+        unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
+                                            "org.apache.hadoop.io.Text",
+                                            "org.apache.spark.api.python.TestWritable",
+                                            batchSize=1).collect())
+        self.assertEqual(unbatched_clazz[0], ec)
+
     def test_oldhadoop(self):
         basepath = self.tempdir.name
         ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/",
@@ -379,10 +417,11 @@ class TestInputFormat(PySparkTestCase):
         self.assertEqual(ints, ei)
 
         hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
-        hello = self.sc.hadoopFile(hellopath,
-                                   "org.apache.hadoop.mapred.TextInputFormat",
-                                   "org.apache.hadoop.io.LongWritable",
-                                   "org.apache.hadoop.io.Text").collect()
+        oldconf = {"mapred.input.dir" : hellopath}
+        hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
+                                  "org.apache.hadoop.io.LongWritable",
+                                  "org.apache.hadoop.io.Text",
+                                  conf=oldconf).collect()
         result = [(0, u'Hello World!')]
         self.assertEqual(hello, result)
 
@@ -397,10 +436,11 @@ class TestInputFormat(PySparkTestCase):
         self.assertEqual(ints, ei)
 
         hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
-        hello = self.sc.newAPIHadoopFile(hellopath,
-                                         "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
-                                         "org.apache.hadoop.io.LongWritable",
-                                         "org.apache.hadoop.io.Text").collect()
+        newconf = {"mapred.input.dir" : hellopath}
+        hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
+                                        "org.apache.hadoop.io.LongWritable",
+                                        "org.apache.hadoop.io.Text",
+                                        conf=newconf).collect()
         result = [(0, u'Hello World!')]
         self.assertEqual(hello, result)
 
@@ -435,16 +475,267 @@ class TestInputFormat(PySparkTestCase):
             "org.apache.hadoop.io.IntWritable",
             "org.apache.hadoop.io.Text"))
 
-    def test_converter(self):
+    def test_converters(self):
+        # use of custom converters
         basepath = self.tempdir.name
         maps = sorted(self.sc.sequenceFile(
             basepath + "/sftestdata/sfmap/",
             "org.apache.hadoop.io.IntWritable",
             "org.apache.hadoop.io.MapWritable",
-            valueConverter="org.apache.spark.api.python.TestConverter").collect())
-        em = [(1, [2.0]), (1, [3.0]), (2, [1.0]), (2, [1.0]), (2, [3.0]), (3, [2.0])]
+            keyConverter="org.apache.spark.api.python.TestInputKeyConverter",
+            valueConverter="org.apache.spark.api.python.TestInputValueConverter").collect())
+        em = [(u'\x01', []),
+              (u'\x01', [3.0]),
+              (u'\x02', [1.0]),
+              (u'\x02', [1.0]),
+              (u'\x03', [2.0])]
+        self.assertEqual(maps, em)
+
+class TestOutputFormat(PySparkTestCase):
+
+    def setUp(self):
+        PySparkTestCase.setUp(self)
+        self.tempdir = tempfile.NamedTemporaryFile(delete=False)
+        os.unlink(self.tempdir.name)
+
+    def tearDown(self):
+        PySparkTestCase.tearDown(self)
+        shutil.rmtree(self.tempdir.name, ignore_errors=True)
+
+    def test_sequencefiles(self):
+        basepath = self.tempdir.name
+        ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+        self.sc.parallelize(ei).saveAsSequenceFile(basepath + "/sfint/")
+        ints = sorted(self.sc.sequenceFile(basepath + "/sfint/").collect())
+        self.assertEqual(ints, ei)
+
+        ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
+        self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/")
+        doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect())
+        self.assertEqual(doubles, ed)
+
+        ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))]
+        self.sc.parallelize(ebs).saveAsSequenceFile(basepath + "/sfbytes/")
+        bytes = sorted(self.sc.sequenceFile(basepath + "/sfbytes/").collect())
+        self.assertEqual(bytes, ebs)
+
+        et = [(u'1', u'aa'),
+              (u'2', u'bb'),
+              (u'3', u'cc')]
+        self.sc.parallelize(et).saveAsSequenceFile(basepath + "/sftext/")
+        text = sorted(self.sc.sequenceFile(basepath + "/sftext/").collect())
+        self.assertEqual(text, et)
+
+        eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
+        self.sc.parallelize(eb).saveAsSequenceFile(basepath + "/sfbool/")
+        bools = sorted(self.sc.sequenceFile(basepath + "/sfbool/").collect())
+        self.assertEqual(bools, eb)
+
+        en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
+        self.sc.parallelize(en).saveAsSequenceFile(basepath + "/sfnull/")
+        nulls = sorted(self.sc.sequenceFile(basepath + "/sfnull/").collect())
+        self.assertEqual(nulls, en)
+
+        em = [(1, {}),
+              (1, {3.0: u'bb'}),
+              (2, {1.0: u'aa'}),
+              (2, {1.0: u'cc'}),
+              (3, {2.0: u'dd'})]
+        self.sc.parallelize(em).saveAsSequenceFile(basepath + "/sfmap/")
+        maps = sorted(self.sc.sequenceFile(basepath + "/sfmap/").collect())
         self.assertEqual(maps, em)
 
+    def test_oldhadoop(self):
+        basepath = self.tempdir.name
+        dict_data = [(1, {}),
+                     (1, {"row1" : 1.0}),
+                     (2, {"row2" : 2.0})]
+        self.sc.parallelize(dict_data).saveAsHadoopFile(
+            basepath + "/oldhadoop/",
+            "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.MapWritable")
+        result = sorted(self.sc.hadoopFile(
+            basepath + "/oldhadoop/",
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.MapWritable").collect())
+        self.assertEqual(result, dict_data)
+
+        conf = {
+            "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+            "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+            "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable",
+            "mapred.output.dir" : basepath + "/olddataset/"}
+        self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
+        input_conf = {"mapred.input.dir" : basepath + "/olddataset/"}
+        old_dataset = sorted(self.sc.hadoopRDD(
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.MapWritable",
+            conf=input_conf).collect())
+        self.assertEqual(old_dataset, dict_data)
+
+    def test_newhadoop(self):
+        basepath = self.tempdir.name
+        # use custom ArrayWritable types and converters to handle arrays
+        array_data = [(1, array('d')),
+                      (1, array('d', [1.0, 2.0, 3.0])),
+                      (2, array('d', [3.0, 4.0, 5.0]))]
+        self.sc.parallelize(array_data).saveAsNewAPIHadoopFile(
+            basepath + "/newhadoop/",
+            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.spark.api.python.DoubleArrayWritable",
+            valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
+        result = sorted(self.sc.newAPIHadoopFile(
+            basepath + "/newhadoop/",
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.spark.api.python.DoubleArrayWritable",
+            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+        self.assertEqual(result, array_data)
+
+        conf = {"mapreduce.outputformat.class" :
+                     "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+                 "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+                 "mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable",
+                 "mapred.output.dir" : basepath + "/newdataset/"}
+        self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf,
+            valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
+        input_conf = {"mapred.input.dir" : basepath + "/newdataset/"}
+        new_dataset = sorted(self.sc.newAPIHadoopRDD(
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.spark.api.python.DoubleArrayWritable",
+            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter",
+            conf=input_conf).collect())
+        self.assertEqual(new_dataset, array_data)
+
+    def test_newolderror(self):
+        basepath = self.tempdir.name
+        rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
+        self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
+            basepath + "/newolderror/saveAsHadoopFile/",
+            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"))
+        self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
+            basepath + "/newolderror/saveAsNewAPIHadoopFile/",
+            "org.apache.hadoop.mapred.SequenceFileOutputFormat"))
+
+    def test_bad_inputs(self):
+        basepath = self.tempdir.name
+        rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
+        self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile(
+            basepath + "/badinputs/saveAsHadoopFile/",
+            "org.apache.hadoop.mapred.NotValidOutputFormat"))
+        self.assertRaises(Exception, lambda: rdd.saveAsNewAPIHadoopFile(
+            basepath + "/badinputs/saveAsNewAPIHadoopFile/",
+            "org.apache.hadoop.mapreduce.lib.output.NotValidOutputFormat"))
+
+    def test_converters(self):
+        # use of custom converters
+        basepath = self.tempdir.name
+        data = [(1, {3.0: u'bb'}),
+                (2, {1.0: u'aa'}),
+                (3, {2.0: u'dd'})]
+        self.sc.parallelize(data).saveAsNewAPIHadoopFile(
+            basepath + "/converters/",
+            "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+            keyConverter="org.apache.spark.api.python.TestOutputKeyConverter",
+            valueConverter="org.apache.spark.api.python.TestOutputValueConverter")
+        converted = sorted(self.sc.sequenceFile(basepath + "/converters/").collect())
+        expected = [(u'1', 3.0),
+                    (u'2', 1.0),
+                    (u'3', 2.0)]
+        self.assertEqual(converted, expected)
+
+    def test_reserialization(self):
+        basepath = self.tempdir.name
+        x = range(1, 5)
+        y = range(1001, 1005)
+        data = zip(x, y)
+        rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y))
+        rdd.saveAsSequenceFile(basepath + "/reserialize/sequence")
+        result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
+        self.assertEqual(result1, data)
+
+        rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop",
+                             "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+        result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect())
+        self.assertEqual(result2, data)
+
+        rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop",
+                             "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+        result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect())
+        self.assertEqual(result3, data)
+
+        conf4 = {
+            "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+            "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+            "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
+            "mapred.output.dir" : basepath + "/reserialize/dataset"}
+        rdd.saveAsHadoopDataset(conf4)
+        result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect())
+        self.assertEqual(result4, data)
+
+        conf5 = {"mapreduce.outputformat.class" :
+                     "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+            "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
+            "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable",
+            "mapred.output.dir" : basepath + "/reserialize/newdataset"}
+        rdd.saveAsNewAPIHadoopDataset(conf5)
+        result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect())
+        self.assertEqual(result5, data)
+
+    def test_unbatched_save_and_read(self):
+        basepath = self.tempdir.name
+        ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
+        self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
+            basepath + "/unbatched/")
+
+        unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
+            batchSize=1).collect())
+        self.assertEqual(unbatched_sequence, ei)
+
+        unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/",
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            batchSize=1).collect())
+        self.assertEqual(unbatched_hadoopFile, ei)
+
+        unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/",
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            batchSize=1).collect())
+        self.assertEqual(unbatched_newAPIHadoopFile, ei)
+
+        oldconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+        unbatched_hadoopRDD = sorted(self.sc.hadoopRDD(
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            conf=oldconf,
+            batchSize=1).collect())
+        self.assertEqual(unbatched_hadoopRDD, ei)
+
+        newconf = {"mapred.input.dir" : basepath + "/unbatched/"}
+        unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD(
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            conf=newconf,
+            batchSize=1).collect())
+        self.assertEqual(unbatched_newAPIHadoopRDD, ei)
+
+    def test_malformed_RDD(self):
+        basepath = self.tempdir.name
+        # non-batch-serialized RDD[[(K, V)]] should be rejected
+        data = [[(1, "a")], [(2, "aa")], [(3, "aaa")]]
+        rdd = self.sc.parallelize(data, numSlices=len(data))
+        self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile(
+            basepath + "/malformed/sequence"))
 
 class TestDaemon(unittest.TestCase):
     def connect(self, port):