From 1fdb6946b5d076ed0f1b4d2bca2a20b6cd22cbc3 Mon Sep 17 00:00:00 2001
From: Stephen Haberman <stephen@exigencecorp.com>
Date: Sat, 5 Jan 2013 13:07:59 -0600
Subject: [PATCH] Add RDD.tupleBy.

---
 core/src/main/scala/spark/RDD.scala      | 7 +++++++
 core/src/test/scala/spark/RDDSuite.scala | 1 +
 2 files changed, 8 insertions(+)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 7e38583391..7aa4b0a173 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -510,6 +510,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
       .saveAsSequenceFile(path)
   }
 
+  /**
+   * Tuples the elements of this RDD by applying `f`.
+   */
+  def tupleBy[K](f: T => K): RDD[(K, T)] = {
+    map(x => (f(x), x))
+  }
+
   /** A private method for tests, to look at the contents of each partition */
   private[spark] def collectPartitions(): Array[Array[T]] = {
     sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 45e6c5f840..7832884224 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -35,6 +35,7 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     assert(nums.flatMap(x => 1 to x).collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
     assert(nums.union(nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
+    assert(nums.tupleBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
     assert(partitionSums.collect().toList === List(3, 7))
 
-- 
GitLab