diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala
index 094e57dacb7cc313735c3f80556248c8d58f2e20..e10c03f6bad1c8640143afc8808c90afb16ca7d6 100644
--- a/bagel/src/main/scala/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -4,8 +4,38 @@ import spark._
 import spark.SparkContext._
 
 import scala.collection.mutable.ArrayBuffer
+import storage.StorageLevel
 
 object Bagel extends Logging {
+
+  val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_ONLY
+
+  /**
+   * Runs a Bagel program.
+   * @param sc [[spark.SparkContext]] to use for the program.
+   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
+   *                 the vertex id.
+   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
+   *                 empty RDD, i.e. sc.parallelize(Array[(K, Message)]()).
+   * @param combiner [[spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
+   *                 message before sending (which often involves network I/O).
+   * @param aggregator [[spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
+   *                   and provides the result to each vertex in the next superstep.
+   * @param partitioner [[spark.Partitioner]] partitions values by key
+   * @param numPartitions number of partitions across which to split the graph.
+   *                      Default is the default parallelism of the SparkContext
+   * @param storageLevel [[spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
+   *                     Defaults to caching in memory.
+   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
+   *                optional Aggregator and the current superstep,
+   *                and returns a set of (Vertex, outgoing Messages) pairs
+   * @tparam K key
+   * @tparam V vertex type
+   * @tparam M message type
+   * @tparam C combiner
+   * @tparam A aggregator
+   * @return an RDD of (K, V) pairs representing the graph after completion of the program
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest,
           A: Manifest](
     sc: SparkContext,
@@ -14,7 +44,8 @@ object Bagel extends Logging {
     combiner: Combiner[M, C],
     aggregator: Option[Aggregator[V, A]],
     partitioner: Partitioner,
-    numPartitions: Int
+    numPartitions: Int,
+    storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
   )(
     compute: (V, Option[C], Option[A], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
@@ -33,7 +64,7 @@ object Bagel extends Logging {
         combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
       val grouped = combinedMsgs.groupWith(verts)
       val (processed, numMsgs, numActiveVerts) =
-        comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep))
+        comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel)
 
       val timeTaken = System.currentTimeMillis - startTime
       logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
@@ -50,6 +81,7 @@ object Bagel extends Logging {
     verts
   }
 
+  /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default storage level */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
@@ -59,12 +91,29 @@ object Bagel extends Logging {
     numPartitions: Int
   )(
     compute: (V, Option[C], Int) => (V, Array[M])
+  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, partitioner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+  /** Runs a Bagel program with no [[spark.bagel.Aggregator]] */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    combiner: Combiner[M, C],
+    partitioner: Partitioner,
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[C], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
     run[K, V, M, C, Nothing](
-      sc, vertices, messages, combiner, None, partitioner, numPartitions)(
+      sc, vertices, messages, combiner, None, partitioner, numPartitions, storageLevel)(
       addAggregatorArg[K, V, M, C](compute))
   }
 
+  /**
+   * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]]
+   * and default storage level
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
@@ -73,13 +122,29 @@ object Bagel extends Logging {
     numPartitions: Int
   )(
     compute: (V, Option[C], Int) => (V, Array[M])
+  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+  /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default [[spark.HashPartitioner]] */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    combiner: Combiner[M, C],
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[C], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
     val part = new HashPartitioner(numPartitions)
     run[K, V, M, C, Nothing](
-      sc, vertices, messages, combiner, None, part, numPartitions)(
+      sc, vertices, messages, combiner, None, part, numPartitions, storageLevel)(
       addAggregatorArg[K, V, M, C](compute))
   }
 
+  /**
+   * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]],
+   * [[spark.bagel.DefaultCombiner]] and the default storage level
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
@@ -87,10 +152,24 @@ object Bagel extends Logging {
     numPartitions: Int
   )(
     compute: (V, Option[Array[M]], Int) => (V, Array[M])
-  ): RDD[(K, V)] = {
+  ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+  /**
+   * Runs a Bagel program with no [[spark.bagel.Aggregator]], the default [[spark.HashPartitioner]]
+   * and [[spark.bagel.DefaultCombiner]]
+   */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[Array[M]], Int) => (V, Array[M])
+  ): RDD[(K, V)] = {
     val part = new HashPartitioner(numPartitions)
     run[K, V, M, Array[M], Nothing](
-      sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions)(
+      sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions, storageLevel)(
       addAggregatorArg[K, V, M, Array[M]](compute))
   }
 
@@ -117,7 +196,8 @@ object Bagel extends Logging {
   private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
     sc: SparkContext,
     grouped: RDD[(K, (Seq[C], Seq[V]))],
-    compute: (V, Option[C]) => (V, Array[M])
+    compute: (V, Option[C]) => (V, Array[M]),
+    storageLevel: StorageLevel
   ): (RDD[(K, (V, Array[M]))], Int, Int) = {
     var numMsgs = sc.accumulator(0)
     var numActiveVerts = sc.accumulator(0)
@@ -135,7 +215,7 @@ object Bagel extends Logging {
          numActiveVerts += 1
 
        Some((newVert, newMsgs))
-    }.cache
+    }.persist(storageLevel)
 
     // Force evaluation of processed RDD for accurate performance measurements
     processed.foreach(x => {})
@@ -166,6 +246,7 @@ trait Aggregator[V, A] {
   def mergeAggregators(a: A, b: A): A
 }
 
+/** Default combiner that simply appends messages together (i.e. performs no aggregation) */
 class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
   def createCombiner(msg: M): Array[M] =
     Array(msg)
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 47829a431e871489b622ae0ea5e050163e0b1427..25db395c22128013d259aae8722d567e3c0ff76f 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -7,6 +7,7 @@ import org.scalatest.time.SpanSugar._
 import scala.collection.mutable.ArrayBuffer
 
 import spark._
+import storage.StorageLevel
 
 class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
 class TestMessage(val targetId: String) extends Message[String] with Serializable
@@ -79,4 +80,21 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
       }
     }
   }
+
+  test("using non-default persistence level") {
+    failAfter(10 seconds) {
+      sc = new SparkContext("local", "test")
+      val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
+      val msgs = sc.parallelize(Array[(String, TestMessage)]())
+      val numSupersteps = 50
+      val result =
+        Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
+          (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
+            (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
+        }
+      for ((id, vert) <- result.collect) {
+        assert(vert.age === numSupersteps)
+      }
+    }
+  }
 }
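For reference, here is a minimal sketch of how an application would pick up the new storageLevel parameter, mirroring the "using non-default persistence level" test above. It uses only the overload added in this patch (Bagel.run with numPartitions and a StorageLevel); the MyVertex/MyMessage classes, the StorageLevelExample object, and the choice of MEMORY_AND_DISK are hypothetical illustration, not part of the change.

```scala
import spark._
import spark.bagel._
import spark.storage.StorageLevel

// Hypothetical vertex/message types, modeled on TestVertex/TestMessage in BagelSuite.
class MyVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class MyMessage(val targetId: String) extends Message[String] with Serializable

object StorageLevelExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "bagelStorageLevelExample")
    val verts = sc.parallelize((1 to 4).map(id => (id.toString, new MyVertex(true, 0))))
    val msgs = sc.parallelize(Array[(String, MyMessage)]())
    val numSupersteps = 10

    // Ask Bagel to persist each superstep's intermediate RDD with MEMORY_AND_DISK
    // instead of the DEFAULT_STORAGE_LEVEL (MEMORY_ONLY).
    val result = Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.MEMORY_AND_DISK) {
      (self: MyVertex, msgs: Option[Array[MyMessage]], superstep: Int) =>
        // Run a fixed number of supersteps, aging each vertex once per step.
        (new MyVertex(superstep < numSupersteps - 1, self.age + 1), Array[MyMessage]())
    }

    result.collect.foreach { case (id, vert) => println(id + " -> " + vert.age) }
    sc.stop()
  }
}
```

Callers that omit the argument keep the old behavior, since the primary run signature defaults storageLevel to DEFAULT_STORAGE_LEVEL.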