diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 37b6e453592e5f9506cefa14d6302f65874384b8..13b2b57719188e15cfc5d1e0b0515d6b586ba26f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -162,7 +162,8 @@ object PageRank extends Logging { iteration += 1 } - rankGraph + // SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks + normalizeRankSum(rankGraph, personalized) } /** @@ -179,7 +180,8 @@ object PageRank extends Logging { * @param resetProb The random reset probability * @param sources The list of sources to compute personalized pagerank from * @return the graph with vertex attributes - * containing the pagerank relative to all starting nodes (as a sparse vector) and + * containing the pagerank relative to all starting nodes (as a sparse vector + * indexed by the position of nodes in the sources list) and * edge attributes the normalized edge weight */ def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], @@ -194,6 +196,8 @@ object PageRank extends Logging { // TODO if one sources vertex id is outside of the int range // we won't be able to store its activations in a sparse vector + require(sources.max <= Int.MaxValue.toLong, + s"This implementation currently only works for source vertex ids at most ${Int.MaxValue}") val zero = Vectors.sparse(sources.size, List()).asBreeze val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) => val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze @@ -245,8 +249,10 @@ object PageRank extends Logging { i += 1 } + // SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks + val rankSums = rankGraph.vertices.values.fold(zero)(_ :+ _) rankGraph.mapVertices { (vid, attr) => - Vectors.fromBreeze(attr) + Vectors.fromBreeze(attr :/ rankSums) } } @@ -307,7 +313,7 @@ object PageRank extends Logging { .mapTriplets( e => 1.0 / e.srcAttr ) // Set the vertex attributes to (initialPR, delta = 0) .mapVertices { (id, attr) => - if (id == src) (1.0, Double.NegativeInfinity) else (0.0, 0.0) + if (id == src) (0.0, Double.NegativeInfinity) else (0.0, 0.0) } .cache() @@ -322,13 +328,12 @@ object PageRank extends Logging { def personalizedVertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = { val (oldPR, lastDelta) = attr - var teleport = oldPR - val delta = if (src==id) resetProb else 0.0 - teleport = oldPR*delta - - val newPR = teleport + (1.0 - resetProb) * msgSum - val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR - (newPR, newDelta) + val newPR = if (lastDelta == Double.NegativeInfinity) { + 1.0 + } else { + oldPR + (1.0 - resetProb) * msgSum + } + (newPR, newPR - oldPR) } def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = { @@ -353,9 +358,23 @@ object PageRank extends Logging { vertexProgram(id, attr, msgSum) } - Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)( + val rankGraph = Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)( vp, sendMessage, messageCombiner) .mapVertices((vid, attr) => attr._1) - } // end of deltaPageRank + // SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks + normalizeRankSum(rankGraph, personalized) + } + + // Normalizes the sum of ranks to n (or 1 if personalized) + private def normalizeRankSum(rankGraph: Graph[Double, Double], personalized: Boolean) = { + val rankSum = rankGraph.vertices.values.sum() + if (personalized) { + rankGraph.mapVertices((id, rank) => rank / rankSum) + } else { + val numVertices = rankGraph.numVertices + val correctionFactor = numVertices.toDouble / rankSum + rankGraph.mapVertices((id, rank) => rank * correctionFactor) + } + } } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 6afbb5a959894c3d6be977df7cfd1216f9d45193..9779553ce85d1c4acd1a77e980f1dbbebedf1b6d 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -50,7 +50,8 @@ object GridPageRank { inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum } } - (0L until (nRows * nCols)).zip(pr) + val prSum = pr.sum + (0L until (nRows * nCols)).zip(pr.map(_ * pr.length / prSum)) } } @@ -68,26 +69,34 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { val nVertices = 100 val starGraph = GraphGenerators.starGraph(sc, nVertices).cache() val resetProb = 0.15 + val tol = 0.0001 + val numIter = 2 val errorTol = 1.0e-5 - val staticRanks1 = starGraph.staticPageRank(numIter = 2, resetProb).vertices - val staticRanks2 = starGraph.staticPageRank(numIter = 3, resetProb).vertices.cache() + val staticRanks = starGraph.staticPageRank(numIter, resetProb).vertices.cache() + val staticRanks2 = starGraph.staticPageRank(numIter + 1, resetProb).vertices - // Static PageRank should only take 3 iterations to converge - val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) => + // Static PageRank should only take 2 iterations to converge + val notMatching = staticRanks.innerZipJoin(staticRanks2) { (vid, pr1, pr2) => if (pr1 != pr2) 1 else 0 }.map { case (vid, test) => test }.sum() assert(notMatching === 0) - val staticErrors = staticRanks2.map { case (vid, pr) => - val p = math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) - val correct = (vid > 0 && pr == resetProb) || (vid == 0L && p < 1.0E-5) - if (!correct) 1 else 0 - } - assert(staticErrors.sum === 0) + val dynamicRanks = starGraph.pageRank(tol, resetProb).vertices.cache() + assert(compareRanks(staticRanks, dynamicRanks) < errorTol) + + // Computed in igraph 1.0 w/ R bindings: + // > page_rank(make_star(100, mode = "in")) + // Alternatively in NetworkX 1.11: + // > nx.pagerank(nx.DiGraph([(x, 0) for x in range(1,100)])) + // We multiply by the number of vertices to account for difference in normalization + val centerRank = 0.462394787 * nVertices + val othersRank = 0.005430356 * nVertices + val igraphPR = centerRank +: Seq.fill(nVertices - 1)(othersRank) + val ranks = VertexRDD(sc.parallelize(0L until nVertices zip igraphPR)) + assert(compareRanks(staticRanks, ranks) < errorTol) + assert(compareRanks(dynamicRanks, ranks) < errorTol) - val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache() - assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) } } // end of test Star PageRank @@ -96,51 +105,62 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { val nVertices = 100 val starGraph = GraphGenerators.starGraph(sc, nVertices).cache() val resetProb = 0.15 + val tol = 0.0001 + val numIter = 2 val errorTol = 1.0e-5 - val staticRanks1 = starGraph.staticPersonalizedPageRank(0, numIter = 1, resetProb).vertices - val staticRanks2 = starGraph.staticPersonalizedPageRank(0, numIter = 2, resetProb) - .vertices.cache() - - // Static PageRank should only take 2 iterations to converge - val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) => - if (pr1 != pr2) 1 else 0 - }.map { case (vid, test) => test }.sum - assert(notMatching === 0) + val staticRanks = starGraph.staticPersonalizedPageRank(0, numIter, resetProb).vertices.cache() - val staticErrors = staticRanks2.map { case (vid, pr) => - val correct = (vid > 0 && pr == 0.0) || - (vid == 0 && pr == resetProb) - if (!correct) 1 else 0 - } - assert(staticErrors.sum === 0) - - val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache() - assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) + val dynamicRanks = starGraph.personalizedPageRank(0, tol, resetProb).vertices.cache() + assert(compareRanks(staticRanks, dynamicRanks) < errorTol) - val parallelStaticRanks1 = starGraph - .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices { + val parallelStaticRanks = starGraph + .staticParallelPersonalizedPageRank(Array(0), numIter, resetProb).mapVertices { case (vertexId, vector) => vector(0) }.vertices.cache() - assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol) + assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol) + + // Computed in igraph 1.0 w/ R bindings: + // > page_rank(make_star(100, mode = "in"), personalized = c(1, rep(0, 99)), algo = "arpack") + // NOTE: We use the arpack algorithm as prpack (the default) redistributes rank to all + // vertices uniformly instead of just to the personalization source. + // Alternatively in NetworkX 1.11: + // > nx.pagerank(nx.DiGraph([(x, 0) for x in range(1,100)]), + // personalization=dict([(x, 1 if x == 0 else 0) for x in range(0,100)])) + // We multiply by the number of vertices to account for difference in normalization + val igraphPR0 = 1.0 +: Seq.fill(nVertices - 1)(0.0) + val ranks0 = VertexRDD(sc.parallelize(0L until nVertices zip igraphPR0)) + assert(compareRanks(staticRanks, ranks0) < errorTol) + assert(compareRanks(dynamicRanks, ranks0) < errorTol) - val parallelStaticRanks2 = starGraph - .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices { - case (vertexId, vector) => vector(0) - }.vertices.cache() - assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol) // We have one outbound edge from 1 to 0 - val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb) + val otherStaticRanks = starGraph.staticPersonalizedPageRank(1, numIter, resetProb) .vertices.cache() - val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache() - val otherParallelStaticRanks2 = starGraph - .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices { + val otherDynamicRanks = starGraph.personalizedPageRank(1, tol, resetProb).vertices.cache() + val otherParallelStaticRanks = starGraph + .staticParallelPersonalizedPageRank(Array(0, 1), numIter, resetProb).mapVertices { case (vertexId, vector) => vector(1) }.vertices.cache() - assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol) - assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol) - assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol) + assert(compareRanks(otherDynamicRanks, otherStaticRanks) < errorTol) + assert(compareRanks(otherStaticRanks, otherParallelStaticRanks) < errorTol) + assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks) < errorTol) + + // Computed in igraph 1.0 w/ R bindings: + // > page_rank(make_star(100, mode = "in"), + // personalized = c(0, 1, rep(0, 98)), algo = "arpack") + // NOTE: We use the arpack algorithm as prpack (the default) redistributes rank to all + // vertices uniformly instead of just to the personalization source. + // Alternatively in NetworkX 1.11: + // > nx.pagerank(nx.DiGraph([(x, 0) for x in range(1,100)]), + // personalization=dict([(x, 1 if x == 1 else 0) for x in range(0,100)])) + val centerRank = 0.4594595 + val sourceRank = 0.5405405 + val igraphPR1 = centerRank +: sourceRank +: Seq.fill(nVertices - 2)(0.0) + val ranks1 = VertexRDD(sc.parallelize(0L until nVertices zip igraphPR1)) + assert(compareRanks(otherStaticRanks, ranks1) < errorTol) + assert(compareRanks(otherDynamicRanks, ranks1) < errorTol) + assert(compareRanks(otherParallelStaticRanks, ranks1) < errorTol) } } // end of test Star PersonalPageRank @@ -229,4 +249,50 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { } } + + test("Loop with sink PageRank") { + withSpark { sc => + val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 1L) :: (1L, 4L) :: Nil) + val g = Graph.fromEdgeTuples(edges, 1) + val resetProb = 0.15 + val tol = 0.0001 + val numIter = 20 + val errorTol = 1.0e-5 + + val staticRanks = g.staticPageRank(numIter, resetProb).vertices.cache() + val dynamicRanks = g.pageRank(tol, resetProb).vertices.cache() + + assert(compareRanks(staticRanks, dynamicRanks) < errorTol) + + // Computed in igraph 1.0 w/ R bindings: + // > page_rank(graph_from_literal( A -+ B -+ C -+ A -+ D)) + // Alternatively in NetworkX 1.11: + // > nx.pagerank(nx.DiGraph([(1,2),(2,3),(3,1),(1,4)])) + // We multiply by the number of vertices to account for difference in normalization + val igraphPR = Seq(0.3078534, 0.2137622, 0.2646223, 0.2137622).map(_ * 4) + val ranks = VertexRDD(sc.parallelize(1L to 4L zip igraphPR)) + assert(compareRanks(staticRanks, ranks) < errorTol) + assert(compareRanks(dynamicRanks, ranks) < errorTol) + + val p1staticRanks = g.staticPersonalizedPageRank(1, numIter, resetProb).vertices.cache() + val p1dynamicRanks = g.personalizedPageRank(1, tol, resetProb).vertices.cache() + val p1parallelDynamicRanks = + g.staticParallelPersonalizedPageRank(Array(1, 2, 3, 4), numIter, resetProb) + .vertices.mapValues(v => v(0)).cache() + + // Computed in igraph 1.0 w/ R bindings: + // > page_rank(graph_from_literal( A -+ B -+ C -+ A -+ D), personalized = c(1, 0, 0, 0), + // algo = "arpack") + // NOTE: We use the arpack algorithm as prpack (the default) redistributes rank to all + // vertices uniformly instead of just to the personalization source. + // Alternatively in NetworkX 1.11: + // > nx.pagerank(nx.DiGraph([(1,2),(2,3),(3,1),(1,4)]), personalization={1:1, 2:0, 3:0, 4:0}) + val igraphPR2 = Seq(0.4522329, 0.1921990, 0.1633691, 0.1921990) + val ranks2 = VertexRDD(sc.parallelize(1L to 4L zip igraphPR2)) + assert(compareRanks(p1staticRanks, ranks2) < errorTol) + assert(compareRanks(p1dynamicRanks, ranks2) < errorTol) + assert(compareRanks(p1parallelDynamicRanks, ranks2) < errorTol) + + } + } }