From 3ebd8e18853bfca6f0bcd99ac79f0c6717aa0887 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Tue, 27 Nov 2012 22:38:09 -0800
Subject: [PATCH] Added zip to Java API

---
 .../main/scala/spark/api/java/JavaRDDLike.scala   | 10 ++++++++++
 core/src/test/scala/spark/JavaAPISuite.java       | 15 +++++++++++++++
 2 files changed, 25 insertions(+)

diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 13fcee1004..482eb9281a 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -172,6 +172,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
     rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env))
 
+  /**
+   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
+   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
+   * partitions* and the *same number of elements in each partition* (e.g. one was made through
+   * a map on the other).
+   */
+  def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
+    JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classManifest))(classManifest, other.classManifest)
+  }
+
   // Actions (launch a job to return a value to the user program)
     
   /**
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 5875506179..007bb28692 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -44,6 +44,8 @@ public class JavaAPISuite implements Serializable {
   public void tearDown() {
     sc.stop();
     sc = null;
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port");
   }
 
   static class ReverseIntComparator implements Comparator<Integer>, Serializable {
@@ -553,4 +555,17 @@ public class JavaAPISuite implements Serializable {
       }
     }).collect().toString());
   }
+
+  @Test
+  public void zip() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+    JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+      @Override
+      public Double call(Integer x) {
+        return 1.0 * x;
+      }
+    });
+    JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
+    zipped.count();
+  }
 }
-- 
GitLab