diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 2ba4fb8c18b1dafeb322fa3df5f23c3b859a2270..05b89b985736d3169e83265cd44caf2dd47aae9f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -478,6 +478,26 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new java.util.ArrayList(arr)
   }
 
+  /**
+   * Returns the maximum element from this RDD as defined by the specified
+   * Comparator[T].
+   * @param comp the comparator that defines ordering
+   * @return the maximum element of the RDD
+   */
+  def max(comp: Comparator[T]): T = {
+    rdd.max()(Ordering.comparatorToOrdering(comp))
+  }
+
+  /**
+   * Returns the minimum element from this RDD as defined by the specified
+   * Comparator[T].
+   * @param comp the comparator that defines ordering
+   * @return the minimum element of the RDD
+   */
+  def min(comp: Comparator[T]): T = {
+    rdd.min()(Ordering.comparatorToOrdering(comp))
+  }
+
   /**
    * Returns the first K elements from this RDD using the
    * natural ordering for T while maintain the order.
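
A brief usage sketch for the Java-facing API, assuming a JavaRDD[String] named javaWords (hypothetical); the Comparator alone decides which element wins, since it is adapted into the Ordering used by the underlying Scala RDD:

    import java.util.Comparator

    // Hypothetical comparator: order strings by length instead of lexicographically.
    val byLength = new Comparator[String] {
      override def compare(a: String, b: String): Int = a.length - b.length
    }
    // javaWords.max(byLength) returns the longest string and javaWords.min(byLength)
    // the shortest; both delegate to javaWords.rdd.max()/min() with
    // Ordering.comparatorToOrdering(byLength).
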
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f8283fbbb980d09e97cd9ea7d83ec912c04f4293..ddb901246d3609935442ebb69bc75e6c1f7749b2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -956,6 +956,18 @@ abstract class RDD[T: ClassTag](
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
 
+  /**
+   * Returns the maximum element of this RDD as defined by the implicit Ordering[T].
+   * @return the maximum element of the RDD
+   */
+  def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)
+
+  /**
+   * Returns the minimum element of this RDD as defined by the implicit Ordering[T].
+   * @return the minimum element of the RDD
+   */
+  def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
+
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
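
A minimal usage sketch for the Scala API, assuming a live SparkContext sc; the no-argument call picks up the implicit Ordering[T], and an explicit Ordering can be supplied to override it. Like reduce, on which they are built, both methods fail on an empty RDD:

    val nums = sc.parallelize(Seq(3, 1, 4, 1, 5))
    nums.max()                                // 5, via the default Ordering[Int]
    nums.min()                                // 1
    nums.max()(Ordering.by((x: Int) => -x))   // 1, because this ordering compares by -x
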
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index f837dc7ccc8601e19d510a8c36a6e87bf26f6180..732748a7ff82bffd62fac6679529a210646e4482 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -29,6 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   private var n: Long = 0     // Running count of our values
   private var mu: Double = 0  // Running mean of our values
   private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2)
+  private var maxValue: Double = Double.NegativeInfinity // Running max of our values
+  private var minValue: Double = Double.PositiveInfinity // Running min of our values
 
   merge(values)
 
@@ -41,6 +43,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
     n += 1
     mu += delta / n
     m2 += delta * (value - mu)
+    maxValue = math.max(maxValue, value)
+    minValue = math.min(minValue, value)
     this
   }
 
@@ -58,7 +62,9 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
       if (n == 0) {
         mu = other.mu
         m2 = other.m2
-        n = other.n       
+        n = other.n
+        maxValue = other.maxValue
+        minValue = other.minValue
       } else if (other.n != 0) {        
         val delta = other.mu - mu
         if (other.n * 10 < n) {
@@ -70,6 +76,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
         }
         m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
         n += other.n
+        maxValue = math.max(maxValue, other.maxValue)
+        minValue = math.min(minValue, other.minValue)
       }
       this
     }
@@ -81,6 +89,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
     other.n = n
     other.mu = mu
     other.m2 = m2
+    other.maxValue = maxValue
+    other.minValue = minValue
     other
   }
 
@@ -90,6 +100,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
 
   def sum: Double = n * mu
 
+  def max: Double = maxValue
+
+  def min: Double = minValue
+
   /** Return the variance of the values. */
   def variance: Double = {
     if (n == 0) {
@@ -121,7 +135,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   def sampleStdev: Double = math.sqrt(sampleVariance)
 
   override def toString: String = {
-    "(count: %d, mean: %f, stdev: %f)".format(count, mean, stdev)
+    "(count: %d, mean: %f, stdev: %f, max: %f, min: %f)".format(count, mean, stdev, max, min)
   }
 }
 
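
A rough sketch of how the running max/min combine under merge, assuming StatCounters built separately (as happens per partition when stats() is computed) and then merged:

    import org.apache.spark.util.StatCounter

    val left  = new StatCounter(Seq(2.0, 4.0))   // maxValue = 4.0, minValue = 2.0
    val right = new StatCounter(Seq(-1.0, 3.0))  // maxValue = 3.0, minValue = -1.0
    val merged = left.copy().merge(right)
    merged.max    // 4.0
    merged.min    // -1.0
    merged.count  // 4
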
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 4305686d3a6d5f564d55c0a452ff4d4a5c7c286d..996db70809320f025eed6fab2dc15578701ca43f 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -171,6 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(abs(6.0/2 - rdd.mean) < 0.01)
     assert(abs(1.0 - rdd.variance) < 0.01)
     assert(abs(1.0 - rdd.stdev) < 0.01)
+    assert(stats.max === 4.0)
+    assert(stats.min === 2.0)
 
     // Add other tests here for classes that should be able to handle empty partitions correctly
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 9512e0e6eeb14f48ee6cc8d3194b74dbfa0fef10..d6b5fdc7984b43eb3be8ca1ac602f083ae9f34e5 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -47,6 +47,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
     assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
+    assert(nums.max() === 4)
+    assert(nums.min() === 1)
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
     assert(partitionSums.collect().toList === List(3, 7))
 
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index f3b432ff248a96b9d5128ca64ce5c1e822b11d37..ae09dbff02a36c0d1932a673cd2c2117a574fda1 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -571,7 +571,26 @@ class RDD(object):
         return reduce(op, vals, zeroValue)
 
     # TODO: aggregate
+
+
+    def max(self):
+        """
+        Find the maximum item in this RDD.
+
+        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
+        43.0
+        """
+        return self.reduce(max)
 
+    def min(self):
+        """
+        Find the minimum item in this RDD.
+
+        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
+        1.0
+        """
+        return self.reduce(min)
+
     def sum(self):
         """
         Add up the elements in this RDD.
diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py
index 8e1cbd4ad9856bacbfc529aa55f7c34373eae758..080325061a697d9861fb5bdcb9dda2c2640da3a4 100644
--- a/python/pyspark/statcounter.py
+++ b/python/pyspark/statcounter.py
@@ -26,7 +26,9 @@ class StatCounter(object):
         self.n = 0L    # Running count of our values
         self.mu = 0.0  # Running mean of our values
         self.m2 = 0.0  # Running variance numerator (sum of (x - mean)^2)
-
+        self.maxValue = float("-inf")  # Running max of our values
+        self.minValue = float("inf")   # Running min of our values
+
         for v in values:
             self.merge(v)
             
@@ -36,6 +38,11 @@ class StatCounter(object):
         self.n += 1
         self.mu += delta / self.n
         self.m2 += delta * (value - self.mu)
+        if self.maxValue < value:
+            self.maxValue = value
+        if self.minValue > value:
+            self.minValue = value
+
         return self
 
     # Merge another StatCounter into this one, adding up the internal statistics.
@@ -49,7 +56,10 @@ class StatCounter(object):
             if self.n == 0:
                 self.mu = other.mu
                 self.m2 = other.m2
-                self.n = other.n       
+                self.n = other.n
+                self.maxValue = other.maxValue
+                self.minValue = other.minValue
+
             elif other.n != 0:        
                 delta = other.mu - self.mu
                 if other.n * 10 < self.n:
@@ -58,6 +68,9 @@ class StatCounter(object):
                     self.mu = other.mu - (delta * self.n) / (self.n + other.n)
                 else:
                     self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)
+
+                self.maxValue = max(self.maxValue, other.maxValue)
+                self.minValue = min(self.minValue, other.minValue)
         
                 self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
                 self.n += other.n
@@ -76,6 +89,12 @@ class StatCounter(object):
     def sum(self):
         return self.n * self.mu
 
+    def min(self):
+        return self.minValue
+
+    def max(self):
+        return self.maxValue
+
     # Return the variance of the values.
     def variance(self):
         if self.n == 0:
@@ -105,5 +124,5 @@ class StatCounter(object):
         return math.sqrt(self.sampleVariance())
 
     def __repr__(self):
-        return "(count: %s, mean: %s, stdev: %s)" % (self.count(), self.mean(), self.stdev())
+        return "(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" % (self.count(), self.mean(), self.stdev(), self.max(), self.min())