-
Ankur Dave authoredAnkur Dave authored
layout: global
title: GraphX Programming Guide
- This will become a table of contents (this text will be scraped). {:toc}
Overview
GraphX is the new (alpha) Spark API for graphs and graph-parallel computation. At a high-level, GraphX extends the Spark RDD by introducing the Resilient Distributed property Graph (RDG): a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and mapReduceTriplets) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.
Background on Graph-Parallel Computation
From social networks to language modeling, the growing scale and importance of graph data has driven the development of numerous new graph-parallel systems (e.g., Giraph and GraphLab). By restricting the types of computation that can be expressed and introducing new techniques to partition and distribute graphs, these systems can efficiently execute sophisticated graph algorithms orders of magnitude faster than more general data-parallel systems.
However, the same restrictions that enable these substantial performance gains also make it difficult to express many of the important stages in a typical graph-analytics pipeline: constructing the graph, modifying its structure, or expressing computation that spans multiple graphs. As a consequence, existing graph analytics pipelines compose graph-parallel and data-parallel systems, leading to extensive data movement and duplication and a complicated programming model.
The goal of the GraphX project is to unify graph-parallel and data-parallel computation in one system with a single composable API. The GraphX API enables users to view data both as a graph and as collections (i.e., RDDs) without data movement or duplication. By incorporating recent advances in graph-parallel systems, GraphX is able to optimize the execution of graph operations.
GraphX Replaces the Spark Bagel API
Prior to the release of GraphX, graph computation in Spark was expressed using Bagel, an implementation of Pregel. GraphX improves upon Bagel by exposing a richer property graph API, a more streamlined version of the Pregel abstraction, and system optimizations to improve performance and reduce memory overhead. While we plan to eventually deprecate Bagel, we will continue to support the Bagel API and Bagel programming guide. However, we encourage Bagel users to explore the new GraphX API and comment on issues that may complicate the transition from Bagel.
Getting Started
To get started you first need to import Spark and GraphX into your project, as follows:
{% highlight scala %} import org.apache.spark._ import org.apache.spark.graphx._ // To make some of the examples work we will also need RDD import org.apache.spark.rdd.RDD {% endhighlight %}
If you are not using the Spark shell you will also need a SparkContext
. To learn more about
getting started with Spark refer to the Spark Quick Start Guide.
The Property Graph
The property graph is a directed multigraph
with user defined objects attached to each vertex and edge. A directed multigraph is a directed
graph with potentially multiple parallel edges sharing the same source and destination vertex. The
ability to support parallel edges simplifies modeling scenarios where there can be multiple
relationships (e.g., co-worker and friend) between the same vertices. Each vertex is keyed by a
unique 64-bit long identifier (VertexId
). Similarly, edges have corresponding source and
destination vertex identifiers. GraphX does not impose any ordering or constraints on the vertex
identifiers. The property graph is parameterized over the vertex VD
and edge ED
types. These
are the types of the objects associated with each vertex and edge respectively.
GraphX optimizes the representation of
VD
andED
when they are plain old data-types (e.g., int, double, etc...) reducing the in memory footprint.
In some cases we may wish to have vertices with different property types in the same graph. This can be accomplished through inheritance. For example to model users and products as a bipartite graph we might do the following:
{% highlight scala %} class VertexProperty() case class UserProperty(val name: String) extends VertexProperty case class ProductProperty(val name: String, val price: Double) extends VertexProperty // The graph might then have the type: var graph: Graph[VertexProperty, String] = null {% endhighlight %}
Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or structure of the graph are accomplished by producing a new graph with the desired changes. The graph is partitioned across the workers using a range of vertex-partitioning heuristics. As with RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.
Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the properties for each vertex and edge. As a consequence, the graph class contains members to access the vertices and edges of the graph:
{% highlight scala %} val vertices: VertexRDD[VD] val edges: EdgeRDD[ED] {% endhighlight %}
The classes VertexRDD[VD]
and EdgeRDD[ED]
extend and are optimized versions of RDD[(VertexId, VD)]
and RDD[Edge[ED]]
respectively. Both VertexRDD[VD]
and EdgeRDD[ED]
provide additional
functionality built around graph computation and leverage internal optimizations. We discuss the
VertexRDD
and EdgeRDD
API in greater detail in the section on vertex and edge
RDDs but for now they can be thought of as simply RDDs of the form:
RDD[(VertexId, VD)]
and RDD[Edge[ED]]
.
Example Property Graph
Suppose we want to construct a property graph consisting of the various collaborators on the GraphX project. The vertex property might contain the username and occupation. We could annotate edges with a string describing the relationships between collaborators:
The resulting graph would have the type signature:
{% highlight scala %} val userGraph: Graph[(String, String), String] {% endhighlight %}
There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic generators and these are discussed in more detail in the section on graph builders. Probably the most general method is to use the Graph object. For example the following code constructs a graph from a collection of RDDs:
{% highlight scala %} // Assume the SparkContext has already been constructed val sc: SparkContext // Create an RDD for the vertices val users: RDD[(VertexID, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))) // Create an RDD for edges val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"))) // Define a default user in case there are relationship with missing user val defaultUser = ("John Doe", "Missing") // Build the initial Graph val graph = Graph(users, relationships, defaultUser) {% endhighlight %}
In the above example we make use of the Edge
case class. Edges have a srcId
and a
dstId
corresponding to the source and destination vertex identifiers. In addition, the Edge
class contains the attr
member which contains the edge property.
We can deconstruct a graph into the respective vertex and edge views by using the graph.vertices
and graph.edges
members respectively.
{% highlight scala %} val graph: Graph[(String, String), String] // Constructed from above // Count all users which are postdocs graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count // Count all the edges where src > dst graph.edges.filter(e => e.srcId > e.dstId).count {% endhighlight %}
Note that
graph.vertices
returns anVertexRDD[(String, String)]
which extendsRDD[(VertexId, (String, String))]
and so we use the scalacase
expression to deconstruct the tuple. On the other hand,graph.edges
returns anEdgeRDD
containingEdge[String]
objects. We could have also used the case class type constructor as in the following: {% highlight scala %} graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count {% endhighlight %}
In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view.
The triplet view logically joins the vertex and edge properties yielding an
RDD[EdgeTriplet[VD, ED]]
containing instances of the EdgeTriplet
class. This
join can be expressed in the following SQL expression:
{% highlight sql %} SELECT src.id, dst.id, src.attr, e.attr, dst.attr FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst ON e.srcId = src.Id AND e.dstId = dst.Id {% endhighlight %}
or graphically as:
The EdgeTriplet
class extends the Edge
class by adding the srcAttr
and
dstAttr
members which contain the source and destination properties respectively. We can use the
triplet view of a graph to render a collection of strings describing relationships between users.
{% highlight scala %} val graph: Graph[(String, String), String] // Constructed from above // Use the triplets view to create an RDD of facts. val facts: RDD[String] = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr.1) facts.collect.foreach(println()) {% endhighlight %}
Graph Operators
Just as RDDs have basic operations like map
, filter
, and reduceByKey
, property graphs also
have a collection of basic operators that take user defined functions and produce new graphs with
transformed properties and structure. The core operators that have optimized implementations are
defined in Graph
and convenient operators that are expressed as a compositions of the
core operators are defined in GraphOps
. However, thanks to Scala implicits the
operators in GraphOps
are automatically available as members of Graph
. For example, we can
compute the in-degree of each vertex (defined in GraphOps
) by the following:
{% highlight scala %} val graph: Graph[(String, String), String] // Use the implicit GraphOps.inDegrees operator val indDegrees: VertexRDD[Int] = graph.inDegrees {% endhighlight %}
The reason for differentiating between core graph operations and GraphOps
is to be
able to support different graph representations in the future. Each graph representation must
provide implementations of the core operations and reuse many of the useful operations defined in
GraphOps
.
Property Operators
In direct analogy to the RDD map
operator, the property
graph contains the following:
{% highlight scala %} def mapVertices[VD2](map:%20(VertexID,%20VD) => VD2): Graph[VD2, ED] def mapEdgesED2: Graph[VD, ED2] def mapTripletsED2: Graph[VD, ED2] {% endhighlight %}
Each of these operators yields a new graph with the vertex or edge properties modified by the user
defined map
function.
Note that in all cases the graph structure is unaffected. This is a key feature of these operators which allows the resulting graph to reuse the structural indices of the original graph. The following snippets are logically equivalent, but the first one does not preserve the structural indices and would not benefit from the GraphX system optimizations: {% highlight scala %} val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) } val newGraph = Graph(newVertices, graph.edges) {% endhighlight %} Instead, use
mapVertices
to preserve the indices: {% highlight scala %} val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr)) {% endhighlight %}
These operators are often used to initialize the graph for a particular computation or project away unnecessary properties. For example, given a graph with the out-degrees as the vertex properties (we describe how to construct such a graph later), we initialize it for PageRank:
{% highlight scala %} // Given a graph where the vertex property is the out-degree val inputGraph: Graph[Int, String] = graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0)) // Construct a graph where each edge contains the weight // and each vertex is the initial PageRank val outputGraph: Graph[Double, Double] = inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0) {% endhighlight %}
Structural Operators
Currently GraphX supports only a simple set of commonly used structural operators and we expect to add more in the future. The following is a list of the basic structural operators.
{% highlight scala %} def reverse: Graph[VD, ED] def subgraph(epred: EdgeTriplet[VD,ED] => Boolean, vpred: (VertexID, VD) => Boolean): Graph[VD, ED] def maskVD2, ED2: Graph[VD, ED] def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] {% endhighlight %}
The reverse
operator returns a new graph with all the edge directions reversed.
This can be useful when, for example, trying to compute the inverse PageRank. Because the reverse
operation does not modify vertex or edge properties or change the number of edges, it can be
implemented efficiently without data-movement or duplication.