diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala index e10c03f6bad1c8640143afc8808c90afb16ca7d6..5ecdd7d0045fae68116792eae13da43dbdf1bb39 100644 --- a/bagel/src/main/scala/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/spark/bagel/Bagel.scala @@ -7,8 +7,7 @@ import scala.collection.mutable.ArrayBuffer import storage.StorageLevel object Bagel extends Logging { - - val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_ONLY + val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK /** * Runs a Bagel program. @@ -63,8 +62,9 @@ object Bagel extends Logging { val combinedMsgs = msgs.combineByKey( combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner) val grouped = combinedMsgs.groupWith(verts) + val superstep_ = superstep // Create a read-only copy of superstep for capture in closure val (processed, numMsgs, numActiveVerts) = - comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel) + comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel) val timeTaken = System.currentTimeMillis - startTime logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))