diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 747023812f7543539978b4c0ffa1a5aeeb580301..ae8010300a500d08e76bb8b92418c2166b6c3d05 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -749,6 +749,23 @@ private[spark] object PythonRDD extends Logging {
       }
     }
   }
+
+  /**
+   * Convert an RDD of serialized Python objects into an RDD of Java objects,
+   * so that it can be operated on by the JVM side of PySpark.
+   */
+  def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        val obj = unpickle.loads(row)
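+        // a batched RDD pickles each chunk of a partition as one list, so
+        // flatten the unpickled list back into individual objects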
+        if (batched) {
+          obj.asInstanceOf[JArrayList[_]]
+        } else {
+          Seq(obj)
+        }
+      }
+    }.toJavaRDD()
+  }
 }
 
 private
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index bdd8bc82869fb371b7d9aef8cb9885b7e9100063..9f88340d0377828aad29a4cfc30b2b7b205bc1ae 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -131,6 +131,22 @@ class _JavaStackTrace(object):
             self._context._jsc.setCallSite(None)
 
 
+class BoundedFloat(float):
+    """
+    A float returned by an approximate job, which also carries the confidence
+    level and the low and high bounds of the estimate.
+
+    >>> BoundedFloat(100.0, 0.95, 95.0, 105.0)
+    100.0
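+
+    The confidence and bounds are available as attributes:
+
+    >>> b = BoundedFloat(100.0, 0.95, 95.0, 105.0)
+    >>> (b.confidence, b.low, b.high)
+    (0.95, 95.0, 105.0)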
+    """
+    def __new__(cls, mean, confidence, low, high):
+        obj = float.__new__(cls, mean)
+        obj.confidence = confidence
+        obj.low = low
+        obj.high = high
+        return obj
+
+
 class MaxHeapQ(object):
 
     """
@@ -1792,6 +1808,71 @@ class RDD(object):
     # keys in the pairs.  This could be an expensive operation, since those
     # hashes aren't retained.
 
+    def _is_pickled(self):
+        """ Return this RDD is serialized by Pickle or not. """
+        der = self._jrdd_deserializer
+        if isinstance(der, PickleSerializer):
+            return True
+        if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
+            return True
+        return False
+
+    def _to_jrdd(self):
+        """ Return an JavaRDD of Object by unpickling
+
+        It will convert each Python object into Java object by Pyrolite, whenever the
+        RDD is serialized in batch or not.
+        """
+        if not self._is_pickled():
+            self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
+        batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
+        return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+
+    def countApprox(self, timeout, confidence=0.95):
+        """
+        :: Experimental ::
+        Approximate version of count() that returns a potentially incomplete
+        result within a timeout, even if not all tasks have finished.
+
+        >>> rdd = sc.parallelize(range(1000), 10)
+        >>> rdd.countApprox(1000, 1.0)
+        1000
+        """
+        drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))])
+        return int(drdd.sumApprox(timeout, confidence))
+
+    def sumApprox(self, timeout, confidence=0.95):
+        """
+        :: Experimental ::
+        Approximate operation to return the sum within a timeout
+        or meet the confidence.
+
+        >>> rdd = sc.parallelize(range(1000), 10)
+        >>> r = sum(xrange(1000))
+        >>> abs(rdd.sumApprox(1000) - r) / r < 0.05
+        True
+        """
+        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd()
+        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
+        r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
+        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
+
+    def meanApprox(self, timeout, confidence=0.95):
+        """
+        :: Experimental ::
+        Approximate operation to return the mean within a timeout,
+        or as soon as the requested confidence is reached.
+
+        >>> rdd = sc.parallelize(range(1000), 10)
+        >>> r = sum(xrange(1000)) / 1000.0
+        >>> abs(rdd.meanApprox(1000) - r) / r < 0.05
+        True
+        """
+        jrdd = self.map(float)._to_jrdd()
+        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
+        r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
+        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
+
 
 class PipelinedRDD(RDD):