diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index f9b6ee351a151cefcc310d17d50249fdc20b1d5b..043cb183bad179d864d92f11e68c397fc284bd29 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -93,6 +93,17 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
   def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD =
     fromRDD(srdd.coalesce(numPartitions, shuffle))
 
+  /**
+   * Return a new RDD that has exactly numPartitions partitions.
+   *
+   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+   * a shuffle to redistribute data.
+   *
+   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+   * which can avoid performing a shuffle.
+   */
+  def repartition(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.repartition(numPartitions))
+
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
    * 
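For reference, a minimal driver sketch of the new `JavaDoubleRDD.repartition`; the master string, app name, element values, and partition counts are illustrative only, not part of this patch:

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DoubleRepartitionExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "DoubleRepartitionExample");
    // Two input partitions, shuffled out to eight to raise parallelism.
    JavaDoubleRDD doubles = sc.parallelizeDoubles(
        Arrays.asList(1.0, 2.0, 3.0, 5.0, 8.0, 13.0, 21.0, 34.0), 2);
    JavaDoubleRDD widened = doubles.repartition(8);
    // glom() wraps each partition in a list, so its count equals the partition count.
    System.out.println(widened.glom().collect().size());  // 8
    System.out.println(widened.sum());                    // 87.0
    sc.stop();
  }
}
```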
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 268f43b4e865b28b5df72104c65905b087f020c4..39f408b8c8c7eb8939258125f80b6b60230dc7df 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -107,6 +107,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] =
     fromRDD(rdd.coalesce(numPartitions, shuffle))
 
+  /**
+   * Return a new RDD that has exactly numPartitions partitions.
+   *
+   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+   * a shuffle to redistribute data.
+   *
+   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+   * which can avoid performing a shuffle.
+   */
+  def repartition(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.repartition(numPartitions))
+
   /**
    * Return a sampled subset of this RDD.
    */
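A similar sketch for the pair variant. One point worth flagging in review: unlike `partitionBy`, `repartition` redistributes the tuples without regard to key, so equal keys may land in different partitions. Class name, values, and counts below are illustrative:

```java
import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PairRepartitionExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "PairRepartitionExample");
    JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("a", 1),
        new Tuple2<String, Integer>("b", 2),
        new Tuple2<String, Integer>("a", 3),
        new Tuple2<String, Integer>("c", 4)), 1);
    // One input partition spread across four; key "a" may straddle partitions.
    JavaPairRDD<String, Integer> spread = pairs.repartition(4);
    System.out.println(spread.glom().collect().size());  // 4
    sc.stop();
  }
}
```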
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 662990049b0938f542e5966c992a2bd1554a5586..3b359a8fd60941b8cbcdcceb843d1d52c8887842 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -81,6 +81,17 @@ JavaRDDLike[T, JavaRDD[T]] {
   def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
     rdd.coalesce(numPartitions, shuffle)
 
+  /**
+   * Return a new RDD that has exactly numPartitions partitions.
+   *
+   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+   * a shuffle to redistribute data.
+   *
+   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+   * which can avoid performing a shuffle.
+   */
+  def repartition(numPartitions: Int): JavaRDD[T] = rdd.repartition(numPartitions)
+
   /**
    * Return a sampled subset of this RDD.
    */
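And the plain `JavaRDD` form, the shape most callers will use. A sketch of the common motivation for growing partition counts; the input path is a placeholder:

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RepartitionExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[4]", "RepartitionExample");
    // A small or gzipped file often loads as very few partitions; shuffle it
    // out to 16 so the following stages can use more cores.
    JavaRDD<String> lines = sc.textFile("hdfs:///tmp/input.txt");  // placeholder path
    JavaRDD<String> rebalanced = lines.repartition(16);
    System.out.println(rebalanced.count());
    sc.stop();
  }
}
```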
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 17bc2515f2f433b7493fe6ed81d2daa40b8c49cc..6e88be6f6ac64bb3d99bb56aa276c789dfba7c6a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -268,7 +268,7 @@ abstract class RDD[T: ClassManifest](
   /**
    * Return a new RDD that has exactly numPartitions partitions.
    *
-   * Used to increase or decrease the level of parallelism in this RDD. This will use
+   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
    * a shuffle to redistribute data.
    *
    * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
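The reworded scaladoc is the only change here, but the guidance it gives (prefer `coalesce` when shrinking) is easy to miss. A hedged Java sketch of the trade-off, with illustrative numbers:

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ShrinkPartitionsExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[4]", "ShrinkPartitionsExample");
    JavaRDD<Integer> numbers = sc.parallelize(
        Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 8);

    // coalesce(2) merges the eight partitions locally: no shuffle, cheap.
    JavaRDD<Integer> merged = numbers.coalesce(2);

    // repartition(2) reaches the same partition count but always shuffles,
    // which only pays off when the merged partitions would be badly skewed.
    JavaRDD<Integer> reshuffled = numbers.repartition(2);

    System.out.println(merged.glom().collect());      // two lists, no shuffle
    System.out.println(reshuffled.glom().collect());  // two lists, shuffled contents
    sc.stop();
  }
}
```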
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 7b0bb89ab28ce0306e852da05263b4a94b03b9f1..f38c607d653e5bd3a01ef8374af6352e4b88eca0 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -472,6 +472,27 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
   }
 
+  @Test
+  public void repartition() {
+    // Growing number of partitions
+    JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 2);
+    JavaRDD<Integer> repartitioned1 = in1.repartition(4);
+    List<List<Integer>> result1 = repartitioned1.glom().collect();
+    Assert.assertEquals(4, result1.size());
+    for (List<Integer> l : result1) {
+      Assert.assertTrue(l.size() > 0);
+    }
+
+    // Shrinking number of partitions
+    JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 4);
+    JavaRDD<Integer> repartitioned2 = in2.repartition(2);
+    List<List<Integer>> result2 = repartitioned2.glom().collect();
+    Assert.assertEquals(2, result2.size());
+    for (List<Integer> l : result2) {
+      Assert.assertTrue(l.size() > 0);
+    }
+  }
+
   @Test
   public void persist() {
     JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index d1932b6b05a093fbf814f274dc6dc488bca73053..1a2aeaa8797e1fcbabe04505fdcd0c78e98612a3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -94,6 +94,12 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
    */
   def union(that: JavaDStream[T]): JavaDStream[T] =
     dstream.union(that.dstream)
+
+  /**
+   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
+   * returned DStream has exactly numPartitions partitions.
+   */
+  def repartition(numPartitions: Int): JavaDStream[T] = dstream.repartition(numPartitions)
 }
 
 object JavaDStream {
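A streaming sketch of the new `JavaDStream.repartition`. This mirrors the usual motivation: a single network receiver tends to produce RDDs with few partitions, so each batch is reshuffled before the heavy work. Host, port, and batch duration are placeholders:

```java
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DStreamRepartitionExample {
  public static void main(String[] args) {
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[4]", "DStreamRepartitionExample", new Duration(1000));
    JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);
    // Every RDD produced by the receiver is reshuffled into 8 partitions.
    JavaDStream<String> rebalanced = lines.repartition(8);
    rebalanced.print();
    ssc.start();
  }
}
```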
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 978fca33ad03e344a355c72dd3f3eff0658acc35..faf8f361826f798d3971da89af45fb4c7ba3173a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -59,6 +59,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /** Persist the RDDs of this DStream with the given storage level */
   def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
 
+  /**
+   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
+   * returned DStream has exactly numPartitions partitions.
+   */
+  def repartition(numPartitions: Int): JavaPairDStream[K, V] = dstream.repartition(numPartitions)
+
   /** Method that generates a RDD for the given Duration */
   def compute(validTime: Time): JavaPairRDD[K, V] = {
     dstream.compute(validTime) match {
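The pair-DStream form, sketched with the 0.8-era `PairFunction` overload of `map` to build the pairs; everything other than the `repartition` call itself is illustrative:

```java
import scala.Tuple2;

import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class PairDStreamRepartitionExample {
  public static void main(String[] args) {
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[4]", "PairDStreamRepartitionExample", new Duration(1000));
    JavaDStream<String> words = ssc.socketTextStream("localhost", 9999);
    JavaPairDStream<String, Integer> ones = words.map(
        new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String word) {
            return new Tuple2<String, Integer>(word, 1);
          }
        });
    // Rebalance each batch's RDD into 4 partitions ahead of any wide operation.
    JavaPairDStream<String, Integer> spread = ones.repartition(4);
    spread.print();
    ssc.start();
  }
}
```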
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index dc01f1e8aa0ca3ee0935378462c2fbe85bd20c85..5a9836a415d1fd87220d889261c0a5838846c3a5 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -183,6 +183,39 @@ public class JavaAPISuite implements Serializable {
     assertOrderInvariantEquals(expected, result);
   }
 
+  @Test
+  public void testRepartitionMorePartitions() {
+    List<List<Integer>> inputData = Arrays.asList(
+      Arrays.asList(1,2,3,4,5,6,7,8,9,10),
+      Arrays.asList(1,2,3,4,5,6,7,8,9,10));
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
+    JavaDStream<Integer> repartitioned = stream.repartition(4);
+    JavaTestUtils.attachTestOutputStream(repartitioned);
+    List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
+    Assert.assertEquals(2, result.size());
+    for (List<List<Integer>> rdd : result) {
+      Assert.assertEquals(4, rdd.size());
+      Assert.assertEquals(
+        10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size());
+    }
+  }
+
+  @Test
+  public void testRepartitionFewerPartitions() {
+    List<List<Integer>> inputData = Arrays.asList(
+      Arrays.asList(1,2,3,4,5,6,7,8,9,10),
+      Arrays.asList(1,2,3,4,5,6,7,8,9,10));
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
+    JavaDStream<Integer> repartitioned = stream.repartition(2);
+    JavaTestUtils.attachTestOutputStream(repartitioned);
+    List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
+    Assert.assertEquals(2, result.size());
+    for (List<List<Integer>> rdd : result) {
+      Assert.assertEquals(2, rdd.size());
+      Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size());
+    }
+  }
+
   @Test
   public void testGlom() {
     List<List<String>> inputData = Arrays.asList(
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 5344ae7682fd6f5808750c8b846e12c109f679e5..780f7b823b1018dc1db5e5667b3439496a6e34a3 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -62,6 +62,8 @@ trait JavaTestBase extends TestSuiteBase {
    * Process all registered streams for a numBatches batches, failing if
    * numExpectedOutput RDD's are not generated. Generated RDD's are collected
    * and returned, represented as a list for each batch interval.
+   *
+   * Returns one flat list of items for each generated RDD, in batch order.
    */
   def runStreams[V](
     ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
@@ -72,6 +74,27 @@ trait JavaTestBase extends TestSuiteBase {
     res.map(entry => out.append(new ArrayList[V](entry)))
     out
   }
+
+  /**
+   * Process all registered streams for `numBatches` batches, failing if
+   * `numExpectedOutput` RDDs are not generated. Generated RDDs are collected
+   * and returned, represented as a list for each batch interval.
+   *
+   * Returns a sequence of RDDs. Each RDD is represented as one sequence of
+   * items per partition.
+   */
+  def runStreamsWithPartitions[V](
+    ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = {
+    implicit val cm: ClassManifest[V] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+    val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
+    val out = new ArrayList[JList[JList[V]]]()
+    res.map(entry => {
+      val lists = entry.map(new ArrayList[V](_))
+      out.append(new ArrayList[JList[V]](lists))
+    })
+    out
+  }
 }
 
 object JavaTestUtils extends JavaTestBase {
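Finally, a sketch of how a streaming test consumes the new helper, mirroring the suites above. This hypothetical test method assumes it lives in the streaming `JavaAPISuite`, which already declares the `ssc` field and the needed imports:

```java
@Test
public void testRunStreamsWithPartitions() {
  List<List<Integer>> inputData = Arrays.asList(
      Arrays.asList(1, 2, 3, 4, 5),
      Arrays.asList(6, 7, 8, 9, 10));
  JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
  JavaTestUtils.attachTestOutputStream(stream.repartition(4));

  // Outer list: one entry per batch. Middle list: one entry per partition.
  // Inner list: the items that landed in that partition.
  List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
  Assert.assertEquals(2, result.size());
  Assert.assertEquals(4, result.get(0).size());
}
```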