Commit b1feb602 authored by Ankur Dave, committed by Reynold Xin

[SPARK-1991] Support custom storage levels for vertices and edges

This PR adds support for specifying custom storage levels for the vertices and edges of a graph. This enables GraphX to handle graphs larger than available memory: the user specifies MEMORY_AND_DISK and repartitions the graph into many small partitions, each of which fits in memory, and Spark automatically loads partitions from disk as needed.

The user specifies the desired vertex and edge storage levels when building the graph by passing them to the graph constructor. These are then stored in the `targetStorageLevel` attribute of the VertexRDD and EdgeRDD respectively. Whenever GraphX needs to cache a VertexRDD or EdgeRDD (because it plans to use it more than once, for example), it uses the specified target storage level. Also, when the user calls `Graph#cache()`, the vertices and edges are persisted using their target storage levels.
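
For example, here is a minimal sketch of the intended usage (the SparkContext setup and the toy edge data are illustrative and not part of this PR):

    // Build a graph whose vertices and edges spill to disk when they do not fit in memory.
    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.Graph
    import org.apache.spark.storage.StorageLevel

    val sc = new SparkContext("local[4]", "graphx-storage-levels")    // illustrative context
    val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))  // illustrative edge data
    val graph = Graph.fromEdgeTuples(rawEdges, defaultValue = 1,
      edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
      vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
    graph.cache()  // persists the vertices and edges at their target storage levels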

In order to facilitate propagating the target storage levels across VertexRDD and EdgeRDD operations, we remove raw calls to the constructors and instead introduce the `withPartitionsRDD` and `withTargetStorageLevel` methods.

I tested this change by running PageRank and triangle count on a severely memory-constrained cluster (one executor with 300 MB of memory and a 1 GB graph). Before this PR, these algorithms failed with OutOfMemoryErrors; with this PR and the DISK_ONLY storage level, they succeed.
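
For reference, a sketch of that DISK_ONLY configuration using the GraphLoader entry point updated in this PR (the edge-list path, partition count, and the pre-existing SparkContext `sc` are illustrative):

    // Keep both vertex and edge partitions on disk only, split into many small partitions.
    import org.apache.spark.graphx.GraphLoader
    import org.apache.spark.graphx.lib.PageRank
    import org.apache.spark.storage.StorageLevel

    val graph = GraphLoader.edgeListFile(sc, "/path/to/edges.txt",
      minEdgePartitions = 128,
      edgeStorageLevel = StorageLevel.DISK_ONLY,
      vertexStorageLevel = StorageLevel.DISK_ONLY).cache()
    val ranks = PageRank.run(graph, 10).vertices  // partitions are loaded from disk as needed

The Analytics driver updated in this PR exposes the same settings as the --edgeStorageLevel and --vertexStorageLevel command-line options, parsed with the new StorageLevel.fromString.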

Author: Ankur Dave <ankurdave@gmail.com>

Closes #946 from ankurdave/SPARK-1991 and squashes the following commits:

ce17d95 [Ankur Dave] Move pickStorageLevel to StorageLevel.fromString
ccaf06f [Ankur Dave] Shadow members in withXYZ() methods rather than using underscores
c34abc0 [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0
c5ca068 [Ankur Dave] Revert "Exclude all of GraphX from binary compatibility checks"
34bcefb [Ankur Dave] Exclude all of GraphX from binary compatibility checks
6fdd137 [Ankur Dave] [SPARK-1991] Support custom storage levels for vertices and edges
parent 894ecde0
@@ -147,6 +147,27 @@ object StorageLevel {
   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
   val OFF_HEAP = new StorageLevel(false, false, true, false)
 
+  /**
+   * :: DeveloperApi ::
+   * Return the StorageLevel object with the specified name.
+   */
+  @DeveloperApi
+  def fromString(s: String): StorageLevel = s match {
+    case "NONE" => NONE
+    case "DISK_ONLY" => DISK_ONLY
+    case "DISK_ONLY_2" => DISK_ONLY_2
+    case "MEMORY_ONLY" => MEMORY_ONLY
+    case "MEMORY_ONLY_2" => MEMORY_ONLY_2
+    case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
+    case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
+    case "MEMORY_AND_DISK" => MEMORY_AND_DISK
+    case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
+    case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
+    case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
+    case "OFF_HEAP" => OFF_HEAP
+    case _ => throw new IllegalArgumentException("Invalid StorageLevel: " + s)
+  }
+
   /**
    * :: DeveloperApi ::
    * Create a new StorageLevel object without setting useOffHeap.
...
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.spark.graphx.impl.EdgePartition
+import org.apache.spark.graphx.impl.EdgePartitionBuilder
 
 /**
  * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
@@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition
  * `impl.ReplicatedVertexView`.
  */
 class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
-    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
+    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
+    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
   extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   partitionsRDD.setName("EdgeRDD")
@@ -58,6 +60,10 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 
+  /**
+   * Persists the edge partitions at the specified storage level, ignoring any existing target
+   * storage level.
+   */
   override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
@@ -68,9 +74,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
     this
   }
 
+  /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+  override def cache(): this.type = {
+    partitionsRDD.persist(targetStorageLevel)
+    this
+  }
+
   private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
       f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
-    new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+    this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
       if (iter.hasNext) {
         val (pid, ep) = iter.next()
         Iterator(Tuple2(pid, f(pid, ep)))
@@ -118,11 +130,60 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
       (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
-    new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+    this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
       (thisIter, otherIter) =>
         val (pid, thisEPart) = thisIter.next()
         val (_, otherEPart) = otherIter.next()
         Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
     })
   }
+
+  /** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */
+  private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
+      partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
+    new EdgeRDD(partitionsRDD, this.targetStorageLevel)
+  }
+
+  /**
+   * Changes the target storage level while preserving all other properties of the
+   * EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level.
+   *
+   * This does not actually trigger a cache; to do this, call
+   * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
+   */
+  private[graphx] def withTargetStorageLevel(
+      targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
+    new EdgeRDD(this.partitionsRDD, targetStorageLevel)
+  }
 }
+
+object EdgeRDD {
+  /**
+   * Creates an EdgeRDD from a set of edges.
+   *
+   * @tparam ED the edge attribute type
+   * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
+   */
+  def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
+    val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
+      val builder = new EdgePartitionBuilder[ED, VD]
+      iter.foreach { e =>
+        builder.add(e.srcId, e.dstId, e.attr)
+      }
+      Iterator((pid, builder.toEdgePartition))
+    }
+    EdgeRDD.fromEdgePartitions(edgePartitions)
+  }
+
+  /**
+   * Creates an EdgeRDD from already-constructed edge partitions.
+   *
+   * @tparam ED the edge attribute type
+   * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
+   */
+  def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
+      edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
+    new EdgeRDD(edgePartitions)
+  }
+}
@@ -80,7 +80,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   @transient val triplets: RDD[EdgeTriplet[VD, ED]]
 
   /**
-   * Caches the vertices and edges associated with this graph at the specified storage level.
+   * Caches the vertices and edges associated with this graph at the specified storage level,
+   * ignoring any target storage levels previously set.
    *
    * @param newLevel the level at which to cache the graph.
    *
@@ -89,9 +90,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
 
   /**
-   * Caches the vertices and edges associated with this graph. This is used to
-   * pin a graph in memory enabling multiple queries to reuse the same
-   * construction process.
+   * Caches the vertices and edges associated with this graph at the previously-specified target
+   * storage levels, which default to `MEMORY_ONLY`. This is used to pin a graph in memory enabling
+   * multiple queries to reuse the same construction process.
    */
   def cache(): Graph[VD, ED]
 
@@ -358,9 +359,12 @@ object Graph {
    * Construct a graph from a collection of edges encoded as vertex id pairs.
    *
    * @param rawEdges a collection of edges in (src, dst) form
+   * @param defaultValue the vertex attributes with which to create vertices referenced by the edges
    * @param uniqueEdges if multiple identical edges are found they are combined and the edge
    * attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
    * `uniqueEdges`, a [[PartitionStrategy]] must be provided.
+   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+   * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
    *
    * @return a graph with edge attributes containing either the count of duplicate edges or 1
    * (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
@@ -368,10 +372,12 @@ object Graph {
   def fromEdgeTuples[VD: ClassTag](
       rawEdges: RDD[(VertexId, VertexId)],
       defaultValue: VD,
-      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
+      uniqueEdges: Option[PartitionStrategy] = None,
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
   {
     val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
-    val graph = GraphImpl(edges, defaultValue)
+    val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
     uniqueEdges match {
       case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
       case None => graph
@@ -383,14 +389,18 @@ object Graph {
    *
    * @param edges the RDD containing the set of edges in the graph
    * @param defaultValue the default vertex attribute to use for each vertex
+   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+   * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
    *
    * @return a graph with edge attributes described by `edges` and vertices
    * given by all vertices in `edges` with value `defaultValue`
    */
   def fromEdges[VD: ClassTag, ED: ClassTag](
       edges: RDD[Edge[ED]],
-      defaultValue: VD): Graph[VD, ED] = {
-    GraphImpl(edges, defaultValue)
+      defaultValue: VD,
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
+    GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
   }
 
   /**
@@ -405,12 +415,16 @@ object Graph {
    * @param edges the collection of edges in the graph
    * @param defaultVertexAttr the default vertex attribute to use for vertices that are
    * mentioned in edges but not in vertices
+   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+   * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
    */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
-    GraphImpl(vertices, edges, defaultVertexAttr)
+      defaultVertexAttr: VD = null.asInstanceOf[VD],
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
+    GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
   }
 
   /**
...
@@ -17,6 +17,7 @@
 package org.apache.spark.graphx
 
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
@@ -48,12 +49,16 @@ object GraphLoader extends Logging {
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
    * @param minEdgePartitions the number of partitions for the edge RDD
+   * @param edgeStorageLevel the desired storage level for the edge partitions. To set the vertex
+   *        storage level, call [[org.apache.spark.graphx.Graph#persistVertices]].
    */
   def edgeListFile(
       sc: SparkContext,
       path: String,
       canonicalOrientation: Boolean = false,
-      minEdgePartitions: Int = 1)
+      minEdgePartitions: Int = 1,
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
     : Graph[Int, Int] =
   {
     val startTime = System.currentTimeMillis
@@ -78,12 +83,13 @@ object GraphLoader extends Logging {
         }
       }
       Iterator((pid, builder.toEdgePartition))
-    }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
+    }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
     edges.count()
 
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
 
-    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
+    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
+      vertexStorageLevel = vertexStorageLevel)
   } // end of edgeListFile
 }
@@ -56,7 +56,8 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._
  * @tparam VD the vertex attribute associated with each vertex in the set.
  */
 class VertexRDD[@specialized VD: ClassTag](
-    val partitionsRDD: RDD[ShippableVertexPartition[VD]])
+    val partitionsRDD: RDD[ShippableVertexPartition[VD]],
+    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
   extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   require(partitionsRDD.partitioner.isDefined)
@@ -66,7 +67,7 @@ class VertexRDD[@specialized VD: ClassTag](
    * VertexRDD will be based on a different index and can no longer be quickly joined with this
    * RDD.
    */
-  def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
+  def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex()))
 
   override val partitioner = partitionsRDD.partitioner
 
@@ -85,6 +86,10 @@ class VertexRDD[@specialized VD: ClassTag](
   }
   setName("VertexRDD")
 
+  /**
+   * Persists the vertex partitions at the specified storage level, ignoring any existing target
+   * storage level.
+   */
   override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
@@ -95,6 +100,12 @@ class VertexRDD[@specialized VD: ClassTag](
     this
   }
 
+  /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+  override def cache(): this.type = {
+    partitionsRDD.persist(targetStorageLevel)
+    this
+  }
+
   /** The number of vertices in the RDD. */
   override def count(): Long = {
     partitionsRDD.map(_.size).reduce(_ + _)
@@ -114,7 +125,7 @@ class VertexRDD[@specialized VD: ClassTag](
       f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
     : VertexRDD[VD2] = {
     val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
@@ -165,7 +176,7 @@ class VertexRDD[@specialized VD: ClassTag](
       val otherPart = otherIter.next()
       Iterator(thisPart.diff(otherPart))
     }
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
 
   /**
@@ -191,7 +202,7 @@ class VertexRDD[@specialized VD: ClassTag](
       val otherPart = otherIter.next()
      Iterator(thisPart.leftJoin(otherPart)(f))
     }
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
 
   /**
@@ -220,7 +231,7 @@ class VertexRDD[@specialized VD: ClassTag](
       case other: VertexRDD[_] =>
         leftZipJoin(other)(f)
       case _ =>
-        new VertexRDD[VD3](
+        this.withPartitionsRDD[VD3](
           partitionsRDD.zipPartitions(
             other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
             (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
@@ -242,7 +253,7 @@ class VertexRDD[@specialized VD: ClassTag](
       val otherPart = otherIter.next()
       Iterator(thisPart.innerJoin(otherPart)(f))
     }
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
 
   /**
@@ -264,7 +275,7 @@ class VertexRDD[@specialized VD: ClassTag](
       case other: VertexRDD[_] =>
         innerZipJoin(other)(f)
       case _ =>
-        new VertexRDD(
+        this.withPartitionsRDD(
           partitionsRDD.zipPartitions(
             other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
             (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
@@ -290,7 +301,7 @@ class VertexRDD[@specialized VD: ClassTag](
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
       thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
     }
-    new VertexRDD[VD2](parts)
+    this.withPartitionsRDD[VD2](parts)
   }
 
   /**
@@ -309,7 +320,25 @@ class VertexRDD[@specialized VD: ClassTag](
         if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
       partIter.map(_.withRoutingTable(routingTable))
     }
-    new VertexRDD(vertexPartitions)
+    this.withPartitionsRDD(vertexPartitions)
   }
 
+  /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
+  private[graphx] def withPartitionsRDD[VD2: ClassTag](
+      partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = {
+    new VertexRDD(partitionsRDD, this.targetStorageLevel)
+  }
+
+  /**
+   * Changes the target storage level while preserving all other properties of the
+   * VertexRDD. Operations on the returned VertexRDD will preserve this storage level.
+   *
+   * This does not actually trigger a cache; to do this, call
+   * [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD.
+   */
+  private[graphx] def withTargetStorageLevel(
+      targetStorageLevel: StorageLevel): VertexRDD[VD] = {
+    new VertexRDD(this.partitionsRDD, targetStorageLevel)
+  }
+
   /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
...
@@ -61,7 +61,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     this
   }
 
-  override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
+  override def cache(): Graph[VD, ED] = {
+    vertices.cache()
+    replicatedVertexView.edges.cache()
+    this
+  }
 
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
     vertices.unpersist(blocking)
@@ -70,10 +74,10 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   }
 
   override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
-    val numPartitions = replicatedVertexView.edges.partitions.size
+    val numPartitions = edges.partitions.size
     val edTag = classTag[ED]
     val vdTag = classTag[VD]
-    val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e =>
+    val newEdges = edges.withPartitionsRDD(edges.map { e =>
       val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
 
       // Should we be using 3-tuple or an optimized class
@@ -256,24 +260,33 @@ object GraphImpl {
   /** Create a graph from edges, setting referenced vertices to `defaultVertexAttr`. */
   def apply[VD: ClassTag, ED: ClassTag](
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
   }
 
   /** Create a graph from EdgePartitions, setting referenced vertices to `defaultVertexAttr`. */
   def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
       edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    fromEdgeRDD(EdgeRDD.fromEdgePartitions(edgePartitions), defaultVertexAttr, edgeStorageLevel,
+      vertexStorageLevel)
   }
 
   /** Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`. */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache()
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
+      .withTargetStorageLevel(edgeStorageLevel).cache()
     val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
+      .withTargetStorageLevel(vertexStorageLevel).cache()
     GraphImpl(vertexRDD, edgeRDD)
   }
 
@@ -309,23 +322,13 @@ object GraphImpl {
    */
   private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
       edges: EdgeRDD[ED, VD],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    edges.cache()
-    val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, defaultVertexAttr)
-    fromExistingRDDs(vertices, edges)
-  }
-
-  /** Create an EdgeRDD from a set of edges. */
-  private def createEdgeRDD[ED: ClassTag, VD: ClassTag](
-      edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
-    val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[ED, VD]
-      iter.foreach { e =>
-        builder.add(e.srcId, e.dstId, e.attr)
-      }
-      Iterator((pid, builder.toEdgePartition))
-    }
-    new EdgeRDD(edgePartitions)
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache()
+    val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr)
+      .withTargetStorageLevel(vertexStorageLevel)
+    fromExistingRDDs(vertices, edgesCached)
   }
 
 } // end of object GraphImpl
@@ -69,7 +69,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
         .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
           includeSrc, includeDst, shipSrc, shipDst))
         .partitionBy(edges.partitioner.get)
-      val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+      val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
        (ePartIter, shippedVertsIter) => ePartIter.map {
          case (pid, edgePartition) =>
            (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
@@ -91,7 +91,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
       .setName("ReplicatedVertexView.withActiveSet - shippedActives (broadcast)")
       .partitionBy(edges.partitioner.get)
 
-    val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
+    val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
      (ePartIter, shippedActivesIter) => ePartIter.map {
        case (pid, edgePartition) =>
          (pid, edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator)))
@@ -111,7 +111,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
           hasSrcId, hasDstId))
       .partitionBy(edges.partitioner.get)
 
-    val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+    val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
      (ePartIter, shippedVertsIter) => ePartIter.map {
        case (pid, edgePartition) =>
          (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
...
@@ -17,7 +17,9 @@
 package org.apache.spark.graphx.lib
 
+import scala.collection.mutable
 import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -28,18 +30,20 @@ object Analytics extends Logging {
   def main(args: Array[String]): Unit = {
     if (args.length < 2) {
-      System.err.println("Usage: Analytics <taskType> <file> [other options]")
+      System.err.println(
+        "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]")
       System.exit(1)
     }
 
     val taskType = args(0)
     val fname = args(1)
-    val options = args.drop(2).map { arg =>
+    val optionsList = args.drop(2).map { arg =>
       arg.dropWhile(_ == '-').split('=') match {
         case Array(opt, v) => (opt -> v)
         case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
       }
     }
+    val options = mutable.Map(optionsList: _*)
 
     def pickPartitioner(v: String): PartitionStrategy = {
       // TODO: Use reflection rather than listing all the partitioning strategies here.
@@ -57,20 +61,24 @@ object Analytics extends Logging {
       .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
       .set("spark.locality.wait", "100000")
 
+    val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
+      println("Set the number of edge partitions using --numEPart.")
+      sys.exit(1)
+    }
+    val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
+      .map(pickPartitioner(_))
+    val edgeStorageLevel = options.remove("edgeStorageLevel")
+      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+    val vertexStorageLevel = options.remove("vertexStorageLevel")
+      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+
     taskType match {
       case "pagerank" =>
-        var tol: Float = 0.001F
-        var outFname = ""
-        var numEPart = 4
-        var partitionStrategy: Option[PartitionStrategy] = None
-        var numIterOpt: Option[Int] = None
+        val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
+        val outFname = options.remove("output").getOrElse("")
+        val numIterOpt = options.remove("numIter").map(_.toInt)
 
-        options.foreach{
-          case ("tol", v) => tol = v.toFloat
-          case ("output", v) => outFname = v
-          case ("numEPart", v) => numEPart = v.toInt
-          case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
-          case ("numIter", v) => numIterOpt = Some(v.toInt)
+        options.foreach {
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
@@ -81,7 +89,9 @@ object Analytics extends Logging {
         val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
 
         val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart).cache()
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
 
         println("GRAPHX: Number of vertices " + graph.vertices.count)
@@ -102,32 +112,19 @@ object Analytics extends Logging {
         sc.stop()
 
       case "cc" =>
-        var numIter = Int.MaxValue
-        var numVPart = 4
-        var numEPart = 4
-        var isDynamic = false
-        var partitionStrategy: Option[PartitionStrategy] = None
-
-        options.foreach{
-          case ("numIter", v) => numIter = v.toInt
-          case ("dynamic", v) => isDynamic = v.toBoolean
-          case ("numEPart", v) => numEPart = v.toInt
-          case ("numVPart", v) => numVPart = v.toInt
-          case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+        options.foreach {
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
 
-        if (!isDynamic && numIter == Int.MaxValue) {
-          println("Set number of iterations!")
-          sys.exit(1)
-        }
         println("======================================")
         println("|      Connected Components          |")
         println("======================================")
 
         val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
         val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart).cache()
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
 
         val cc = ConnectedComponents.run(graph)
@@ -135,24 +132,25 @@ object Analytics extends Logging {
         sc.stop()
 
      case "triangles" =>
-        var numEPart = 4
-        // TriangleCount requires the graph to be partitioned
-        var partitionStrategy: PartitionStrategy = RandomVertexCut
-
-        options.foreach{
-          case ("numEPart", v) => numEPart = v.toInt
-          case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+        options.foreach {
          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
+
         println("======================================")
         println("|      Triangle Count                |")
         println("======================================")
+
         val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
-        val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
-          minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+        val graph = GraphLoader.edgeListFile(sc, fname,
+          canonicalOrientation = true,
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel)
+          // TriangleCount requires the graph to be partitioned
+          .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
         val triangles = TriangleCount.run(graph)
         println("Triangles: " + triangles.vertices.map {
-          case (vid,data) => data.toLong
+          case (vid, data) => data.toLong
         }.reduce(_ + _) / 3)
 
         sc.stop()
...