Commit 6f6f8c92 authored by Ankur Dave

Wrap methods in the appropriate class/object declaration

parent 67795dbb
@@ -256,7 +256,7 @@ compute the in-degree of each vertex (defined in `GraphOps`) by the following:
{% highlight scala %}
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val indDegrees: VertexRDD[Int] = graph.inDegrees
val inDegrees: VertexRDD[Int] = graph.inDegrees
{% endhighlight %}
The reason for differentiating between core graph operations and [`GraphOps`][GraphOps] is to be
@@ -270,9 +270,11 @@ In direct analogy to the RDD `map` operator, the property
graph contains the following:
{% highlight scala %}
def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
{% endhighlight %}
Each of these operators yields a new graph with the vertex or edge properties modified by the user defined `map` function.
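For instance, a minimal usage sketch of `mapVertices` (assuming the hypothetical `graph: Graph[(String, String), String]` value introduced earlier) might look like:

{% highlight scala %}
// Sketch: keep only the first field of each (String, String) vertex attribute.
// The graph structure is reused; only the vertex values change.
val projected: Graph[String, String] = graph.mapVertices((id, attr) => attr._1)
{% endhighlight %}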
@@ -314,11 +316,13 @@ Currently GraphX supports only a simple set of commonly used structural operators and we expect to
add more in the future. The following is a list of the basic structural operators.
{% highlight scala %}
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
             vpred: (VertexID, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexID, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
{% endhighlight %}
The [`reverse`][Graph.reverse] operator returns a new graph with all the edge directions reversed.
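As a rough sketch of how these operators might be combined (again assuming the hypothetical `graph: Graph[(String, String), String]` from above):

{% highlight scala %}
// Sketch: keep only vertices with a non-empty first attribute field and edges with a
// non-empty property, then reverse all edge directions in the restricted graph.
val validGraph = graph.subgraph(
  epred = triplet => triplet.attr.nonEmpty,
  vpred = (id, attr) => attr._1.nonEmpty)
val reversedGraph = validGraph.reverse
{% endhighlight %}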
@@ -400,10 +404,12 @@ might want to pull vertex properties from one graph into another. These tasks can be accomplished
using the *join* operators. Below we list the key join operators:
{% highlight scala %}
def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD)
  : Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2)
  : Graph[VD2, ED]
class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}
{% endhighlight %}
The [`joinVertices`][GraphOps.joinVertices] operator joins the vertices with the input RDD and returns a new graph with the vertex properties obtained by applying the user defined `map` function.
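A common pattern, sketched here with hypothetical names and building on the `graph` and in-degree values above, is to pull degree information into the vertex attributes with `outerJoinVertices`:

{% highlight scala %}
// Sketch: attach each vertex's in-degree to its attribute pair; vertices with no
// incoming edges have no entry in inDegrees, so the Option defaults them to 0.
val degreeGraph: Graph[((String, String), Int), String] =
  graph.outerJoinVertices(graph.inDegrees) { (id, attr, inDegOpt) =>
    (attr, inDegOpt.getOrElse(0))
  }
{% endhighlight %}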
@@ -470,10 +476,12 @@ The core (heavily optimized) aggregation primitive in GraphX is the
[`mapReduceTriplets`][Graph.mapReduceTriplets] operator:
{% highlight scala %}
def mapReduceTriplets[A](
    map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
    reduce: (A, A) => A)
  : VertexRDD[A]
class Graph[VD, ED] {
  def mapReduceTriplets[A](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
      reduce: (A, A) => A)
    : VertexRDD[A]
}
{% endhighlight %}
The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which is applied to each triplet and can yield messages destined to the triplet's vertices.
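As a sketch of the pattern (using the hypothetical `graph` from earlier), the in-degree computation shown above could be expressed directly with this primitive:

{% highlight scala %}
// Sketch: send a count of 1 to the destination of every edge and sum the counts per vertex.
// Vertices with no incoming edges simply do not appear in the resulting VertexRDD.
val inDeg: VertexRDD[Int] = graph.mapReduceTriplets[Int](
  triplet => Iterator((triplet.dstId, 1)), // map: one message per edge
  (a, b) => a + b)                         // reduce: sum the messages at each vertex
{% endhighlight %}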
@@ -564,12 +572,19 @@ val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max)
### Collecting Neighbors
In some cases it may be easier to express computation by collecting neighboring vertices and their
attributes at each vertex. This can be easily accomplished using the `collectNeighborIds` and the
`collectNeighbors` operators.
attributes at each vertex. This can be easily accomplished using the
[`collectNeighborIds`][GraphOps.collectNeighborIds] and the
[`collectNeighbors`][GraphOps.collectNeighbors] operators.
[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexID]]
[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexID,VD)]]
{% highlight scala %}
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] =
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
}
{% endhighlight %}
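For example, a minimal usage sketch (with the hypothetical `graph` from earlier):

{% highlight scala %}
// Sketch: for each vertex, collect the IDs of the vertices it points to.
val outNeighborIds: VertexRDD[Array[VertexID]] = graph.collectNeighborIds(EdgeDirection.Out)
{% endhighlight %}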
> Note that these operators can be quite costly as they duplicate information and require substantial communication.
@@ -600,40 +615,44 @@ messages remaining.
> neighboring vertices and the message construction is done in parallel using a user defined
> messaging function. These constraints allow additional optimization within GraphX.
The following is type signature of the Pregel operator as well as a *sketch* of its implementation
(note calls to graph.cache have been removed):
The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch*
of its implementation (note calls to graph.cache have been removed):
[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexID,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexID,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
{% highlight scala %}
def pregel[A]
    (initialMsg: A,
     maxIterations: Int = Int.MaxValue,
     activeDir: EdgeDirection = EdgeDirection.Out)
    (vprog: (VertexID, VD, A) => VD,
     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
     mergeMsg: (A, A) => A)
  : Graph[VD, ED] = {
  // Receive the initial message at each vertex
  var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
  // compute the messages
  var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
  var activeMessages = messages.count()
  // Loop until no messages remain or maxIterations is achieved
  var i = 0
  while (activeMessages > 0 && i < maxIterations) {
    // Receive the messages: ----------------------------------------------------------------------
    // Run the vertex program on all vertices that receive messages
    val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
    // Merge the new vertex values back into the graph
    g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
    // Send Messages: -----------------------------------------------------------------------------
    // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
    // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
    // on edges in the activeDir of vertices in newVerts
    messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
    activeMessages = messages.count()
    i += 1
class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIterations: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexID, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
    // compute the messages
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIterations is achieved
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages: --------------------------------------------------------------------
      // Run the vertex program on all vertices that receive messages
      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
      // Merge the new vertex values back into the graph
      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
      // Send Messages: ---------------------------------------------------------------------------
      // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
      // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
      // on edges in the activeDir of vertices in newVerts
      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}
{% endhighlight %}
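To give a feel for how this operator is invoked, the following is a hedged sketch of single-source shortest paths. It assumes a hypothetical `distGraph: Graph[Double, Double]` whose vertex attribute is the current distance from the source (0.0 at the source, `Double.PositiveInfinity` elsewhere) and whose edge attribute is the edge length:

{% highlight scala %}
// Sketch: single-source shortest paths expressed with the Pregel operator above.
// distGraph is assumed to hold the initial distances described in the lead-in.
val sssp = distGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // vertex program: keep the shorter distance
  triplet => {                                    // send a message only if it improves the destination
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b)                        // merge messages: take the minimum candidate
)
{% endhighlight %}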
@@ -749,18 +768,20 @@ time without hash evaluations. To leverage this indexed data-structure, the `VertexRDD` exposes the
following additional functionality:
{% highlight scala %}
// Filter the vertex set but preserves the internal index
def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
class VertexRDD[VD] {
  // Filter the vertex set but preserves the internal index
  def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
  // Transform the values without changing the ids (preserves the internal index)
  def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
  // Remove vertices from this set that appear in the other set
  def diff(other: VertexRDD[VD]): VertexRDD[VD]
  // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
  def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
  // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
  def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
{% endhighlight %}
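As a small usage sketch (reusing the hypothetical in-degree `VertexRDD` computed earlier):

{% highlight scala %}
// Sketch: keep only vertices with at least two in-edges, then rescale the counts.
// Both filter and mapValues reuse the VertexRDD's internal index instead of rebuilding it.
val busyVertices: VertexRDD[Int] = inDegrees.filter { case (id, deg) => deg >= 2 }
val scaled: VertexRDD[Double] = busyVertices.mapValues(deg => deg / 2.0)
{% endhighlight %}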
Notice, for example, how the `filter` operator returns a `VertexRDD`. Filter is actually