diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 033d334079b59edf34763e33db146457a7d8baa8..8010bb68e31dd142a6e3d9afe37a6e56118599a8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -393,6 +393,43 @@ abstract class RDD[T: ClassTag](
    */
   def ++(other: RDD[T]): RDD[T] = this.union(other)
 
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any duplicate
+   * elements, even if the input RDDs did.
+   *
+   * Note that this method performs a shuffle internally.
+   */
+  def intersection(other: RDD[T]): RDD[T] =
+    this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
+        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
+        .keys
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any duplicate
+   * elements, even if the input RDDs did.
+   *
+   * Note that this method performs a shuffle internally.
+   *
+   * @param partitioner Partitioner to use for the resulting RDD
+   */
+  def intersection(other: RDD[T], partitioner: Partitioner): RDD[T] =
+    this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
+        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
+        .keys
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any duplicate
+   * elements, even if the input RDDs did. Performs a hash partition across the cluster.
+   *
+   * Note that this method performs a shuffle internally.
+   *
+   * @param numPartitions How many partitions to use in the resulting RDD
+   */
+  def intersection(other: RDD[T], numPartitions: Int): RDD[T] =
+    this.map(v => (v, null)).cogroup(other.map(v => (v, null)), new HashPartitioner(numPartitions))
+        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
+        .keys
+
   /**
    * Return an RDD created by coalescing all elements within each partition into an array.
    */
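
For reviewers trying out the new API, a minimal usage sketch of the three overloads (assumes a spark-shell with a running SparkContext sc; the RDD names are illustrative and not part of this patch):

    import org.apache.spark.HashPartitioner

    val odds  = sc.parallelize(Seq(1, 1, 3, 5, 7, 9))
    val small = sc.parallelize(Seq(1, 2, 3, 3, 4, 5))

    // Default overload: cogroups with the default partitioner.
    odds.intersection(small).collect()   // elements 1, 3, 5 in some order, duplicates removed

    // Partitioner / numPartitions overloads return the same elements;
    // only the partitioning of the result differs.
    odds.intersection(small, new HashPartitioner(4)).collect()
    odds.intersection(small, 4).collect()
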
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 223ebec5fa3dce3f739c915d0c8073b1dc4f64b4..879c4e5f17c72497115d8e9c306e15b1e34d26db 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -373,8 +373,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
       val prng42 = new Random(42)
       val prng43 = new Random(43)
       Array(1, 2, 3, 4, 5, 6).filter{i =>
-	      if (i < 4) 0 == prng42.nextInt(3)
-	      else 0 == prng43.nextInt(3)}
+        if (i < 4) 0 == prng42.nextInt(3)
+        else 0 == prng43.nextInt(3)}
     }
     assert(sample.size === checkSample.size)
     for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
@@ -506,4 +506,23 @@ class RDDSuite extends FunSuite with SharedSparkContext {
       sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false)
     }
   }
+
+  test("intersection") {
+    val all = sc.parallelize(1 to 10)
+    val evens = sc.parallelize(2 to 10 by 2)
+    val intersection = Array(2, 4, 6, 8, 10)
+
+    // intersection is commutative
+    assert(all.intersection(evens).collect.sorted === intersection)
+    assert(evens.intersection(all).collect.sorted === intersection)
+  }
+
+  test("intersection strips duplicates in an input") {
+    val a = sc.parallelize(Seq(1, 2, 3, 3))
+    val b = sc.parallelize(Seq(1, 1, 2, 3))
+    val intersection = Array(1, 2, 3)
+
+    assert(a.intersection(b).collect.sorted === intersection)
+    assert(b.intersection(a).collect.sorted === intersection)
+  }
 }
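
The duplicate-stripping test above holds because the cogroup step emits each distinct key exactly once. A plain-Scala sketch of the same map/cogroup/filter/keys pipeline on local collections (illustrative only, no Spark involved):

    val a = Seq(1, 2, 3, 3)
    val b = Seq(1, 1, 2, 3)

    // "cogroup": one entry per distinct key, with the matching values from each side.
    val cogrouped: Map[Int, (Seq[Int], Seq[Int])] =
      (a ++ b).distinct.map(k => k -> (a.filter(_ == k), b.filter(_ == k))).toMap

    // "filter ... keys": keep keys present on both sides; each key survives once.
    val intersection = cogrouped.collect {
      case (k, (left, right)) if left.nonEmpty && right.nonEmpty => k
    }.toSeq.sorted
    // intersection == Seq(1, 2, 3)
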