diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 07be8ba58efa3e73cf5c86cc97367b48250e68bf..42ab27bf55ccf0f5e4834297f007c8490bfc7f17 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -86,6 +86,12 @@ support the [Bagel API](api/scala/index.html#org.apache.spark.bagel.package) and
 [Bagel programming guide](bagel-programming-guide.html). However, we encourage Bagel users to
 explore the new GraphX API and comment on issues that may complicate the transition from Bagel.
 
+## Upgrade Guide from Spark 0.9.1
+
+GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change from Spark 0.9.1. [`EdgeRDD`][EdgeRDD] may now store adjacent vertex attributes to construct the triplets, so it has gained a type parameter. The edges of a graph of type `Graph[VD, ED]` are of type `EdgeRDD[ED, VD]` rather than `EdgeRDD[ED]`.
+
+[EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD
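+
+For example, with a graph of type `Graph[String, Int]`, code that previously bound the edges to `EdgeRDD[Int]` now includes the vertex attribute type as well (a minimal sketch; `graph` is assumed to be an existing graph):
+
+{% highlight scala %}
+// Spark 0.9.1: val edges: EdgeRDD[Int] = graph.edges
+val edges: EdgeRDD[Int, String] = graph.edges
+{% endhighlight %}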
+
 # Getting Started
 
 To get started you first need to import Spark and GraphX into your project, as follows:
@@ -145,12 +151,12 @@ the vertices and edges of the graph:
 {% highlight scala %}
 class Graph[VD, ED] {
   val vertices: VertexRDD[VD]
-  val edges: EdgeRDD[ED]
+  val edges: EdgeRDD[ED, VD]
 }
 {% endhighlight %}
 
-The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexID,
-VD)]` and `RDD[Edge[ED]]` respectively.  Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide  additional
+The classes `VertexRDD[VD]` and `EdgeRDD[ED, VD]` extend and are optimized versions of `RDD[(VertexID,
+VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED, VD]` provide additional
 functionality built around graph computation and leverage internal optimizations.  We discuss the
 `VertexRDD` and `EdgeRDD` API in greater detail in the section on [vertex and edge
 RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form:
@@ -302,7 +308,7 @@ class Graph[VD, ED] {
   val degrees: VertexRDD[Int]
   // Views of the graph as collections =============================================================
   val vertices: VertexRDD[VD]
-  val edges: EdgeRDD[ED]
+  val edges: EdgeRDD[ED, VD]
   val triplets: RDD[EdgeTriplet[VD, ED]]
   // Functions for caching graphs ==================================================================
   def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
@@ -908,7 +914,7 @@ val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
 
 ## EdgeRDDs
 
-The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]` organizes the edges in blocks partitioned using one
+The `EdgeRDD[ED, VD]`, which extends `RDD[Edge[ED]]`, organizes the edges in blocks partitioned using one
 of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy].  Within
 each partition, edge attributes and adjacency structure are stored separately, enabling maximum
 reuse when changing attribute values.
@@ -918,11 +924,11 @@ reuse when changing attribute values.
 The three additional functions exposed by the `EdgeRDD` are:
 {% highlight scala %}
 // Transform the edge attributes while preserving the structure
-def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
+def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
 // Reverse the edges, reusing both attributes and structure
-def reverse: EdgeRDD[ED]
+def reverse: EdgeRDD[ED, VD]
 // Join two `EdgeRDD`s partitioned using the same partitioning strategy.
-def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
+def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
 {% endhighlight %}
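+
+For example, edge attributes can be transformed or reversed while reusing the adjacency structure (a minimal sketch, assuming an existing `graph: Graph[String, Double]`):
+
+{% highlight scala %}
+// Negate every edge weight, preserving the structure
+val negated: EdgeRDD[Double, String] = graph.edges.mapValues(e => -e.attr)
+// Reverse every edge, reusing both attributes and structure
+val reversed: EdgeRDD[Double, String] = graph.edges.reverse
+{% endhighlight %}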
 
 In most applications we have found that operations on the `EdgeRDD` are accomplished through the
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index fa78ca99b8891149b7c7191a3390609d4a286ef9..a8fc095072512c18e9494078c60fbd10a8eacfe8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -20,16 +20,19 @@ package org.apache.spark.graphx
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
-import org.apache.spark.graphx.impl.EdgePartition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
+import org.apache.spark.graphx.impl.EdgePartition
+
 /**
- * `EdgeRDD[ED]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition
- * for performance.
+ * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
+ * partition for performance. It may additionally store the vertex attributes associated with each
+ * edge to provide the triplet view. Shipping of the vertex attributes is managed by
+ * `impl.ReplicatedVertexView`.
  */
-class EdgeRDD[@specialized ED: ClassTag](
-    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
+class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
+    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
   extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   partitionsRDD.setName("EdgeRDD")
@@ -45,8 +48,12 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
-    p.next._2.iterator.map(_.copy())
+    val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
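+    // The partition may be empty; if so, return an empty iterator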
+    if (p.hasNext) {
+      p.next._2.iterator.map(_.copy())
+    } else {
+      Iterator.empty
+    }
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -61,11 +68,15 @@ class EdgeRDD[@specialized ED: ClassTag](
     this
   }
 
-  private[graphx] def mapEdgePartitions[ED2: ClassTag](
-      f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
-    new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
-      val (pid, ep) = iter.next()
-      Iterator(Tuple2(pid, f(pid, ep)))
+  private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+    new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
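+      // The partition may be empty, in which case there is no edge partition to transform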
+      if (iter.hasNext) {
+        val (pid, ep) = iter.next()
+        Iterator(Tuple2(pid, f(pid, ep)))
+      } else {
+        Iterator.empty
+      }
     }, preservesPartitioning = true))
   }
 
@@ -76,7 +87,7 @@ class EdgeRDD[@specialized ED: ClassTag](
    * @param f the function from an edge to a new edge value
    * @return a new EdgeRDD containing the new edge values
    */
-  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] =
+  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
     mapEdgePartitions((pid, part) => part.map(f))
 
   /**
@@ -84,7 +95,14 @@ class EdgeRDD[@specialized ED: ClassTag](
    *
    * @return a new EdgeRDD containing all the edges reversed
    */
-  def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse)
+  def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+
+  /** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
+  def filter(
+      epred: EdgeTriplet[VD, ED] => Boolean,
+      vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+    mapEdgePartitions((pid, part) => part.filter(epred, vpred))
+  }
 
   /**
    * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -96,19 +114,15 @@ class EdgeRDD[@specialized ED: ClassTag](
    *         with values supplied by `f`
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
-      (other: EdgeRDD[ED2])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
+      (other: EdgeRDD[ED2, _])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
-    new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+    new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
       (thisIter, otherIter) =>
         val (pid, thisEPart) = thisIter.next()
         val (_, otherEPart) = otherIter.next()
         Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
     })
   }
-
-  private[graphx] def collectVertexIds(): RDD[VertexId] = {
-    partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
-  }
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index dfc6a801587d229d7f2e66078eaa87a8a395e80f..9d473d5ebda4438ee34d7d28132ca790c8584690 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
 
   override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+
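+  /** Return this triplet as a tuple of ((srcId, srcAttr), (dstId, dstAttr), attr). */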
+  def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr)
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 50395868902dca4512ef71b46074f2c54b811862..dc5dac4fdad578cb5ae221f95178007470b414e3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * along with their vertex data.
    *
    */
-  @transient val edges: EdgeRDD[ED]
+  @transient val edges: EdgeRDD[ED, VD]
 
   /**
    * An RDD containing the edge triplets, which are edges along with the vertex data associated with
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index dd380d8c182c98cf6668d4303204ee8d59f59836..d295d0127ac72ca5101ec6610f2b63d061c1a57a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -19,10 +19,11 @@ package org.apache.spark.graphx
 
 import com.esotericsoftware.kryo.Kryo
 
-import org.apache.spark.graphx.impl._
 import org.apache.spark.serializer.KryoRegistrator
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx.impl._
 
 /**
  * Registers GraphX classes with Kryo for improved performance.
@@ -33,8 +34,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[Edge[Object]])
     kryo.register(classOf[MessageToPartition[Object]])
     kryo.register(classOf[VertexBroadcastMsg[Object]])
+    kryo.register(classOf[RoutingTableMessage])
     kryo.register(classOf[(VertexId, Object)])
-    kryo.register(classOf[EdgePartition[Object]])
+    kryo.register(classOf[EdgePartition[Object, Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
     kryo.register(classOf[VertexAttributeBlock[Object]])
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 18858466db27b655f62697618495a8fbb6a97df9..389490c139848e1623465321bd284c3432eadc7b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -47,8 +47,7 @@ object GraphLoader extends Logging {
    * @param path the path to the file (e.g., /home/data/file or hdfs://file)
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
-   * @param minEdgePartitions the number of partitions for the
-   *        the edge RDD
+   * @param minEdgePartitions the number of partitions for the edge RDD
    */
   def edgeListFile(
       sc: SparkContext,
@@ -60,8 +59,9 @@ object GraphLoader extends Logging {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
-    val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[Int]
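+    // sc.textFile may create more partitions than requested, so coalesce down to minEdgePartitions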
+    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
+    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
+      val builder = new EdgePartitionBuilder[Int, Int]
       iter.foreach { line =>
         if (!line.isEmpty && line(0) != '#') {
           val lineArray = line.split("\\s+")
@@ -78,7 +78,7 @@ object GraphLoader extends Logging {
         }
       }
       Iterator((pid, builder.toEdgePartition))
-    }.cache()
+    }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
     edges.count()
 
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 4997fbc3cbcd827d010103f6d7041327cb7a30ca..edd5b79da1522c87602874e74e73708e7947a07f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.graphx
 
 import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
+import scala.util.Random
+
 import org.apache.spark.SparkException
-import org.apache.spark.graphx.lib._
+import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
-import scala.util.Random
+
+import org.apache.spark.graphx.lib._
 
 /**
  * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -43,19 +45,22 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
    * The in-degree of each vertex in the graph.
    * @note Vertices with no in-edges are not returned in the resulting RDD.
    */
-  @transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
+  @transient lazy val inDegrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")
 
   /**
    * The out-degree of each vertex in the graph.
    * @note Vertices with no out-edges are not returned in the resulting RDD.
    */
-  @transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
+  @transient lazy val outDegrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")
 
   /**
    * The degree of each vertex in the graph.
    * @note Vertices with no edges are not returned in the resulting RDD.
    */
-  @transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
+  @transient lazy val degrees: VertexRDD[Int] =
+    degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")
 
   /**
    * Computes the neighboring vertex degrees.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index ac07a594a12e495eea77b29c79766e8720f9a004..4572eab2875bb536f3f58715d7236af58ba1e545 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.graphx
 
 import scala.reflect.ClassTag
+import org.apache.spark.Logging
 
 
 /**
@@ -52,7 +53,7 @@ import scala.reflect.ClassTag
  * }}}
  *
  */
-object Pregel {
+object Pregel extends Logging {
 
   /**
    * Execute a Pregel-like iterative vertex-parallel abstraction.  The
@@ -142,6 +143,9 @@ object Pregel {
       // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
       // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
       activeMessages = messages.count()
+
+      logInfo("Pregel finished iteration " + i)
+
       // Unpersist the RDDs hidden by newly-materialized RDDs
       oldMessages.unpersist(blocking=false)
       newVerts.unpersist(blocking=false)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index f0fc605c88575250440043d8e3feb3684be2817b..8c62897037b6ddf99dd6bc8b9baa973bcd871082 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -24,8 +24,11 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.rdd._
 import org.apache.spark.storage.StorageLevel
 
-import org.apache.spark.graphx.impl.MsgRDDFunctions
-import org.apache.spark.graphx.impl.VertexPartition
+import org.apache.spark.graphx.impl.RoutingTablePartition
+import org.apache.spark.graphx.impl.ShippableVertexPartition
+import org.apache.spark.graphx.impl.VertexAttributeBlock
+import org.apache.spark.graphx.impl.RoutingTableMessageRDDFunctions._
+import org.apache.spark.graphx.impl.VertexRDDFunctions._
 
 /**
  * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
@@ -33,6 +36,9 @@ import org.apache.spark.graphx.impl.VertexPartition
  * joined efficiently. All operations except [[reindex]] preserve the index. To construct a
  * `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]].
  *
+ * Additionally, stores routing information to enable joining the vertex attributes with an
+ * [[EdgeRDD]].
+ *
  * @example Construct a `VertexRDD` from a plain RDD:
  * {{{
  * // Construct an initial vertex set
@@ -50,13 +56,11 @@ import org.apache.spark.graphx.impl.VertexPartition
  * @tparam VD the vertex attribute associated with each vertex in the set.
  */
 class VertexRDD[@specialized VD: ClassTag](
-    val partitionsRDD: RDD[VertexPartition[VD]])
+    val partitionsRDD: RDD[ShippableVertexPartition[VD]])
   extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   require(partitionsRDD.partitioner.isDefined)
 
-  partitionsRDD.setName("VertexRDD")
-
   /**
    * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
    * VertexRDD will be based on a different index and can no longer be quickly joined with this
@@ -71,6 +75,16 @@ class VertexRDD[@specialized VD: ClassTag](
   override protected def getPreferredLocations(s: Partition): Seq[String] =
     partitionsRDD.preferredLocations(s)
 
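+  /** Set this RDD's name by naming its underlying partitionsRDD, appending to any existing name. */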
+  override def setName(_name: String): this.type = {
+    if (partitionsRDD.name != null) {
+      partitionsRDD.setName(partitionsRDD.name + ", " + _name)
+    } else {
+      partitionsRDD.setName(_name)
+    }
+    this
+  }
+  setName("VertexRDD")
+
   override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
@@ -90,14 +104,14 @@ class VertexRDD[@specialized VD: ClassTag](
    * Provides the `RDD[(VertexId, VD)]` equivalent output.
    */
   override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
-    firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
+    firstParent[ShippableVertexPartition[VD]].iterator(part, context).next.iterator
   }
 
   /**
    * Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD.
    */
   private[graphx] def mapVertexPartitions[VD2: ClassTag](
-    f: VertexPartition[VD] => VertexPartition[VD2])
+      f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
     : VertexRDD[VD2] = {
     val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
     new VertexRDD(newPartitionsRDD)
@@ -208,10 +222,8 @@ class VertexRDD[@specialized VD: ClassTag](
       case _ =>
         new VertexRDD[VD3](
           partitionsRDD.zipPartitions(
-            other.partitionBy(this.partitioner.get), preservesPartitioning = true)
-          { (part, msgs) =>
-            val vertexPartition: VertexPartition[VD] = part.next()
-            Iterator(vertexPartition.leftJoin(msgs)(f))
+            other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
+            (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
           }
         )
     }
@@ -254,10 +266,8 @@ class VertexRDD[@specialized VD: ClassTag](
       case _ =>
         new VertexRDD(
           partitionsRDD.zipPartitions(
-            other.partitionBy(this.partitioner.get), preservesPartitioning = true)
-          { (part, msgs) =>
-            val vertexPartition: VertexPartition[VD] = part.next()
-            Iterator(vertexPartition.innerJoin(msgs)(f))
+            other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
+            (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
           }
         )
     }
@@ -276,14 +286,31 @@ class VertexRDD[@specialized VD: ClassTag](
    */
   def aggregateUsingIndex[VD2: ClassTag](
       messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
-    val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
+    val shuffled = messages.copartitionWithVertices(this.partitioner.get)
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
-      val vertexPartition: VertexPartition[VD] = thisIter.next()
-      Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
+      thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
     }
     new VertexRDD[VD2](parts)
   }
 
+  /**
+   * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
+   * [[EdgeRDD]].
+   */
+  def reverseRoutingTables(): VertexRDD[VD] =
+    this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
+
+  /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
+  private[graphx] def shipVertexAttributes(
+      shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
+    partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
+  }
+
+  /** Generates an RDD of vertex IDs suitable for shipping to the edge partitions. */
+  private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] = {
+    partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds()))
+  }
+
 } // end of VertexRDD
 
 
@@ -293,52 +320,101 @@ class VertexRDD[@specialized VD: ClassTag](
 object VertexRDD {
 
   /**
-   * Construct a `VertexRDD` from an RDD of vertex-attribute pairs.
-   * Duplicate entries are removed arbitrarily.
+   * Constructs a standalone `VertexRDD` (one that is not set up for efficient joins with an
+   * [[EdgeRDD]]) from an RDD of vertex-attribute pairs. Duplicate entries are removed arbitrarily.
    *
    * @tparam VD the vertex attribute type
    *
-   * @param rdd the collection of vertex-attribute pairs
+   * @param vertices the collection of vertex-attribute pairs
    */
-  def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)]): VertexRDD[VD] = {
-    val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
-      case Some(p) => rdd
-      case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+  def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+    val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
+      case Some(p) => vertices
+      case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
     }
-    val vertexPartitions = partitioned.mapPartitions(
-      iter => Iterator(VertexPartition(iter)),
+    val vertexPartitions = vPartitioned.mapPartitions(
+      iter => Iterator(ShippableVertexPartition(iter)),
       preservesPartitioning = true)
     new VertexRDD(vertexPartitions)
   }
 
   /**
-   * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs, merging duplicates using
-   * `mergeFunc`.
+   * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs. Duplicate vertex entries are
+   * removed arbitrarily. The resulting `VertexRDD` will be joinable with `edges`, and any missing
+   * vertices referred to by `edges` will be created with the attribute `defaultVal`.
    *
    * @tparam VD the vertex attribute type
    *
-   * @param rdd the collection of vertex-attribute pairs
-   * @param mergeFunc the associative, commutative merge function.
+   * @param vertices the collection of vertex-attribute pairs
+   * @param edges the [[EdgeRDD]] that these vertices may be joined with
+   * @param defaultVal the vertex attribute to use when creating missing vertices
    */
-  def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
-    val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
-      case Some(p) => rdd
-      case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+  def apply[VD: ClassTag](
+      vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
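+    // Duplicate vertex entries are resolved arbitrarily: the merge function keeps the latter attribute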
+    VertexRDD(vertices, edges, defaultVal, (a, b) => b)
+  }
+
+  /**
+   * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs. Duplicate vertex entries are
+   * merged using `mergeFunc`. The resulting `VertexRDD` will be joinable with `edges`, and any
+   * missing vertices referred to by `edges` will be created with the attribute `defaultVal`.
+   *
+   * @tparam VD the vertex attribute type
+   *
+   * @param vertices the collection of vertex-attribute pairs
+   * @param edges the [[EdgeRDD]] that these vertices may be joined with
+   * @param defaultVal the vertex attribute to use when creating missing vertices
+   * @param mergeFunc the commutative, associative duplicate vertex attribute merge function
+   */
+  def apply[VD: ClassTag](
+      vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
+    ): VertexRDD[VD] = {
+    val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
+      case Some(p) => vertices
+      case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
+    }
+    val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
+    val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {
+      (vertexIter, routingTableIter) =>
+        val routingTable =
+          if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+        Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal))
     }
-    val vertexPartitions = partitioned.mapPartitions(
-      iter => Iterator(VertexPartition(iter)),
-      preservesPartitioning = true)
     new VertexRDD(vertexPartitions)
   }
 
   /**
-   * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using
-   * `defaultVal` otherwise.
+   * Constructs a `VertexRDD` containing all vertices referred to in `edges`. The vertices will be
+   * created with the attribute `defaultVal`. The resulting `VertexRDD` will be joinable with
+   * `edges`.
+   *
+   * @tparam VD the vertex attribute type
+   *
+   * @param edges the [[EdgeRDD]] referring to the vertices to create
+   * @param numPartitions the desired number of partitions for the resulting `VertexRDD`
+   * @param defaultVal the vertex attribute to use when creating missing vertices
    */
-  def apply[VD: ClassTag](vids: RDD[VertexId], rdd: RDD[(VertexId, VD)], defaultVal: VD)
-    : VertexRDD[VD] = {
-    VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
-      value.getOrElse(default)
-    }
+  def fromEdges[VD: ClassTag](
+      edges: EdgeRDD[_, _], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
+    val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
+    val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
+      val routingTable =
+        if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+      Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
+    }, preservesPartitioning = true)
+    new VertexRDD(vertexPartitions)
+  }
+
+  private def createRoutingTables(
+      edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
+    // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
+    val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
+      Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
+      .setName("VertexRDD.createRoutingTables - vid2pid (aggregation)")
+
+    val numEdgePartitions = edges.partitions.size
+    vid2pid.copartitionWithVertices(vertexPartitioner).mapPartitions(
+      iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
+      preservesPartitioning = true)
   }
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index b7c472e905a9b82705820441db7de43419b2e49c..871e81f8d245c0c95dd7fc412795e0b4b7f88fdd 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -17,39 +17,86 @@
 
 package org.apache.spark.graphx.impl
 
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
 
 /**
- * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are
- * clustered by src.
+ * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
+ * edges are stored in 3 large columnar arrays (src, dst, attribute). The arrays are clustered by
+ * src. There is an optional active vertex set for filtering computation on the edges.
+ *
+ * @tparam ED the edge attribute type
+ * @tparam VD the vertex attribute type
  *
  * @param srcIds the source vertex id of each edge
  * @param dstIds the destination vertex id of each edge
  * @param data the attribute associated with each edge
  * @param index a clustered index on source vertex id
- * @tparam ED the edge attribute type.
+ * @param vertices a map from referenced vertex ids to their corresponding attributes. Must
+ *   contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
+ *   those vertex ids. The mask is not used.
+ * @param activeSet an optional active vertex set for filtering computation on the edges
  */
 private[graphx]
-class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
+class EdgePartition[
+    @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
     @transient val srcIds: Array[VertexId],
     @transient val dstIds: Array[VertexId],
     @transient val data: Array[ED],
-    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
+    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
+    @transient val vertices: VertexPartition[VD],
+    @transient val activeSet: Option[VertexSet] = None
+  ) extends Serializable {
+
+  /** Return a new `EdgePartition` with the specified edge data. */
+  def withData[ED2: ClassTag](data_ : Array[ED2]): EdgePartition[ED2, VD] = {
+    new EdgePartition(srcIds, dstIds, data_, index, vertices, activeSet)
+  }
+
+  /** Return a new `EdgePartition` with the specified vertex partition. */
+  def withVertices[VD2: ClassTag](
+      vertices_ : VertexPartition[VD2]): EdgePartition[ED, VD2] = {
+    new EdgePartition(srcIds, dstIds, data, index, vertices_, activeSet)
+  }
+
+  /** Return a new `EdgePartition` with the specified active set, provided as an iterator. */
+  def withActiveSet(iter: Iterator[VertexId]): EdgePartition[ED, VD] = {
+    val newActiveSet = new VertexSet
+    iter.foreach(newActiveSet.add(_))
+    new EdgePartition(srcIds, dstIds, data, index, vertices, Some(newActiveSet))
+  }
+
+  /** Return a new `EdgePartition` with the specified active set. */
+  def withActiveSet(activeSet_ : Option[VertexSet]): EdgePartition[ED, VD] = {
+    new EdgePartition(srcIds, dstIds, data, index, vertices, activeSet_)
+  }
+
+  /** Return a new `EdgePartition` with updates to vertex attributes specified in `iter`. */
+  def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
+    this.withVertices(vertices.innerJoinKeepLeft(iter))
+  }
+
+  /** Look up vid in activeSet, throwing an exception if it is None. */
+  def isActive(vid: VertexId): Boolean = {
+    activeSet.get.contains(vid)
+  }
+
+  /** The number of active vertices, if any exist. */
+  def numActives: Option[Int] = activeSet.map(_.size)
 
   /**
    * Reverse all the edges in this partition.
    *
    * @return a new edge partition with all edges reversed.
    */
-  def reverse: EdgePartition[ED] = {
-    val builder = new EdgePartitionBuilder(size)
+  def reverse: EdgePartition[ED, VD] = {
+    val builder = new EdgePartitionBuilder(size)(classTag[ED], classTag[VD])
     for (e <- iterator) {
       builder.add(e.dstId, e.srcId, e.attr)
     }
-    builder.toEdgePartition
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
@@ -64,7 +111,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * @return a new edge partition with the result of the function `f`
    *         applied to each edge
    */
-  def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = {
+  def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2, VD] = {
     val newData = new Array[ED2](data.size)
     val edge = new Edge[ED]()
     val size = data.size
@@ -76,7 +123,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       newData(i) = f(edge)
       i += 1
     }
-    new EdgePartition(srcIds, dstIds, newData, index)
+    this.withData(newData)
   }
 
   /**
@@ -91,7 +138,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * @tparam ED2 the type of the new attribute
    * @return a new edge partition with the attribute values replaced
    */
-  def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
+  def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2, VD] = {
     // Faster than iter.toArray, because the expected size is known.
     val newData = new Array[ED2](data.size)
     var i = 0
@@ -100,7 +147,23 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       i += 1
     }
     assert(newData.size == i)
-    new EdgePartition(srcIds, dstIds, newData, index)
+    this.withData(newData)
+  }
+
+  /**
+   * Construct a new edge partition containing only the edges matching `epred` and where both
+   * vertices match `vpred`.
+   */
+  def filter(
+      epred: EdgeTriplet[VD, ED] => Boolean,
+      vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
+    val filtered = tripletIterator().filter(et =>
+      vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et))
+    val builder = new EdgePartitionBuilder[ED, VD]
+    for (e <- filtered) {
+      builder.add(e.srcId, e.dstId, e.attr)
+    }
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
@@ -119,8 +182,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * @param merge a commutative associative merge operation
    * @return a new edge partition without duplicate edges
    */
-  def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
-    val builder = new EdgePartitionBuilder[ED]
+  def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
+    val builder = new EdgePartitionBuilder[ED, VD]
     var currSrcId: VertexId = null.asInstanceOf[VertexId]
     var currDstId: VertexId = null.asInstanceOf[VertexId]
     var currAttr: ED = null.asInstanceOf[ED]
@@ -141,11 +204,11 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
     if (size > 0) {
       builder.add(currSrcId, currDstId, currAttr)
     }
-    builder.toEdgePartition
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
-   * Apply `f` to all edges present in both `this` and `other` and return a new EdgePartition
+   * Apply `f` to all edges present in both `this` and `other` and return a new `EdgePartition`
    * containing the resulting edges.
    *
    * If there are multiple edges with the same src and dst in `this`, `f` will be invoked once for
@@ -155,9 +218,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * once.
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
-      (other: EdgePartition[ED2])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3] = {
-    val builder = new EdgePartitionBuilder[ED3]
+      (other: EdgePartition[ED2, _])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3, VD] = {
+    val builder = new EdgePartitionBuilder[ED3, VD]
     var i = 0
     var j = 0
     // For i = index of each edge in `this`...
@@ -175,7 +238,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
       }
       i += 1
     }
-    builder.toEdgePartition
+    builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
   }
 
   /**
@@ -183,7 +246,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    *
    * @return size of the partition
    */
-  def size: Int = srcIds.size
+  val size: Int = srcIds.size
 
   /** The number of unique source vertices in the partition. */
   def indexSize: Int = index.size
@@ -211,10 +274,35 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
     }
   }
 
+  /**
+   * Get an iterator over the edge triplets in this partition.
+   *
+   * It is safe to keep references to the objects from this iterator.
+   */
+  def tripletIterator(
+      includeSrc: Boolean = true, includeDst: Boolean = true): Iterator[EdgeTriplet[VD, ED]] = {
+    new EdgeTripletIterator(this, includeSrc, includeDst)
+  }
+
+  /**
+   * Upgrade the given edge iterator into a triplet iterator.
+   *
+   * Be careful not to keep references to the objects from this iterator. To improve GC performance
+   * the same object is re-used in `next()`.
+   */
+  def upgradeIterator(
+      edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
+    : Iterator[EdgeTriplet[VD, ED]] = {
+    new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+  }
+
   /**
    * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
    * iterator is generated using an index scan, so it is efficient at skipping edges that don't
    * match srcIdPred.
+   *
+   * Be careful not to keep references to the objects from this iterator. To improve GC performance
+   * the same object is re-used in `next()`.
    */
   def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
     index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 63ccccb056b48e5cc2747fc54b1ff673175fd190..ecb49bef42e455223876048102c0305df105f6de 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -20,12 +20,14 @@ package org.apache.spark.graphx.impl
 import scala.reflect.ClassTag
 import scala.util.Sorting
 
+import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
+
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
-import org.apache.spark.util.collection.PrimitiveVector
 
 private[graphx]
-class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) {
+class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
+    size: Int = 64) {
   var edges = new PrimitiveVector[Edge[ED]](size)
 
   /** Add a new edge to the partition. */
@@ -33,7 +35,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
     edges += Edge(src, dst, d)
   }
 
-  def toEdgePartition: EdgePartition[ED] = {
+  def toEdgePartition: EdgePartition[ED, VD] = {
     val edgeArray = edges.trim().array
     Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
     val srcIds = new Array[VertexId](edgeArray.size)
@@ -57,6 +59,14 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
         i += 1
       }
     }
-    new EdgePartition(srcIds, dstIds, data, index)
+
+    // Create and populate a VertexPartition with vids from the edges, but no attributes
+    val vidsIter = srcIds.iterator ++ dstIds.iterator
+    val vertexIds = new OpenHashSet[VertexId]
+    vidsIter.foreach(vid => vertexIds.add(vid))
+    val vertices = new VertexPartition(
+      vertexIds, new Array[VD](vertexIds.capacity), vertexIds.getBitSet)
+
+    new EdgePartition(srcIds, dstIds, data, index, vertices)
   }
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 220a89d73d7117e643f4fc1797d0d07d5109a6c5..ebb0b9418d65dc9e2d3b58d96bbb709295fbc3e5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -23,32 +23,62 @@ import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
 
 /**
- * The Iterator type returned when constructing edge triplets. This class technically could be
- * an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to
- * debug / profile.
+ * The Iterator type returned when constructing edge triplets. This could be an anonymous class in
+ * EdgePartition.tripletIterator, but we name it here explicitly so it is easier to debug / profile.
  */
 private[impl]
 class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
-    val vidToIndex: VertexIdToIndexMap,
-    val vertexArray: Array[VD],
-    val edgePartition: EdgePartition[ED])
+    val edgePartition: EdgePartition[ED, VD],
+    val includeSrc: Boolean,
+    val includeDst: Boolean)
   extends Iterator[EdgeTriplet[VD, ED]] {
 
   // Current position in the array.
   private var pos = 0
 
-  private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
-
   override def hasNext: Boolean = pos < edgePartition.size
 
   override def next() = {
     val triplet = new EdgeTriplet[VD, ED]
     triplet.srcId = edgePartition.srcIds(pos)
-    triplet.srcAttr = vmap(triplet.srcId)
+    if (includeSrc) {
+      triplet.srcAttr = edgePartition.vertices(triplet.srcId)
+    }
     triplet.dstId = edgePartition.dstIds(pos)
-    triplet.dstAttr = vmap(triplet.dstId)
+    if (includeDst) {
+      triplet.dstAttr = edgePartition.vertices(triplet.dstId)
+    }
     triplet.attr = edgePartition.data(pos)
     pos += 1
     triplet
   }
 }
+
+/**
+ * An Iterator type for internal use that reuses EdgeTriplet objects. This could be an anonymous
+ * class in EdgePartition.upgradeIterator, but we name it here explicitly so it is easier to debug /
+ * profile.
+ */
+private[impl]
+class ReusingEdgeTripletIterator[VD: ClassTag, ED: ClassTag](
+    val edgeIter: Iterator[Edge[ED]],
+    val edgePartition: EdgePartition[ED, VD],
+    val includeSrc: Boolean,
+    val includeDst: Boolean)
+  extends Iterator[EdgeTriplet[VD, ED]] {
+
+  private val triplet = new EdgeTriplet[VD, ED]
+
+  override def hasNext = edgeIter.hasNext
+
+  override def next() = {
+    triplet.set(edgeIter.next())
+    if (includeSrc) {
+      triplet.srcAttr = edgePartition.vertices(triplet.srcId)
+    }
+    if (includeDst) {
+      triplet.dstAttr = edgePartition.vertices(triplet.dstId)
+    }
+    triplet
+  }
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 9eabccdee48dbb9c56222d00584cbd6d67bafcaf..2f2d0e03fd7b5b03d1e29751b1e87e4e7f17d687 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -19,54 +19,45 @@ package org.apache.spark.graphx.impl
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.util.collection.PrimitiveVector
-import org.apache.spark.{HashPartitioner, Partitioner}
+import org.apache.spark.HashPartitioner
 import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.{RDD, ShuffledRDD}
+import org.apache.spark.storage.StorageLevel
+
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.impl.GraphImpl._
 import org.apache.spark.graphx.impl.MsgRDDFunctions._
 import org.apache.spark.graphx.util.BytecodeUtils
-import org.apache.spark.rdd.{ShuffledRDD, RDD}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.ClosureCleaner
 
 
 /**
- * A graph that supports computation on graphs.
+ * An implementation of [[org.apache.spark.graphx.Graph]] to support computation on graphs.
  *
- * Graphs are represented using two classes of data: vertex-partitioned and
- * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges`
- * contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods,
- * vertex attributes are replicated to the edge partitions where they appear as sources or
- * destinations. `routingTable` stores the routing information for shipping vertex attributes to
- * edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created
- * using the routing table.
+ * Graphs are represented using two RDDs: `vertices`, which contains vertex attributes and the
+ * routing information for shipping vertex attributes to edge partitions, and
+ * `replicatedVertexView`, which contains edges and the vertex attributes mentioned by each edge.
  */
 class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     @transient val vertices: VertexRDD[VD],
-    @transient val edges: EdgeRDD[ED],
-    @transient val routingTable: RoutingTable,
-    @transient val replicatedVertexView: ReplicatedVertexView[VD])
+    @transient val replicatedVertexView: ReplicatedVertexView[VD, ED])
   extends Graph[VD, ED] with Serializable {
 
   /** Default constructor is provided to support serialization */
-  protected def this() = this(null, null, null, null)
+  protected def this() = this(null, null)
+
+  @transient override val edges: EdgeRDD[ED, VD] = replicatedVertexView.edges
 
   /** Return an RDD that brings edges together with their source and destination vertices. */
-  @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
-    val vdTag = classTag[VD]
-    val edTag = classTag[ED]
-    edges.partitionsRDD.zipPartitions(
-      replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) =>
-      val (pid, ePart) = ePartIter.next()
-      val (_, vPart) = vPartIter.next()
-      new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
-    }
+  @transient override lazy val triplets: RDD[EdgeTriplet[VD, ED]] = {
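+    // Ship both source and destination vertex attributes to the edge partitions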
+    replicatedVertexView.upgrade(vertices, true, true)
+    replicatedVertexView.edges.partitionsRDD.mapPartitions(_.flatMap {
+      case (pid, part) => part.tripletIterator()
+    })
   }
 
   override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
     vertices.persist(newLevel)
-    edges.persist(newLevel)
+    replicatedVertexView.edges.persist(newLevel)
     this
   }
 
@@ -74,14 +65,15 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
     vertices.unpersist(blocking)
-    replicatedVertexView.unpersist(blocking)
+    // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone
     this
   }
 
   override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
-    val numPartitions = edges.partitions.size
+    val numPartitions = replicatedVertexView.edges.partitions.size
     val edTag = classTag[ED]
-    val newEdges = new EdgeRDD(edges.map { e =>
+    val vdTag = classTag[VD]
+    val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e =>
       val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
 
       // Should we be using 3-tuple or an optimized class
@@ -89,105 +81,79 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     }
       .partitionBy(new HashPartitioner(numPartitions))
       .mapPartitionsWithIndex( { (pid, iter) =>
-        val builder = new EdgePartitionBuilder[ED]()(edTag)
+        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
         iter.foreach { message =>
           val data = message.data
           builder.add(data._1, data._2, data._3)
         }
         val edgePartition = builder.toEdgePartition
         Iterator((pid, edgePartition))
-      }, preservesPartitioning = true).cache())
-    GraphImpl(vertices, newEdges)
+      }, preservesPartitioning = true))
+    GraphImpl.fromExistingRDDs(vertices, newEdges)
   }
 
   override def reverse: Graph[VD, ED] = {
-    val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
-    GraphImpl(vertices, newETable)
+    new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
   }
 
   override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
     if (classTag[VD] equals classTag[VD2]) {
+      vertices.cache()
       // The map preserves type, so we can use incremental replication
       val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
       val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
-      val newReplicatedVertexView = new ReplicatedVertexView[VD2](
-        changedVerts, edges, routingTable,
-        Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
-      new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
+      val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
+        .updateVertices(changedVerts)
+      new GraphImpl(newVerts, newReplicatedVertexView)
     } else {
       // The map does not preserve type, so we must re-replicate all vertices
-      GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable)
+      GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
     }
   }
 
   override def mapEdges[ED2: ClassTag](
       f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
-    val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
-    new GraphImpl(vertices, newETable , routingTable, replicatedVertexView)
+    val newEdges = replicatedVertexView.edges
+      .mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
+    new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
   }
 
   override def mapTriplets[ED2: ClassTag](
       f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
-    val newEdgePartitions =
-      edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) {
-        (ePartIter, vTableReplicatedIter) =>
-        val (ePid, edgePartition) = ePartIter.next()
-        val (vPid, vPart) = vTableReplicatedIter.next()
-        assert(!vTableReplicatedIter.hasNext)
-        assert(ePid == vPid)
-        val et = new EdgeTriplet[VD, ED]
-        val inputIterator = edgePartition.iterator.map { e =>
-          et.set(e)
-          et.srcAttr = vPart(e.srcId)
-          et.dstAttr = vPart(e.dstId)
-          et
-        }
-        // Apply the user function to the vertex partition
-        val outputIter = f(ePid, inputIterator)
-        // Consume the iterator to update the edge attributes
-        val newEdgePartition = edgePartition.map(outputIter)
-        Iterator((ePid, newEdgePartition))
-      }
-    new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView)
+    vertices.cache()
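+    // Determine which vertex attributes the map function accesses so that only those are shipped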
+    val mapUsesSrcAttr = accessesVertexAttr(f, "srcAttr")
+    val mapUsesDstAttr = accessesVertexAttr(f, "dstAttr")
+    replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr)
+    val newEdges = replicatedVertexView.edges.mapEdgePartitions { (pid, part) =>
+      part.map(f(pid, part.tripletIterator(mapUsesSrcAttr, mapUsesDstAttr)))
+    }
+    new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
   }
 
   override def subgraph(
       epred: EdgeTriplet[VD, ED] => Boolean = x => true,
       vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+    vertices.cache()
     // Filter the vertices, reusing the partitioner and the index from this graph
     val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
-
-    // Filter the edges
-    val edTag = classTag[ED]
-    val newEdges = new EdgeRDD[ED](triplets.filter { et =>
-      vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)
-    }.mapPartitionsWithIndex( { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[ED]()(edTag)
-      iter.foreach { et => builder.add(et.srcId, et.dstId, et.attr) }
-      val edgePartition = builder.toEdgePartition
-      Iterator((pid, edgePartition))
-    }, preservesPartitioning = true)).cache()
-
-    // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
-    // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
-    // an edge.
-    new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView)
-  } // end of subgraph
+    // Filter the triplets. We must always upgrade the triplet view fully because vpred always runs
+    // on both src and dst vertices
+    replicatedVertexView.upgrade(vertices, true, true)
+    val newEdges = replicatedVertexView.edges.filter(epred, vpred)
+    new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
+  }
 
   override def mask[VD2: ClassTag, ED2: ClassTag] (
       other: Graph[VD2, ED2]): Graph[VD, ED] = {
     val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
-    val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v }
-    // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
-    // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
-    // an edge.
-    new GraphImpl(newVerts, newEdges, routingTable, replicatedVertexView)
+    val newEdges = replicatedVertexView.edges.innerJoin(other.edges) { (src, dst, v, w) => v }
+    new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
   }
 
   override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
-    ClosureCleaner.clean(merge)
-    val newETable = edges.mapEdgePartitions((pid, part) => part.groupEdges(merge))
-    new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
+    val newEdges = replicatedVertexView.edges.mapEdgePartitions(
+      (pid, part) => part.groupEdges(merge))
+    new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
   }
 
   // ///////////////////////////////////////////////////////////////////////////////////////////////
@@ -199,68 +165,58 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
       reduceFunc: (A, A) => A,
       activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
 
-    ClosureCleaner.clean(mapFunc)
-    ClosureCleaner.clean(reduceFunc)
+    vertices.cache()
 
     // For each vertex, replicate its attribute only to partitions where it is
     // in the relevant position in an edge.
     val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
     val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
-    val vs = activeSetOpt match {
+    replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr)
+    val view = activeSetOpt match {
       case Some((activeSet, _)) =>
-        replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
+        replicatedVertexView.withActiveSet(activeSet)
       case None =>
-        replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr)
+        replicatedVertexView
     }
     val activeDirectionOpt = activeSetOpt.map(_._2)
 
     // Map and combine.
-    val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
-      val (ePid, edgePartition) = ePartIter.next()
-      val (vPid, vPart) = vPartIter.next()
-      assert(!vPartIter.hasNext)
-      assert(ePid == vPid)
-      // Choose scan method
-      val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
-      val edgeIter = activeDirectionOpt match {
-        case Some(EdgeDirection.Both) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-              .filter(e => vPart.isActive(e.dstId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
-          }
-        case Some(EdgeDirection.Either) =>
-          // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
-          // the index here. Instead we have to scan all edges and then do the filter.
-          edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
-        case Some(EdgeDirection.Out) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
-          }
-        case Some(EdgeDirection.In) =>
-          edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
-        case _ => // None
-          edgePartition.iterator
-      }
-
-      // Scan edges and run the map function
-      val et = new EdgeTriplet[VD, ED]
-      val mapOutputs = edgeIter.flatMap { e =>
-        et.set(e)
-        if (mapUsesSrcAttr) {
-          et.srcAttr = vPart(e.srcId)
-        }
-        if (mapUsesDstAttr) {
-          et.dstAttr = vPart(e.dstId)
+    val preAgg = view.edges.partitionsRDD.mapPartitions(_.flatMap {
+      case (pid, edgePartition) =>
+        // Choose scan method
+        val activeFraction = edgePartition.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
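+        // Use the clustered index scan only while under 80% of the vertices are active;
+        // beyond that, scanning all edges sequentially is cheaper than index lookups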
+        val edgeIter = activeDirectionOpt match {
+          case Some(EdgeDirection.Both) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId))
+                .filter(e => edgePartition.isActive(e.dstId))
+            } else {
+              edgePartition.iterator.filter(e =>
+                edgePartition.isActive(e.srcId) && edgePartition.isActive(e.dstId))
+            }
+          case Some(EdgeDirection.Either) =>
+            // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
+            // the index here. Instead we have to scan all edges and then do the filter.
+            edgePartition.iterator.filter(e =>
+              edgePartition.isActive(e.srcId) || edgePartition.isActive(e.dstId))
+          case Some(EdgeDirection.Out) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId))
+            } else {
+              edgePartition.iterator.filter(e => edgePartition.isActive(e.srcId))
+            }
+          case Some(EdgeDirection.In) =>
+            edgePartition.iterator.filter(e => edgePartition.isActive(e.dstId))
+          case _ => // None
+            edgePartition.iterator
         }
-        mapFunc(et)
-      }
-      // Note: This doesn't allow users to send messages to arbitrary vertices.
-      vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
-    }
+
+        // Scan edges and run the map function
+        val mapOutputs = edgePartition.upgradeIterator(edgeIter, mapUsesSrcAttr, mapUsesDstAttr)
+          .flatMap(mapFunc(_))
+        // Note: This doesn't allow users to send messages to arbitrary vertices.
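+        // Pre-aggregate within each edge partition to cut shuffle volume before the final reduction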
+        edgePartition.vertices.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
+    }).setName("GraphImpl.mapReduceTriplets - preAgg")
 
     // do the final reduction reusing the index map
     vertices.aggregateUsingIndex(preAgg, reduceFunc)
@@ -268,20 +224,19 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
   override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
       (other: RDD[(VertexId, U)])
-      (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] =
-  {
+      (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] = {
     if (classTag[VD] equals classTag[VD2]) {
+      vertices.cache()
       // updateF preserves type, so we can use incremental replication
-      val newVerts = vertices.leftJoin(other)(updateF)
+      val newVerts = vertices.leftJoin(other)(updateF).cache()
       val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
-      val newReplicatedVertexView = new ReplicatedVertexView[VD2](
-        changedVerts, edges, routingTable,
-        Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
-      new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
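+      // Incrementally update the replicated view by shipping only the changed vertices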
+      val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
+        .updateVertices(changedVerts)
+      new GraphImpl(newVerts, newReplicatedVertexView)
     } else {
       // updateF does not preserve type, so we must re-replicate all vertices
       val newVerts = vertices.leftJoin(other)(updateF)
-      GraphImpl(newVerts, edges, routingTable)
+      GraphImpl(newVerts, replicatedVertexView.edges)
     }
   }
 
@@ -298,73 +253,68 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
 object GraphImpl {
 
+  /** Create a graph from edges, setting referenced vertices to `defaultVertexAttr`. */
   def apply[VD: ClassTag, ED: ClassTag](
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] =
-  {
+      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
     fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
   }
 
+  /** Create a graph from EdgePartitions, setting referenced vertices to `defaultVertexAttr`. */
   def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
-      edgePartitions: RDD[(PartitionID, EdgePartition[ED])],
+      edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])],
       defaultVertexAttr: VD): GraphImpl[VD, ED] = {
     fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
   }
 
+  /** Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`. */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] =
-  {
-    val edgeRDD = createEdgeRDD(edges).cache()
-
-    // Get the set of all vids
-    val partitioner = Partitioner.defaultPartitioner(vertices)
-    val vPartitioned = vertices.partitionBy(partitioner)
-    val vidsFromEdges = collectVertexIdsFromEdges(edgeRDD, partitioner)
-    val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
-      vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
-    }
-
-    val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr)
-
+      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+    val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache()
+    val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
     GraphImpl(vertexRDD, edgeRDD)
   }
 
+  /** Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
-      edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
-    // Cache RDDs that are referenced multiple times
-    edges.cache()
-
-    GraphImpl(vertices, edges, new RoutingTable(edges, vertices))
+      edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
+    // Convert the vertex partitions in edges to the correct type
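+    // (the null attributes are only placeholders; they are overwritten the next time vertex
+    // attributes are shipped to the edge partitions)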
+    val newEdges = edges.mapEdgePartitions(
+      (pid, part) => part.withVertices(part.vertices.map(
+        (vid, attr) => null.asInstanceOf[VD])))
+    GraphImpl.fromExistingRDDs(vertices, newEdges)
   }
 
-  def apply[VD: ClassTag, ED: ClassTag](
+  /**
+   * Create a graph from a VertexRDD and an EdgeRDD with the same replicated vertex type as the
+   * vertices.
+   */
+  def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
-      edges: EdgeRDD[ED],
-      routingTable: RoutingTable): GraphImpl[VD, ED] = {
-    // Cache RDDs that are referenced multiple times. `routingTable` is cached by default, so we
-    // don't cache it explicitly.
-    vertices.cache()
-    edges.cache()
-
-    new GraphImpl(
-      vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable))
+      edges: EdgeRDD[ED, VD]): GraphImpl[VD, ED] = {
+    new GraphImpl(vertices, new ReplicatedVertexView(edges))
   }
 
   /**
-   * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
-   * data structure (RDD[(VertexId, VertexId, ED)]).
-   *
-   * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
-   * pair: the key is the partition id, and the value is an EdgePartition object containing all the
-   * edges in a partition.
+   * Create a graph from an EdgeRDD with the correct vertex type, setting missing vertices to
+   * `defaultVertexAttr`. The vertices will have the same number of partitions as the EdgeRDD.
    */
-  private def createEdgeRDD[ED: ClassTag](
-      edges: RDD[Edge[ED]]): EdgeRDD[ED] = {
+  private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
+      edges: EdgeRDD[ED, VD],
+      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+    edges.cache()
+    val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, defaultVertexAttr)
+    fromExistingRDDs(vertices, edges)
+  }
+
+  /** Create an EdgeRDD from a set of edges. */
+  private def createEdgeRDD[ED: ClassTag, VD: ClassTag](
+      edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
     val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[ED]
+      val builder = new EdgePartitionBuilder[ED, VD]
       iter.foreach { e =>
         builder.add(e.srcId, e.dstId, e.attr)
       }
@@ -373,24 +323,4 @@ object GraphImpl {
     new EdgeRDD(edgePartitions)
   }
 
-  private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
-      edges: EdgeRDD[ED],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    edges.cache()
-    // Get the set of all vids
-    val vids = collectVertexIdsFromEdges(edges, new HashPartitioner(edges.partitions.size))
-    // Create the VertexRDD.
-    val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
-    GraphImpl(vertices, edges)
-  }
-
-  /** Collects all vids mentioned in edges and partitions them by partitioner. */
-  private def collectVertexIdsFromEdges(
-      edges: EdgeRDD[_],
-      partitioner: Partitioner): RDD[(VertexId, Int)] = {
-    // TODO: Consider doing map side distinct before shuffle.
-    new ShuffledRDD[VertexId, Int, (VertexId, Int)](
-      edges.collectVertexIds.map(vid => (vid, 0)), partitioner)
-      .setSerializer(new VertexIdMsgSerializer)
-  }
 } // end of object GraphImpl
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
index c45ba3d2f8c24417b0a6ac2aba1df19e2b040331..1c6d7e59e9a27ce7e91d52cece01e52a4261abaf 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -89,7 +89,6 @@ class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {
 
 }
 
-
 private[graphx]
 object MsgRDDFunctions {
   implicit def rdd2PartitionRDDFunctions[T: ClassTag](rdd: RDD[MessageToPartition[T]]) = {
@@ -99,18 +98,28 @@ object MsgRDDFunctions {
   implicit def rdd2vertexMessageRDDFunctions[T: ClassTag](rdd: RDD[VertexBroadcastMsg[T]]) = {
     new VertexBroadcastMsgRDDFunctions(rdd)
   }
+}
 
-  def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexId, T)], partitioner: Partitioner) = {
-    val rdd = new ShuffledRDD[VertexId, T, (VertexId, T)](msgs, partitioner)
+private[graphx]
+class VertexRDDFunctions[VD: ClassTag](self: RDD[(VertexId, VD)]) {
+  def copartitionWithVertices(partitioner: Partitioner): RDD[(VertexId, VD)] = {
+    val rdd = new ShuffledRDD[VertexId, VD, (VertexId, VD)](self, partitioner)
 
     // Set a custom serializer if the data is of int, long, or double type.
-    if (classTag[T] == ClassTag.Int) {
+    if (classTag[VD] == ClassTag.Int) {
       rdd.setSerializer(new IntAggMsgSerializer)
-    } else if (classTag[T] == ClassTag.Long) {
+    } else if (classTag[VD] == ClassTag.Long) {
       rdd.setSerializer(new LongAggMsgSerializer)
-    } else if (classTag[T] == ClassTag.Double) {
+    } else if (classTag[VD] == ClassTag.Double) {
       rdd.setSerializer(new DoubleAggMsgSerializer)
     }
     rdd
   }
 }
+
+private[graphx]
+object VertexRDDFunctions {
+  implicit def rdd2VertexRDDFunctions[VD: ClassTag](rdd: RDD[(VertexId, VD)]) = {
+    new VertexRDDFunctions(rdd)
+  }
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index a8154b63ce5fb6019bc1d6681dc0b56bce46ad7d..3a0bba1b93b418fbfec38c00f2e939399553b384 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -21,192 +21,102 @@ import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
 
 import org.apache.spark.graphx._
 
 /**
- * A view of the vertices after they are shipped to the join sites specified in
- * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is
- * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a
- * fresh view is created.
- *
- * The view is always cached (i.e., once it is evaluated, it remains materialized). This avoids
- * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
- * example. However, it means iterative algorithms must manually call `Graph.unpersist` on previous
- * iterations' graphs for best GC performance. See the implementation of
- * [[org.apache.spark.graphx.Pregel]] for an example.
+ * Manages shipping vertex attributes to the edge partitions of an
+ * [[org.apache.spark.graphx.EdgeRDD]]. Vertex attributes may be partially shipped to construct a
+ * triplet view with vertex attributes on only one side, and they may be updated. An active vertex
+ * set may additionally be shipped to the edge partitions. Be careful not to store a reference to
+ * `edges`, since it may be modified when the attribute shipping level is upgraded.
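+ *
+ * A sketch of the intended usage, where `view` and `vertices` stand for an existing view and
+ * its vertex RDD:
+ * {{{
+ * view.upgrade(vertices, true, true) // ship both src and dst attributes in place
+ * view.edges                         // edge partitions now carry full triplets
+ * }}}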
  */
 private[impl]
-class ReplicatedVertexView[VD: ClassTag](
-    updatedVerts: VertexRDD[VD],
-    edges: EdgeRDD[_],
-    routingTable: RoutingTable,
-    prevViewOpt: Option[ReplicatedVertexView[VD]] = None) {
+class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
+    var edges: EdgeRDD[ED, VD],
+    var hasSrcId: Boolean = false,
+    var hasDstId: Boolean = false) {
 
   /**
-   * Within each edge partition, create a local map from vid to an index into the attribute
-   * array. Each map contains a superset of the vertices that it will receive, because it stores
-   * vids from both the source and destination of edges. It must always include both source and
-   * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
+   * Return a new `ReplicatedVertexView` with the specified `EdgeRDD`, which must have the same
+   * shipping level as this view.
    */
-  private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
-    case Some(prevView) =>
-      prevView.localVertexIdMap
-    case None =>
-      edges.partitionsRDD.mapPartitions(_.map {
-        case (pid, epart) =>
-          val vidToIndex = new VertexIdToIndexMap
-          epart.foreach { e =>
-            vidToIndex.add(e.srcId)
-            vidToIndex.add(e.dstId)
-          }
-          (pid, vidToIndex)
-      }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap")
-  }
-
-  private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
-  private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false)
-  private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true)
-  private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false)
-
-  def unpersist(blocking: Boolean = true): ReplicatedVertexView[VD] = {
-    bothAttrs.unpersist(blocking)
-    srcAttrOnly.unpersist(blocking)
-    dstAttrOnly.unpersist(blocking)
-    noAttrs.unpersist(blocking)
-    // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it
-    // without modification
-    this
+  def withEdges[VD2: ClassTag, ED2: ClassTag](
+      edges_ : EdgeRDD[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
+    new ReplicatedVertexView(edges_, hasSrcId, hasDstId)
   }
 
-  def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = {
-    (includeSrc, includeDst) match {
-      case (true, true) => bothAttrs
-      case (true, false) => srcAttrOnly
-      case (false, true) => dstAttrOnly
-      case (false, false) => noAttrs
-    }
+  /**
+   * Return a new `ReplicatedVertexView` where edges are reversed and shipping levels are swapped to
+   * match.
+   */
+  def reverse(): ReplicatedVertexView[VD, ED] = {
+    val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
+    new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
   }
 
-  def get(
-      includeSrc: Boolean,
-      includeDst: Boolean,
-      actives: VertexRDD[_]): RDD[(PartitionID, VertexPartition[VD])] = {
-    // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
-    // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
-    // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
-    // also shipped there.
-    val shippedActives = routingTable.get(true, true)
-      .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _))
-      .partitionBy(edges.partitioner.get)
-    // Update the view with shippedActives, setting activeness flags in the resulting
-    // VertexPartitions
-    get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) =>
-      val (pid, vPart) = viewIter.next()
-      val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator))
-      Iterator((pid, newPart))
+  /**
+   * Upgrade the shipping level in place to the specified levels by shipping vertex attributes from
+   * `vertices`. This operation modifies the `ReplicatedVertexView`, and callers can access `edges`
+   * afterwards to obtain the upgraded view.
+   */
+  def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean) {
+    val shipSrc = includeSrc && !hasSrcId
+    val shipDst = includeDst && !hasDstId
+    if (shipSrc || shipDst) {
+      val shippedVerts: RDD[(Int, VertexAttributeBlock[VD])] =
+        vertices.shipVertexAttributes(shipSrc, shipDst)
+          .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
+            includeSrc, includeDst, shipSrc, shipDst))
+          .partitionBy(edges.partitioner.get)
+      val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+        (ePartIter, shippedVertsIter) => ePartIter.map {
+          case (pid, edgePartition) =>
+            (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
+        }
+      })
+      edges = newEdges
+      hasSrcId = includeSrc
+      hasDstId = includeDst
     }
   }
 
-  private def create(includeSrc: Boolean, includeDst: Boolean)
-    : RDD[(PartitionID, VertexPartition[VD])] = {
-    val vdTag = classTag[VD]
-
-    // Ship vertex attributes to edge partitions according to vertexPlacement
-    val verts = updatedVerts.partitionsRDD
-    val shippedVerts = routingTable.get(includeSrc, includeDst)
-      .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdTag))
+  /**
+   * Return a new `ReplicatedVertexView` where the `activeSet` in each edge partition contains only
+   * vertex ids present in `actives`. This ships a vertex id to all edge partitions where it is
+   * referenced, ignoring the attribute shipping level.
+   */
+  def withActiveSet(actives: VertexRDD[_]): ReplicatedVertexView[VD, ED] = {
+    val shippedActives = actives.shipVertexIds()
+      .setName("ReplicatedVertexView.withActiveSet - shippedActives (broadcast)")
       .partitionBy(edges.partitioner.get)
-    // TODO: Consider using a specialized shuffler.
-
-    prevViewOpt match {
-      case Some(prevView) =>
-        // Update prevView with shippedVerts, setting staleness flags in the resulting
-        // VertexPartitions
-        prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) {
-          (prevViewIter, shippedVertsIter) =>
-            val (pid, prevVPart) = prevViewIter.next()
-            val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
-            Iterator((pid, newVPart))
-        }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst))
 
-      case None =>
-        // Within each edge partition, place the shipped vertex attributes into the correct
-        // locations specified in localVertexIdMap
-        localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
-          val (pid, vidToIndex) = mapIter.next()
-          assert(!mapIter.hasNext)
-          // Populate the vertex array using the vidToIndex map
-          val vertexArray = vdTag.newArray(vidToIndex.capacity)
-          for ((_, block) <- shippedVertsIter) {
-            for (i <- 0 until block.vids.size) {
-              val vid = block.vids(i)
-              val attr = block.attrs(i)
-              val ind = vidToIndex.getPos(vid)
-              vertexArray(ind) = attr
-            }
-          }
-          val newVPart = new VertexPartition(
-            vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag)
-          Iterator((pid, newVPart))
-        }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst))
-    }
-  }
-}
-
-private object ReplicatedVertexView {
-  protected def buildBuffer[VD: ClassTag](
-      pid2vidIter: Iterator[Array[Array[VertexId]]],
-      vertexPartIter: Iterator[VertexPartition[VD]]) = {
-    val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
-    val vertexPart: VertexPartition[VD] = vertexPartIter.next()
-
-    Iterator.tabulate(pid2vid.size) { pid =>
-      val vidsCandidate = pid2vid(pid)
-      val size = vidsCandidate.length
-      val vids = new PrimitiveVector[VertexId](pid2vid(pid).size)
-      val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
-      var i = 0
-      while (i < size) {
-        val vid = vidsCandidate(i)
-        if (vertexPart.isDefined(vid)) {
-          vids += vid
-          attrs += vertexPart(vid)
-        }
-        i += 1
+    val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
+      (ePartIter, shippedActivesIter) => ePartIter.map {
+        case (pid, edgePartition) =>
+          (pid, edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator)))
       }
-      (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
-    }
+    })
+    new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
   }
 
-  protected def buildActiveBuffer(
-      pid2vidIter: Iterator[Array[Array[VertexId]]],
-      activePartIter: Iterator[VertexPartition[_]])
-    : Iterator[(Int, Array[VertexId])] = {
-    val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
-    val activePart: VertexPartition[_] = activePartIter.next()
+  /**
+   * Return a new `ReplicatedVertexView` where the vertex attributes in the edge partitions are
+   * updated using `updates`. This ships a vertex attribute only to the edge partitions where it is
+   * in the position(s) specified by the attribute shipping level.
+   */
+  def updateVertices(updates: VertexRDD[VD]): ReplicatedVertexView[VD, ED] = {
+    val shippedVerts = updates.shipVertexAttributes(hasSrcId, hasDstId)
+      .setName("ReplicatedVertexView.updateVertices - shippedVerts %s %s (broadcast)".format(
+        hasSrcId, hasDstId))
+      .partitionBy(edges.partitioner.get)
 
-    Iterator.tabulate(pid2vid.size) { pid =>
-      val vidsCandidate = pid2vid(pid)
-      val size = vidsCandidate.length
-      val actives = new PrimitiveVector[VertexId](vidsCandidate.size)
-      var i = 0
-      while (i < size) {
-        val vid = vidsCandidate(i)
-        if (activePart.isDefined(vid)) {
-          actives += vid
-        }
-        i += 1
+    val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+      (ePartIter, shippedVertsIter) => ePartIter.map {
+        case (pid, edgePartition) =>
+          (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
       }
-      (pid, actives.trim().array)
-    }
+    })
+    new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
   }
 }
-
-private[graphx]
-class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
-  extends Serializable {
-  def iterator: Iterator[(VertexId, VD)] =
-    (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
-}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
deleted file mode 100644
index 022d5668e29428fc9ff142cf68131ac520051c72..0000000000000000000000000000000000000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx.impl
-
-import org.apache.spark.SparkContext._
-import org.apache.spark.graphx._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.collection.PrimitiveVector
-
-/**
- * Stores the locations of edge-partition join sites for each vertex attribute; that is, the routing
- * information for shipping vertex attributes to edge partitions. This is always cached because it
- * may be used multiple times in ReplicatedVertexView -- once to ship the vertex attributes and
- * (possibly) once to ship the active-set information.
- */
-private[impl]
-class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
-
-  val bothAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(true, true)
-  val srcAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(true, false)
-  val dstAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(false, true)
-  val noAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(false, false)
-
-  def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] =
-    (includeSrcAttr, includeDstAttr) match {
-      case (true, true) => bothAttrs
-      case (true, false) => srcAttrOnly
-      case (false, true) => dstAttrOnly
-      case (false, false) => noAttrs
-    }
-
-  private def createPid2Vid(
-      includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = {
-    // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
-    val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
-      val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
-      val numEdges = edgePartition.size
-      val vSet = new VertexSet
-      if (includeSrcAttr) {  // Add src vertices to the set.
-        var i = 0
-        while (i < numEdges) {
-          vSet.add(edgePartition.srcIds(i))
-          i += 1
-        }
-      }
-      if (includeDstAttr) {  // Add dst vertices to the set.
-      var i = 0
-        while (i < numEdges) {
-          vSet.add(edgePartition.dstIds(i))
-          i += 1
-        }
-      }
-      vSet.iterator.map { vid => (vid, pid) }
-    }
-
-    val numEdgePartitions = edges.partitions.size
-    vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
-      val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
-      for ((vid, pid) <- iter) {
-        pid2vid(pid) += vid
-      }
-
-      Iterator(pid2vid.map(_.trim().array))
-    }.cache().setName("RoutingTable %s %s".format(includeSrcAttr, includeDstAttr))
-  }
-}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
new file mode 100644
index 0000000000000000000000000000000000000000..927e32ad0f4487e4e8e3e11a0ae7f9a1be3b45a1
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
+ * the edge partition references `vid` in the specified `position` (src, dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+    var vid: VertexId,
+    var pid: PartitionID,
+    var position: Byte)
+  extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+  override def _1 = vid
+  override def _2 = (pid, position)
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+  /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD, using the given `partitioner`. */
+  def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
+    new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
+      .setSerializer(new RoutingTableMessageSerializer)
+  }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+  import scala.language.implicitConversions
+
+  implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
+    new RoutingTableMessageRDDFunctions(rdd)
+  }
+}
+
+private[graphx]
+object RoutingTablePartition {
+  val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+  /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
+  def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
+    : Iterator[RoutingTableMessage] = {
+    // Determine which positions each vertex id appears in using a map where the low 2 bits
+    // represent src and dst
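+    // (position 0x1 = referenced as src only, 0x2 = as dst only, 0x3 = as both)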
+    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    edgePartition.srcIds.iterator.foreach { srcId =>
+      map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+    }
+    edgePartition.dstIds.iterator.foreach { dstId =>
+      map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+    }
+    map.iterator.map { vidAndPosition =>
+      new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
+    }
+  }
+
+  /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
+  def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
+    : RoutingTablePartition = {
+    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
+    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
+    val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
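+    // srcFlags(pid)(i) and dstFlags(pid)(i) record whether pid2vid(pid)(i) appears as src or dst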
+    for (msg <- iter) {
+      pid2vid(msg.pid) += msg.vid
+      srcFlags(msg.pid) += (msg.position & 0x1) != 0
+      dstFlags(msg.pid) += (msg.position & 0x2) != 0
+    }
+
+    new RoutingTablePartition(pid2vid.zipWithIndex.map {
+      case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
+    })
+  }
+
+  /** Compact the given vector of Booleans into a BitSet. */
+  private def toBitSet(flags: PrimitiveVector[Boolean]): BitSet = {
+    val bitset = new BitSet(flags.size)
+    var i = 0
+    while (i < flags.size) {
+      if (flags(i)) {
+        bitset.set(i)
+      }
+      i += 1
+    }
+    bitset
+  }
+}
+
+/**
+ * Stores the locations of edge-partition join sites for each vertex attribute in a particular
+ * vertex partition. This provides routing information for shipping vertex attributes to edge
+ * partitions.
+ */
+private[graphx]
+class RoutingTablePartition(
+    private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) {
+  /** The maximum number of edge partitions this `RoutingTablePartition` is built to join with. */
+  val numEdgePartitions: Int = routingTable.size
+
+  /** Returns the number of vertices that will be sent to the specified edge partition. */
+  def partitionSize(pid: PartitionID): Int = routingTable(pid)._1.size
+
+  /** Returns an iterator over all vertex ids stored in this `RoutingTablePartition`. */
+  def iterator: Iterator[VertexId] = routingTable.iterator.flatMap(_._1.iterator)
+
+  /** Returns a new RoutingTablePartition reflecting a reversal of all edge directions. */
+  def reverse: RoutingTablePartition = {
+    new RoutingTablePartition(routingTable.map {
+      case (vids, srcVids, dstVids) => (vids, dstVids, srcVids)
+    })
+  }
+
+  /**
+   * Runs `f` on each vertex id to be sent to the specified edge partition. Vertex ids can be
+   * filtered by the position (src or dst) they occupy in the edge partition.
+   */
+  def foreachWithinEdgePartition
+      (pid: PartitionID, includeSrc: Boolean, includeDst: Boolean)
+      (f: VertexId => Unit) {
+    val (vidsCandidate, srcVids, dstVids) = routingTable(pid)
+    val size = vidsCandidate.length
+    if (includeSrc && includeDst) {
+      // Avoid checks for performance
+      vidsCandidate.iterator.foreach(f)
+    } else if (!includeSrc && !includeDst) {
+      // Do nothing
+    } else {
+      val relevantVids = if (includeSrc) srcVids else dstVids
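+      // relevantVids is a BitSet over indices into vidsCandidate, so iterate its set bits as indices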
+      relevantVids.iterator.foreach { i => f(vidsCandidate(i)) }
+    }
+  }
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
index 1de42eeca1f0027fdfe8a72d6772e583dd6d6a7f..033237f5972167418d4be96534ad101fea8712ad 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -27,6 +27,35 @@ import scala.reflect.ClassTag
 import org.apache.spark.graphx._
 import org.apache.spark.serializer._
 
+private[graphx]
+class RoutingTableMessageSerializer extends Serializer with Serializable {
+  override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+    override def serializeStream(s: OutputStream): SerializationStream =
+      new ShuffleSerializationStream(s) {
+        def writeObject[T: ClassTag](t: T): SerializationStream = {
+          val msg = t.asInstanceOf[RoutingTableMessage]
+          writeVarLong(msg.vid, optimizePositive = false)
+          writeUnsignedVarInt(msg.pid)
+          // TODO: Write only the bottom two bits of msg.position
+          s.write(msg.position)
+          this
+        }
+      }
+
+    override def deserializeStream(s: InputStream): DeserializationStream =
+      new ShuffleDeserializationStream(s) {
+        override def readObject[T: ClassTag](): T = {
+          val a = readVarLong(optimizePositive = false)
+          val b = readUnsignedVarInt()
+          val c = s.read()
+          if (c == -1) throw new EOFException
+          new RoutingTableMessage(a, b, c.toByte).asInstanceOf[T]
+        }
+      }
+  }
+}
+
 private[graphx]
 class VertexIdMsgSerializer extends Serializer with Serializable {
   override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f4e221d4e05ae4e0813722003834e4f0d62a1fcd
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/** Stores vertex attributes to ship to an edge partition. */
+private[graphx]
+class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
+  extends Serializable {
+  def iterator: Iterator[(VertexId, VD)] =
+    (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
+}
+
+private[graphx]
+object ShippableVertexPartition {
+  /** Construct a `ShippableVertexPartition` from the given vertices without any routing table. */
+  def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] =
+    apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD])
+
+  /**
+   * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
+   * table, filling in missing vertices mentioned in the routing table using `defaultVal`.
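+   *
+   * A sketch with hypothetical values: if `iter` holds `(1L, "a")` and the routing table also
+   * mentions vertex 2, the resulting partition contains `(1L, "a")` and `(2L, defaultVal)`.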
+   */
+  def apply[VD: ClassTag](
+      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
+    : ShippableVertexPartition[VD] = {
+    val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
+    val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a)
+    new ShippableVertexPartition(index, values, mask, routingTable)
+  }
+
+  import scala.language.implicitConversions
+
+  /**
+   * Implicit conversion to allow invoking `VertexPartitionBase` operations directly on a
+   * `ShippableVertexPartition`.
+   */
+  implicit def shippablePartitionToOps[VD: ClassTag](partition: ShippableVertexPartition[VD]) =
+    new ShippableVertexPartitionOps(partition)
+
+  /**
+   * Implicit evidence that `ShippableVertexPartition` is a member of the
+   * `VertexPartitionBaseOpsConstructor` typeclass. This enables invoking `VertexPartitionBase`
+   * operations on a `ShippableVertexPartition` via an evidence parameter, as in
+   * [[VertexPartitionBaseOps]].
+   */
+  implicit object ShippableVertexPartitionOpsConstructor
+    extends VertexPartitionBaseOpsConstructor[ShippableVertexPartition] {
+    def toOps[VD: ClassTag](partition: ShippableVertexPartition[VD])
+      : VertexPartitionBaseOps[VD, ShippableVertexPartition] = shippablePartitionToOps(partition)
+  }
+}
+
+/**
+ * A map from vertex id to vertex attribute that additionally stores edge partition join sites for
+ * each vertex attribute, enabling joining with an [[org.apache.spark.graphx.EdgeRDD]].
+ */
+private[graphx]
+class ShippableVertexPartition[VD: ClassTag](
+    val index: VertexIdToIndexMap,
+    val values: Array[VD],
+    val mask: BitSet,
+    val routingTable: RoutingTablePartition)
+  extends VertexPartitionBase[VD] {
+
+  /** Return a new ShippableVertexPartition with the specified routing table. */
+  def withRoutingTable(routingTable_ : RoutingTablePartition): ShippableVertexPartition[VD] = {
+    new ShippableVertexPartition(index, values, mask, routingTable_)
+  }
+
+  /**
+   * Generate a `VertexAttributeBlock` for each edge partition keyed on the edge partition ID. The
+   * `VertexAttributeBlock` contains the vertex attributes from the current partition that are
+   * referenced in the specified positions in the edge partition.
+   */
+  def shipVertexAttributes(
+      shipSrc: Boolean, shipDst: Boolean): Iterator[(PartitionID, VertexAttributeBlock[VD])] = {
+    Iterator.tabulate(routingTable.numEdgePartitions) { pid =>
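+      // When shipping both positions every routed vertex will be sent, so presize exactly;
+      // otherwise start small and let the vectors grow as needed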
+      val initialSize = if (shipSrc && shipDst) routingTable.partitionSize(pid) else 64
+      val vids = new PrimitiveVector[VertexId](initialSize)
+      val attrs = new PrimitiveVector[VD](initialSize)
+      var i = 0
+      routingTable.foreachWithinEdgePartition(pid, shipSrc, shipDst) { vid =>
+        if (isDefined(vid)) {
+          vids += vid
+          attrs += this(vid)
+        }
+        i += 1
+      }
+      (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
+    }
+  }
+
+  /**
+   * Generate a `VertexId` array for each edge partition keyed on the edge partition ID. The array
+   * contains the visible vertex ids from the current partition that are referenced in the edge
+   * partition.
+   */
+  def shipVertexIds(): Iterator[(PartitionID, Array[VertexId])] = {
+    Iterator.tabulate(routingTable.numEdgePartitions) { pid =>
+      val vids = new PrimitiveVector[VertexId](routingTable.partitionSize(pid))
+      var i = 0
+      routingTable.foreachWithinEdgePartition(pid, true, true) { vid =>
+        if (isDefined(vid)) {
+          vids += vid
+        }
+        i += 1
+      }
+      (pid, vids.trim().array)
+    }
+  }
+}
+
+private[graphx] class ShippableVertexPartitionOps[VD: ClassTag](self: ShippableVertexPartition[VD])
+  extends VertexPartitionBaseOps[VD, ShippableVertexPartition](self) {
+
+  def withIndex(index: VertexIdToIndexMap): ShippableVertexPartition[VD] = {
+    new ShippableVertexPartition(index, self.values, self.mask, self.routingTable)
+  }
+
+  def withValues[VD2: ClassTag](values: Array[VD2]): ShippableVertexPartition[VD2] = {
+    new ShippableVertexPartition(self.index, values, self.mask, self.routingTable)
+  }
+
+  def withMask(mask: BitSet): ShippableVertexPartition[VD] = {
+    new ShippableVertexPartition(self.index, self.values, mask, self.routingTable)
+  }
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
index 7a54b413dc8ca26499c6ce318256b2a72744e402..f1d174720a1baa4fe1a5cb2743121f184932d610 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
@@ -19,260 +19,59 @@ package org.apache.spark.graphx.impl
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.Logging
+import org.apache.spark.util.collection.BitSet
+
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
-import org.apache.spark.util.collection.BitSet
 
 private[graphx] object VertexPartition {
-
-  def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): VertexPartition[VD] = {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
-    iter.foreach { case (k, v) =>
-      map(k) = v
-    }
-    new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
-  }
-
-  def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
-    : VertexPartition[VD] =
-  {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
-    iter.foreach { case (k, v) =>
-      map.setMerge(k, v, mergeFunc)
-    }
-    new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
-  }
-}
-
-
-private[graphx]
-class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
-    val index: VertexIdToIndexMap,
-    val values: Array[VD],
-    val mask: BitSet,
-    /** A set of vids of active vertices. May contain vids not in index due to join rewrite. */
-    private val activeSet: Option[VertexSet] = None)
-  extends Logging {
-
-  val capacity: Int = index.capacity
-
-  def size: Int = mask.cardinality()
-
-  /** Return the vertex attribute for the given vertex ID. */
-  def apply(vid: VertexId): VD = values(index.getPos(vid))
-
-  def isDefined(vid: VertexId): Boolean = {
-    val pos = index.getPos(vid)
-    pos >= 0 && mask.get(pos)
-  }
-
-  /** Look up vid in activeSet, throwing an exception if it is None. */
-  def isActive(vid: VertexId): Boolean = {
-    activeSet.get.contains(vid)
+  /** Construct a `VertexPartition` from the given vertices. */
+  def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)])
+    : VertexPartition[VD] = {
+    val (index, values, mask) = VertexPartitionBase.initFrom(iter)
+    new VertexPartition(index, values, mask)
   }
 
-  /** The number of active vertices, if any exist. */
-  def numActives: Option[Int] = activeSet.map(_.size)
+  import scala.language.implicitConversions
 
   /**
-   * Pass each vertex attribute along with the vertex id through a map
-   * function and retain the original RDD's partitioning and index.
-   *
-   * @tparam VD2 the type returned by the map function
-   *
-   * @param f the function applied to each vertex id and vertex
-   * attribute in the RDD
-   *
-   * @return a new VertexPartition with values obtained by applying `f` to
-   * each of the entries in the original VertexRDD.  The resulting
-   * VertexPartition retains the same index.
+   * Implicit conversion to allow invoking `VertexPartitionBase` operations directly on a
+   * `VertexPartition`.
    */
-  def map[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexPartition[VD2] = {
-    // Construct a view of the map transformation
-    val newValues = new Array[VD2](capacity)
-    var i = mask.nextSetBit(0)
-    while (i >= 0) {
-      newValues(i) = f(index.getValue(i), values(i))
-      i = mask.nextSetBit(i + 1)
-    }
-    new VertexPartition[VD2](index, newValues, mask)
-  }
-
-  /**
-   * Restrict the vertex set to the set of vertices satisfying the given predicate.
-   *
-   * @param pred the user defined predicate
-   *
-   * @note The vertex set preserves the original index structure which means that the returned
-   *       RDD can be easily joined with the original vertex-set. Furthermore, the filter only
-   *       modifies the bitmap index and so no new values are allocated.
-   */
-  def filter(pred: (VertexId, VD) => Boolean): VertexPartition[VD] = {
-    // Allocate the array to store the results into
-    val newMask = new BitSet(capacity)
-    // Iterate over the active bits in the old mask and evaluate the predicate
-    var i = mask.nextSetBit(0)
-    while (i >= 0) {
-      if (pred(index.getValue(i), values(i))) {
-        newMask.set(i)
-      }
-      i = mask.nextSetBit(i + 1)
-    }
-    new VertexPartition(index, values, newMask)
-  }
+  implicit def partitionToOps[VD: ClassTag](partition: VertexPartition[VD]) =
+    new VertexPartitionOps(partition)
 
   /**
-   * Hides vertices that are the same between this and other. For vertices that are different, keeps
-   * the values from `other`. The indices of `this` and `other` must be the same.
+   * Implicit evidence that `VertexPartition` is a member of the `VertexPartitionBaseOpsConstructor`
+   * typeclass. This enables invoking `VertexPartitionBase` operations on a `VertexPartition` via an
+   * evidence parameter, as in [[VertexPartitionBaseOps]].
    */
-  def diff(other: VertexPartition[VD]): VertexPartition[VD] = {
-    if (index != other.index) {
-      logWarning("Diffing two VertexPartitions with different indexes is slow.")
-      diff(createUsingIndex(other.iterator))
-    } else {
-      val newMask = mask & other.mask
-      var i = newMask.nextSetBit(0)
-      while (i >= 0) {
-        if (values(i) == other.values(i)) {
-          newMask.unset(i)
-        }
-        i = newMask.nextSetBit(i + 1)
-      }
-      new VertexPartition(index, other.values, newMask)
-    }
-  }
-
-  /** Left outer join another VertexPartition. */
-  def leftJoin[VD2: ClassTag, VD3: ClassTag]
-      (other: VertexPartition[VD2])
-      (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
-    if (index != other.index) {
-      logWarning("Joining two VertexPartitions with different indexes is slow.")
-      leftJoin(createUsingIndex(other.iterator))(f)
-    } else {
-      val newValues = new Array[VD3](capacity)
-
-      var i = mask.nextSetBit(0)
-      while (i >= 0) {
-        val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None
-        newValues(i) = f(index.getValue(i), values(i), otherV)
-        i = mask.nextSetBit(i + 1)
-      }
-      new VertexPartition(index, newValues, mask)
-    }
-  }
-
-  /** Left outer join another iterator of messages. */
-  def leftJoin[VD2: ClassTag, VD3: ClassTag]
-      (other: Iterator[(VertexId, VD2)])
-      (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
-    leftJoin(createUsingIndex(other))(f)
-  }
-
-  /** Inner join another VertexPartition. */
-  def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U])
-      (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
-    if (index != other.index) {
-      logWarning("Joining two VertexPartitions with different indexes is slow.")
-      innerJoin(createUsingIndex(other.iterator))(f)
-    } else {
-      val newMask = mask & other.mask
-      val newValues = new Array[VD2](capacity)
-      var i = newMask.nextSetBit(0)
-      while (i >= 0) {
-        newValues(i) = f(index.getValue(i), values(i), other.values(i))
-        i = newMask.nextSetBit(i + 1)
-      }
-      new VertexPartition(index, newValues, newMask)
-    }
-  }
-
-  /**
-   * Inner join an iterator of messages.
-   */
-  def innerJoin[U: ClassTag, VD2: ClassTag]
-      (iter: Iterator[Product2[VertexId, U]])
-      (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
-    innerJoin(createUsingIndex(iter))(f)
+  implicit object VertexPartitionOpsConstructor
+    extends VertexPartitionBaseOpsConstructor[VertexPartition] {
+    def toOps[VD: ClassTag](partition: VertexPartition[VD])
+      : VertexPartitionBaseOps[VD, VertexPartition] = partitionToOps(partition)
   }
+}
 
-  /**
-   * Similar effect as aggregateUsingIndex((a, b) => a)
-   */
-  def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
-    : VertexPartition[VD2] = {
-    val newMask = new BitSet(capacity)
-    val newValues = new Array[VD2](capacity)
-    iter.foreach { case (vid, vdata) =>
-      val pos = index.getPos(vid)
-      if (pos >= 0) {
-        newMask.set(pos)
-        newValues(pos) = vdata
-      }
-    }
-    new VertexPartition[VD2](index, newValues, newMask)
-  }
+/** A map from vertex id to vertex attribute. */
+private[graphx] class VertexPartition[VD: ClassTag](
+    val index: VertexIdToIndexMap,
+    val values: Array[VD],
+    val mask: BitSet)
+  extends VertexPartitionBase[VD]
 
-  /**
-   * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
-   * the partition, hidden by the bitmask.
-   */
-  def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): VertexPartition[VD] = {
-    val newMask = new BitSet(capacity)
-    val newValues = new Array[VD](capacity)
-    System.arraycopy(values, 0, newValues, 0, newValues.length)
-    iter.foreach { case (vid, vdata) =>
-      val pos = index.getPos(vid)
-      if (pos >= 0) {
-        newMask.set(pos)
-        newValues(pos) = vdata
-      }
-    }
-    new VertexPartition(index, newValues, newMask)
-  }
+private[graphx] class VertexPartitionOps[VD: ClassTag](self: VertexPartition[VD])
+  extends VertexPartitionBaseOps[VD, VertexPartition](self) {
 
-  def aggregateUsingIndex[VD2: ClassTag](
-      iter: Iterator[Product2[VertexId, VD2]],
-      reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
-    val newMask = new BitSet(capacity)
-    val newValues = new Array[VD2](capacity)
-    iter.foreach { product =>
-      val vid = product._1
-      val vdata = product._2
-      val pos = index.getPos(vid)
-      if (pos >= 0) {
-        if (newMask.get(pos)) {
-          newValues(pos) = reduceFunc(newValues(pos), vdata)
-        } else { // otherwise just store the new value
-          newMask.set(pos)
-          newValues(pos) = vdata
-        }
-      }
-    }
-    new VertexPartition[VD2](index, newValues, newMask)
+  def withIndex(index: VertexIdToIndexMap): VertexPartition[VD] = {
+    new VertexPartition(index, self.values, self.mask)
   }
 
-  def replaceActives(iter: Iterator[VertexId]): VertexPartition[VD] = {
-    val newActiveSet = new VertexSet
-    iter.foreach(newActiveSet.add(_))
-    new VertexPartition(index, values, mask, Some(newActiveSet))
+  def withValues[VD2: ClassTag](values: Array[VD2]): VertexPartition[VD2] = {
+    new VertexPartition(self.index, values, self.mask)
   }
 
-  /**
-   * Construct a new VertexPartition whose index contains only the vertices in the mask.
-   */
-  def reindex(): VertexPartition[VD] = {
-    val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
-    val arbitraryMerge = (a: VD, b: VD) => a
-    for ((k, v) <- this.iterator) {
-      hashMap.setMerge(k, v, arbitraryMerge)
-    }
-    new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
+  def withMask(mask: BitSet): VertexPartition[VD] = {
+    new VertexPartition(self.index, self.values, mask)
   }
-
-  def iterator: Iterator[(VertexId, VD)] =
-    mask.iterator.map(ind => (index.getValue(ind), values(ind)))
-
-  def vidIterator: Iterator[VertexId] = mask.iterator.map(ind => index.getValue(ind))
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8d9e0204d27f24b64a22c30560bc6a2707035624
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+private[graphx] object VertexPartitionBase {
+  /**
+   * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
+   * entries arbitrarily.
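+   *
+   * For example, a sketch with illustrative values (duplicate ids keep an arbitrary entry):
+   * {{{
+   * val (index, values, mask) =
+   *   VertexPartitionBase.initFrom(Iterator((1L, "a"), (2L, "b"), (1L, "c")))
+   * // ids 1 and 2 each map to a position in `values`; id 1 keeps either "a" or "c"
+   * }}}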
+   */
+  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
+    : (VertexIdToIndexMap, Array[VD], BitSet) = {
+    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    iter.foreach { pair =>
+      map(pair._1) = pair._2
+    }
+    (map.keySet, map._values, map.keySet.getBitSet)
+  }
+
+  /**
+   * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
+   * entries using `mergeFunc`.
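+   *
+   * For example, a sketch that sums the attributes of duplicate ids (values are illustrative):
+   * {{{
+   * val (index, values, mask) =
+   *   VertexPartitionBase.initFrom(Iterator((1L, 2.0), (1L, 3.0)), (a: Double, b: Double) => a + b)
+   * // id 1 maps to 5.0
+   * }}}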
+   */
+  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
+    : (VertexIdToIndexMap, Array[VD], BitSet) = {
+    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    iter.foreach { pair =>
+      map.setMerge(pair._1, pair._2, mergeFunc)
+    }
+    (map.keySet, map._values, map.keySet.getBitSet)
+  }
+}
+
+/**
+ * An abstract map from vertex id to vertex attribute. [[VertexPartition]] is the corresponding
+ * concrete implementation. [[VertexPartitionBaseOps]] provides a variety of operations for
+ * VertexPartitionBase and subclasses that provide implicit evidence of membership in the
+ * `VertexPartitionBaseOpsConstructor` typeclass (for example,
+ * [[VertexPartition.VertexPartitionOpsConstructor]]).
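+ *
+ * Conceptually, `index` maps a vertex id to a position in `values`, and `mask` marks which
+ * positions currently hold a visible vertex; `iterator` walks only the set bits of `mask`.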
+ */
+private[graphx] abstract class VertexPartitionBase[@specialized(Long, Int, Double) VD: ClassTag] {
+
+  def index: VertexIdToIndexMap
+  def values: Array[VD]
+  def mask: BitSet
+
+  val capacity: Int = index.capacity
+
+  def size: Int = mask.cardinality()
+
+  /** Return the vertex attribute for the given vertex ID. */
+  def apply(vid: VertexId): VD = values(index.getPos(vid))
+
+  def isDefined(vid: VertexId): Boolean = {
+    val pos = index.getPos(vid)
+    pos >= 0 && mask.get(pos)
+  }
+
+  def iterator: Iterator[(VertexId, VD)] =
+    mask.iterator.map(ind => (index.getValue(ind), values(ind)))
+}
+
+/**
+ * A typeclass for subclasses of `VertexPartitionBase` representing the ability to wrap them in a
+ * `VertexPartitionBaseOps`.
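+ *
+ * For example, the instance for [[VertexPartition]] delegates to the implicit wrapper defined in
+ * its companion object:
+ * {{{
+ * implicit object VertexPartitionOpsConstructor
+ *   extends VertexPartitionBaseOpsConstructor[VertexPartition] {
+ *   def toOps[VD: ClassTag](partition: VertexPartition[VD])
+ *     : VertexPartitionBaseOps[VD, VertexPartition] = partitionToOps(partition)
+ * }
+ * }}}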
+ */
+private[graphx] trait VertexPartitionBaseOpsConstructor[T[X] <: VertexPartitionBase[X]] {
+  def toOps[VD: ClassTag](partition: T[VD]): VertexPartitionBaseOps[VD, T]
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
new file mode 100644
index 0000000000000000000000000000000000000000..21ff615feca6c74c2a7d26e8915246bb15acef30
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+import org.apache.spark.Logging
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A class containing additional operations for subclasses of VertexPartitionBase that provide
+ * implicit evidence of membership in the `VertexPartitionBaseOpsConstructor` typeclass (for
+ * example, [[VertexPartition.VertexPartitionOpsConstructor]]).
+ */
+private[graphx] abstract class VertexPartitionBaseOps
+    [VD: ClassTag, Self[X] <: VertexPartitionBase[X] : VertexPartitionBaseOpsConstructor]
+    (self: Self[VD])
+    extends Logging {
+
+  def withIndex(index: VertexIdToIndexMap): Self[VD]
+  def withValues[VD2: ClassTag](values: Array[VD2]): Self[VD2]
+  def withMask(mask: BitSet): Self[VD]
+
+  /**
+   * Pass each vertex attribute along with the vertex id through a map
+   * function and retain the original partition's index.
+   *
+   * @tparam VD2 the type returned by the map function
+   *
+   * @param f the function applied to each vertex id and vertex
+   * attribute in the partition
+   *
+   * @return a new partition with values obtained by applying `f` to
+   * each of the entries in the original partition. The resulting
+   * partition retains the same index.
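+   *
+   * For example, a sketch doubling each attribute, assuming `part` is a `VertexPartition[Int]`
+   * (the implicit ops conversion applies):
+   * {{{
+   * val doubled = part.map((vid, attr) => attr * 2)
+   * }}}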
+   */
+  def map[VD2: ClassTag](f: (VertexId, VD) => VD2): Self[VD2] = {
+    // Construct a view of the map transformation
+    val newValues = new Array[VD2](self.capacity)
+    var i = self.mask.nextSetBit(0)
+    while (i >= 0) {
+      newValues(i) = f(self.index.getValue(i), self.values(i))
+      i = self.mask.nextSetBit(i + 1)
+    }
+    this.withValues(newValues)
+  }
+
+  /**
+   * Restrict the vertex set to the set of vertices satisfying the given predicate.
+   *
+   * @param pred the user defined predicate
+   *
+   * @note The vertex set preserves the original index structure, which means that the returned
+   *       partition can easily be joined with the original vertex set. Furthermore, the filter
+   *       only modifies the bitmask; no new values are allocated.
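+   *
+   * For example, a sketch keeping only even vertex ids, assuming `part` is a `VertexPartition`:
+   * {{{
+   * val evens = part.filter((vid, attr) => vid % 2 == 0)
+   * }}}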
+   */
+  def filter(pred: (VertexId, VD) => Boolean): Self[VD] = {
+    // Allocate the array to store the results into
+    val newMask = new BitSet(self.capacity)
+    // Iterate over the active bits in the old mask and evaluate the predicate
+    var i = self.mask.nextSetBit(0)
+    while (i >= 0) {
+      if (pred(self.index.getValue(i), self.values(i))) {
+        newMask.set(i)
+      }
+      i = self.mask.nextSetBit(i + 1)
+    }
+    this.withMask(newMask)
+  }
+
+  /**
+   * Hides vertices that are the same between `this` and `other`. For vertices that differ, keeps
+   * the values from `other`. The indices of `this` and `other` must be the same.
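+   *
+   * For example, a sketch assuming `a` and `b` are partitions over the same index:
+   * {{{
+   * val changed = a.diff(b)  // visible entries are those whose attribute differs in `b`
+   * }}}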
+   */
+  def diff(other: Self[VD]): Self[VD] = {
+    if (self.index != other.index) {
+      logWarning("Diffing two VertexPartitions with different indexes is slow.")
+      diff(createUsingIndex(other.iterator))
+    } else {
+      val newMask = self.mask & other.mask
+      var i = newMask.nextSetBit(0)
+      while (i >= 0) {
+        if (self.values(i) == other.values(i)) {
+          newMask.unset(i)
+        }
+        i = newMask.nextSetBit(i + 1)
+      }
+      this.withValues(other.values).withMask(newMask)
+    }
+  }
+
+  /** Left outer join another VertexPartition. */
+  def leftJoin[VD2: ClassTag, VD3: ClassTag]
+      (other: Self[VD2])
+      (f: (VertexId, VD, Option[VD2]) => VD3): Self[VD3] = {
+    if (self.index != other.index) {
+      logWarning("Joining two VertexPartitions with different indexes is slow.")
+      leftJoin(createUsingIndex(other.iterator))(f)
+    } else {
+      val newValues = new Array[VD3](self.capacity)
+
+      var i = self.mask.nextSetBit(0)
+      while (i >= 0) {
+        val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None
+        newValues(i) = f(self.index.getValue(i), self.values(i), otherV)
+        i = self.mask.nextSetBit(i + 1)
+      }
+      this.withValues(newValues)
+    }
+  }
+
+  /** Left outer join another iterator of messages. */
+  def leftJoin[VD2: ClassTag, VD3: ClassTag]
+      (other: Iterator[(VertexId, VD2)])
+      (f: (VertexId, VD, Option[VD2]) => VD3): Self[VD3] = {
+    leftJoin(createUsingIndex(other))(f)
+  }
+
+  /** Inner join another VertexPartition. */
+  def innerJoin[U: ClassTag, VD2: ClassTag]
+      (other: Self[U])
+      (f: (VertexId, VD, U) => VD2): Self[VD2] = {
+    if (self.index != other.index) {
+      logWarning("Joining two VertexPartitions with different indexes is slow.")
+      innerJoin(createUsingIndex(other.iterator))(f)
+    } else {
+      val newMask = self.mask & other.mask
+      val newValues = new Array[VD2](self.capacity)
+      var i = newMask.nextSetBit(0)
+      while (i >= 0) {
+        newValues(i) = f(self.index.getValue(i), self.values(i), other.values(i))
+        i = newMask.nextSetBit(i + 1)
+      }
+      this.withValues(newValues).withMask(newMask)
+    }
+  }
+
+  /**
+   * Inner join an iterator of messages.
+   */
+  def innerJoin[U: ClassTag, VD2: ClassTag]
+      (iter: Iterator[Product2[VertexId, U]])
+      (f: (VertexId, VD, U) => VD2): Self[VD2] = {
+    innerJoin(createUsingIndex(iter))(f)
+  }
+
+  /**
+   * Produces a similar effect to `aggregateUsingIndex((a, b) => a)`: builds a partition over this
+   * index from the given messages, dropping ids that are not in the index.
+   */
+  def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
+    : Self[VD2] = {
+    val newMask = new BitSet(self.capacity)
+    val newValues = new Array[VD2](self.capacity)
+    iter.foreach { pair =>
+      val pos = self.index.getPos(pair._1)
+      if (pos >= 0) {
+        newMask.set(pos)
+        newValues(pos) = pair._2
+      }
+    }
+    this.withValues(newValues).withMask(newMask)
+  }
+
+  /**
+   * Similar to `innerJoin`, but vertices from the left side that don't appear in `iter` remain in
+   * the partition, hidden by the bitmask.
+   */
+  def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): Self[VD] = {
+    val newMask = new BitSet(self.capacity)
+    val newValues = new Array[VD](self.capacity)
+    System.arraycopy(self.values, 0, newValues, 0, newValues.length)
+    iter.foreach { pair =>
+      val pos = self.index.getPos(pair._1)
+      if (pos >= 0) {
+        newMask.set(pos)
+        newValues(pos) = pair._2
+      }
+    }
+    this.withValues(newValues).withMask(newMask)
+  }
+
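+  /**
+   * Builds a partition over this index from the given messages, combining messages destined for
+   * the same vertex with `reduceFunc` and dropping ids that are not in the index.
+   */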
+  def aggregateUsingIndex[VD2: ClassTag](
+      iter: Iterator[Product2[VertexId, VD2]],
+      reduceFunc: (VD2, VD2) => VD2): Self[VD2] = {
+    val newMask = new BitSet(self.capacity)
+    val newValues = new Array[VD2](self.capacity)
+    iter.foreach { product =>
+      val vid = product._1
+      val vdata = product._2
+      val pos = self.index.getPos(vid)
+      if (pos >= 0) {
+        if (newMask.get(pos)) {
+          newValues(pos) = reduceFunc(newValues(pos), vdata)
+        } else { // otherwise just store the new value
+          newMask.set(pos)
+          newValues(pos) = vdata
+        }
+      }
+    }
+    this.withValues(newValues).withMask(newMask)
+  }
+
+  /**
+   * Construct a new VertexPartition whose index contains only the vertices in the mask.
+   */
+  def reindex(): Self[VD] = {
+    val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val arbitraryMerge = (a: VD, b: VD) => a
+    for ((k, v) <- self.iterator) {
+      hashMap.setMerge(k, v, arbitraryMerge)
+    }
+    this.withIndex(hashMap.keySet).withValues(hashMap._values).withMask(hashMap.keySet.getBitSet)
+  }
+
+  /**
+   * Converts a vertex partition (in particular, one of type `Self`) into a
+   * `VertexPartitionBaseOps`. Within this class, this conversion allows chaining the methods
+   * defined above, because those methods return a `Self` and the conversion re-wraps the result
+   * in a `VertexPartitionBaseOps`. It relies on the context bound on `Self`.
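+   *
+   * For example, `this.withValues(newValues).withMask(newMask)` in the methods above type-checks
+   * because `withValues` returns a `Self[VD2]`, which this conversion re-wraps so that `withMask`
+   * can then be called on it.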
+   */
+  private implicit def toOps[VD2: ClassTag](
+      partition: Self[VD2]): VertexPartitionBaseOps[VD2, Self] = {
+    implicitly[VertexPartitionBaseOpsConstructor[Self]].toOps(partition)
+  }
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index d901d4fe225fe4a8f7e3476331c34ee94c69731f..069e042ed94a3e11cf31a9b8540680c290579758 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -55,6 +55,7 @@ object Analytics extends Logging {
     val conf = new SparkConf()
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+      .set("spark.locality.wait", "100000")
 
     taskType match {
       case "pagerank" =>
@@ -62,12 +63,14 @@ object Analytics extends Logging {
         var outFname = ""
         var numEPart = 4
         var partitionStrategy: Option[PartitionStrategy] = None
+        var numIterOpt: Option[Int] = None
 
         options.foreach{
           case ("tol", v) => tol = v.toFloat
           case ("output", v) => outFname = v
           case ("numEPart", v) => numEPart = v.toInt
           case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+          case ("numIter", v) => numIterOpt = Some(v.toInt)
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
 
@@ -84,7 +87,10 @@ object Analytics extends Logging {
         println("GRAPHX: Number of vertices " + graph.vertices.count)
         println("GRAPHX: Number of edges " + graph.edges.count)
 
-        val pr = graph.pageRank(tol).vertices.cache()
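+        // Use a fixed number of PageRank iterations if numIter was given; otherwise run until
+        // the given tolerance is reached.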
+        val pr = (numIterOpt match {
+          case Some(numIter) => PageRank.run(graph, numIter)
+          case None => PageRank.runUntilConvergence(graph, tol)
+        }).vertices.cache()
 
         println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
 
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 32b5fe4813594cc448ad570a10877bdb2ae05bdc..7b9bac5d9c8ea22c0a6c62fd49c7ce9544922c75 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -110,7 +110,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val p = 100
       val verts = 1 to n
       val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
-        verts.filter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
+        verts.withFilter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
       assert(graph.edges.partitions.length === p)
       val partitionedGraph = graph.partitionBy(EdgePartition2D)
       assert(graph.edges.partitions.length === p)
@@ -120,7 +120,13 @@ class GraphSuite extends FunSuite with LocalSparkContext {
         val part = iter.next()._2
         Iterator((part.srcIds ++ part.dstIds).toSet)
       }.collect
-      assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound))
+      if (!verts.forall(id => partitionSets.count(_.contains(id)) <= bound)) {
+        val numFailures = verts.count(id => partitionSets.count(_.contains(id)) > bound)
+        val failure = verts.maxBy(id => partitionSets.count(_.contains(id)))
+        fail(("Replication bound test failed for %d/%d vertices. " +
+          "Example: vertex %d replicated to %d (> %f) partitions.").format(
+          numFailures, n, failure, partitionSets.count(_.contains(failure)), bound))
+      }
       // This should not be true for the default hash partitioning
       val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
         val part = iter.next()._2
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index e135d1d7ad6a35e8a5eac4efa6bc4c199a9778d7..d2e0c01bc35ef87c6e2d699fd61eaa807ae8ec71 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -26,10 +26,16 @@ import org.apache.spark.graphx._
 
 class EdgePartitionSuite extends FunSuite {
 
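+  /** Builds an EdgePartition[A, Int] from (src, dst, attr) triples; used by several tests below. */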
+  def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A, Int] = {
+    val builder = new EdgePartitionBuilder[A, Int]
+    for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
+    builder.toEdgePartition
+  }
+
   test("reverse") {
     val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
     val reversedEdges = List(Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0))
-    val builder = new EdgePartitionBuilder[Int]
+    val builder = new EdgePartitionBuilder[Int, Nothing]
     for (e <- edges) {
       builder.add(e.srcId, e.dstId, e.attr)
     }
@@ -40,7 +46,7 @@ class EdgePartitionSuite extends FunSuite {
 
   test("map") {
     val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
-    val builder = new EdgePartitionBuilder[Int]
+    val builder = new EdgePartitionBuilder[Int, Nothing]
     for (e <- edges) {
       builder.add(e.srcId, e.dstId, e.attr)
     }
@@ -49,11 +55,22 @@ class EdgePartitionSuite extends FunSuite {
       edges.map(e => e.copy(attr = e.srcId + e.dstId)))
   }
 
+  test("filter") {
+    val edges = List(Edge(0, 1, 0), Edge(0, 2, 0), Edge(2, 0, 0))
+    val builder = new EdgePartitionBuilder[Int, Int]
+    for (e <- edges) {
+      builder.add(e.srcId, e.dstId, e.attr)
+    }
+    val edgePartition = builder.toEdgePartition
+    val filtered = edgePartition.filter(et => et.srcId == 0, (vid, attr) => vid == 0 || vid == 1)
+    assert(filtered.tripletIterator().toList.map(et => (et.srcId, et.dstId)) === List((0L, 1L)))
+  }
+
   test("groupEdges") {
     val edges = List(
       Edge(0, 1, 1), Edge(1, 2, 2), Edge(2, 0, 4), Edge(0, 1, 8), Edge(1, 2, 16), Edge(2, 0, 32))
     val groupedEdges = List(Edge(0, 1, 9), Edge(1, 2, 18), Edge(2, 0, 36))
-    val builder = new EdgePartitionBuilder[Int]
+    val builder = new EdgePartitionBuilder[Int, Nothing]
     for (e <- edges) {
       builder.add(e.srcId, e.dstId, e.attr)
     }
@@ -61,11 +78,19 @@ class EdgePartitionSuite extends FunSuite {
     assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges)
   }
 
+  test("upgradeIterator") {
+    val edges = List((0, 1, 0), (1, 0, 0))
+    val verts = List((0L, 1), (1L, 2))
+    val part = makeEdgePartition(edges).updateVertices(verts.iterator)
+    assert(part.upgradeIterator(part.iterator).map(_.toTuple).toList ===
+      part.tripletIterator().toList.map(_.toTuple))
+  }
+
   test("indexIterator") {
     val edgesFrom0 = List(Edge(0, 1, 0))
     val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
     val sortedEdges = edgesFrom0 ++ edgesFrom1
-    val builder = new EdgePartitionBuilder[Int]
+    val builder = new EdgePartitionBuilder[Int, Nothing]
     for (e <- Random.shuffle(sortedEdges)) {
       builder.add(e.srcId, e.dstId, e.attr)
     }
@@ -77,11 +102,6 @@ class EdgePartitionSuite extends FunSuite {
   }
 
   test("innerJoin") {
-    def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
-      val builder = new EdgePartitionBuilder[A]
-      for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
-      builder.toEdgePartition
-    }
     val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
     val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
     val a = makeEdgePartition(aList)
@@ -90,4 +110,14 @@ class EdgePartitionSuite extends FunSuite {
     assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList ===
       List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
   }
+
+  test("isActive, numActives, replaceActives") {
+    val ep = new EdgePartitionBuilder[Nothing, Nothing].toEdgePartition
+      .withActiveSet(Iterator(0L, 2L, 0L))
+    assert(ep.isActive(0))
+    assert(!ep.isActive(1))
+    assert(ep.isActive(2))
+    assert(!ep.isActive(-1))
+    assert(ep.numActives == Some(2))
+  }
 }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
index 9cbb2d2acdc2daf4b1372adae513d3407265825e..49b2704390fea17e8562413bc21f2b03f476072d 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
@@ -26,17 +26,11 @@ import org.apache.spark.graphx._
 
 class EdgeTripletIteratorSuite extends FunSuite {
   test("iterator.toList") {
-    val builder = new EdgePartitionBuilder[Int]
+    val builder = new EdgePartitionBuilder[Int, Int]
     builder.add(1, 2, 0)
     builder.add(1, 3, 0)
     builder.add(1, 4, 0)
-    val vidmap = new VertexIdToIndexMap
-    vidmap.add(1)
-    vidmap.add(2)
-    vidmap.add(3)
-    vidmap.add(4)
-    val vs = Array.fill(vidmap.capacity)(0)
-    val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition)
+    val iter = new EdgeTripletIterator[Int, Int](builder.toEdgePartition, true, true)
     val result = iter.toList.map(et => (et.srcId, et.dstId))
     assert(result === Seq((1, 2), (1, 3), (1, 4)))
   }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
index a048d13fd12b8471268c38c23e8f124263aac815..8bf1384d514c13bff7f9cf1733f24317f320846c 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
@@ -30,17 +30,6 @@ class VertexPartitionSuite extends FunSuite {
     assert(!vp.isDefined(-1))
   }
 
-  test("isActive, numActives, replaceActives") {
-    val vp = VertexPartition(Iterator((0L, 1), (1L, 1)))
-      .filter { (vid, attr) => vid == 0 }
-      .replaceActives(Iterator(0, 2, 0))
-    assert(vp.isActive(0))
-    assert(!vp.isActive(1))
-    assert(vp.isActive(2))
-    assert(!vp.isActive(-1))
-    assert(vp.numActives == Some(2))
-  }
-
   test("map") {
     val vp = VertexPartition(Iterator((0L, 1), (1L, 1))).map { (vid, attr) => 2 }
     assert(vp(0) === 2)
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index efdb38e907d1435900b88e435f9fd1447f862054..fafc9b36a77d3f370a94a78cf6ae1a92f7af757b 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -76,6 +76,8 @@ object MimaBuild {
           excludeSparkClass("util.XORShiftRandom") ++
           excludeSparkClass("graphx.EdgeRDD") ++
           excludeSparkClass("graphx.VertexRDD") ++
+          excludeSparkClass("graphx.impl.GraphImpl") ++
+          excludeSparkClass("graphx.impl.RoutingTable") ++
           excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
           excludeSparkClass("mllib.optimization.SquaredGradient") ++
           excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++