From d35c5a5176601b4c8e12d022cc72e42d6986c0a4 Mon Sep 17 00:00:00 2001 From: Nick Pentreath <nick.pentreath@gmail.com> Date: Sat, 9 Mar 2013 12:52:16 +0200 Subject: [PATCH] Adding test for non-default persistence level --- bagel/src/test/scala/bagel/BagelSuite.scala | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 47829a431e..25db395c22 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -7,6 +7,7 @@ import org.scalatest.time.SpanSugar._ import scala.collection.mutable.ArrayBuffer import spark._ +import storage.StorageLevel class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable class TestMessage(val targetId: String) extends Message[String] with Serializable @@ -79,4 +80,21 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo } } } + + test("using non-default persistence level") { + failAfter(10 seconds) { + sc = new SparkContext("local", "test") + val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0)))) + val msgs = sc.parallelize(Array[(String, TestMessage)]()) + val numSupersteps = 50 + val result = + Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) { + (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) => + (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]()) + } + for ((id, vert) <- result.collect) { + assert(vert.age === numSupersteps) + } + } + } } -- GitLab