From 1d6abe3a4b58f28fc4e0e690e02c19b2568ce1ee Mon Sep 17 00:00:00 2001 From: Ankur Dave <ankurdave@gmail.com> Date: Wed, 23 Apr 2014 22:01:13 -0700 Subject: [PATCH] Mark all fields of EdgePartition, Graph, and GraphOps transient These classes are only serializable to work around closure capture, so their fields should all be marked `@transient` to avoid wasteful serialization. This PR supersedes apache/spark#519 and fixes the same bug. Author: Ankur Dave <ankurdave@gmail.com> Closes #520 from ankurdave/graphx-transient and squashes the following commits: 6431760 [Ankur Dave] Mark all fields of EdgePartition, Graph, and GraphOps `@transient` --- .../src/main/scala/org/apache/spark/graphx/Graph.scala | 6 +++--- .../main/scala/org/apache/spark/graphx/GraphOps.scala | 10 +++++----- .../org/apache/spark/graphx/impl/EdgePartition.scala | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 45349692cb..5039586890 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -46,7 +46,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * @note vertex ids are unique. * @return an RDD containing the vertices in this graph */ - val vertices: VertexRDD[VD] + @transient val vertices: VertexRDD[VD] /** * An RDD containing the edges and their associated attributes. The entries in the RDD contain @@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * along with their vertex data. * */ - val edges: EdgeRDD[ED] + @transient val edges: EdgeRDD[ED] /** * An RDD containing the edge triplets, which are edges along with the vertex data associated with @@ -77,7 +77,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum * }}} */ - val triplets: RDD[EdgeTriplet[VD, ED]] + @transient val triplets: RDD[EdgeTriplet[VD, ED]] /** * Caches the vertices and edges associated with this graph at the specified storage level. diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 5635287694..4997fbc3cb 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -34,28 +34,28 @@ import scala.util.Random class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable { /** The number of edges in the graph. */ - lazy val numEdges: Long = graph.edges.count() + @transient lazy val numEdges: Long = graph.edges.count() /** The number of vertices in the graph. */ - lazy val numVertices: Long = graph.vertices.count() + @transient lazy val numVertices: Long = graph.vertices.count() /** * The in-degree of each vertex in the graph. * @note Vertices with no in-edges are not returned in the resulting RDD. */ - lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In) + @transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In) /** * The out-degree of each vertex in the graph. * @note Vertices with no out-edges are not returned in the resulting RDD. */ - lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out) + @transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out) /** * The degree of each vertex in the graph. * @note Vertices with no edges are not returned in the resulting RDD. */ - lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either) + @transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either) /** * Computes the neighboring vertex degrees. diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 2e05f5d4e4..b7c472e905 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -34,10 +34,10 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap */ private[graphx] class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag]( - val srcIds: Array[VertexId], - val dstIds: Array[VertexId], - val data: Array[ED], - val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable { + @transient val srcIds: Array[VertexId], + @transient val dstIds: Array[VertexId], + @transient val data: Array[ED], + @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable { /** * Reverse all the edges in this partition. -- GitLab