Skip to content
Snippets Groups Projects
Commit 15a56459 authored by Ankur Dave's avatar Ankur Dave Committed by Reynold Xin
Browse files

[SPARK-3427] [GraphX] Avoid active vertex tracking in static PageRank

GraphX's current implementation of static (fixed iteration count) PageRank uses the Pregel API. This unnecessarily tracks active vertices, even though in static PageRank all vertices are always active. Active vertex tracking incurs the following costs:

1. A shuffle per iteration to ship the active sets to the edge partitions.
2. A hash table creation per iteration at each partition to index the active sets for lookup.
3. A hash lookup per edge to check whether the source vertex is active.

I reimplemented static PageRank using the lower-level GraphX API instead of the Pregel API. In benchmarks on a 16-node m2.4xlarge cluster, this provided a 23% speedup (from 514 s to 397 s, mean over 3 trials) for 10 iterations of PageRank on a synthetic graph with 10M vertices and 1.27B edges.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #2308 from ankurdave/SPARK-3427 and squashes the following commits:

449996a [Ankur Dave] Avoid unnecessary active vertex tracking in static PageRank
parent eae81b0b
No related branches found
No related tags found
No related merge requests found
...@@ -79,30 +79,43 @@ object PageRank extends Logging { ...@@ -79,30 +79,43 @@ object PageRank extends Logging {
def run[VD: ClassTag, ED: ClassTag]( def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
{ {
// Initialize the pagerankGraph with each edge attribute having // Initialize the PageRank graph with each edge attribute having
// weight 1/outDegree and each vertex with attribute 1.0. // weight 1/outDegree and each vertex with attribute 1.0.
val pagerankGraph: Graph[Double, Double] = graph var rankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex // Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree // Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr ) .mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to the initial pagerank values // Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => 1.0 ) .mapVertices( (id, attr) => resetProb )
.cache()
// Define the three functions needed to implement PageRank in the GraphX var iteration = 0
// version of Pregel var prevRankGraph: Graph[Double, Double] = null
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double = while (iteration < numIter) {
resetProb + (1.0 - resetProb) * msgSum rankGraph.cache()
def sendMessage(edge: EdgeTriplet[Double, Double]) =
Iterator((edge.dstId, edge.srcAttr * edge.attr))
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
val initialMessage = 0.0
// Execute pregel for a fixed number of iterations. // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
Pregel(pagerankGraph, initialMessage, numIter, activeDirection = EdgeDirection.Out)( // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
vertexProgram, sendMessage, messageCombiner) val rankUpdates = rankGraph.mapReduceTriplets[Double](
e => Iterator((e.dstId, e.srcAttr * e.attr)), _ + _)
// Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
// edge partitions.
prevRankGraph = rankGraph
rankGraph = rankGraph.joinVertices(rankUpdates) {
(id, oldRank, msgSum) => resetProb + (1.0 - resetProb) * msgSum
}.cache()
rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
logInfo(s"PageRank finished iteration $iteration.")
prevRankGraph.vertices.unpersist(false)
prevRankGraph.edges.unpersist(false)
iteration += 1
}
rankGraph
} }
/** /**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment