diff --git a/src/examples/BroadcastTest.scala b/src/examples/BroadcastTest.scala index 6d31ade95bf89665ba1cfa215d7a9eed6ed43bb6..40c2be8f6d26def3bb459200edabb803f9d356d7 100644 --- a/src/examples/BroadcastTest.scala +++ b/src/examples/BroadcastTest.scala @@ -3,25 +3,26 @@ import spark.SparkContext object BroadcastTest { def main(args: Array[String]) { if (args.length == 0) { - System.err.println("Usage: BroadcastTest <host> [<slices>] [<array-len>]") + System.err.println("Usage: BroadcastTest <host> [<slices>]") System.exit(1) } val spark = new SparkContext(args(0), "Broadcast Test") val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 - var arr = new Array[Int](num) - for (i <- 0 until arr.length) - arr(i) = i + var arr1 = new Array[Int](num) + for (i <- 0 until arr1.length) + arr1(i) = i - val start = System.nanoTime - val barr = spark.broadcast(arr) +// var arr2 = new Array[Int](num * 2) +// for (i <- 0 until arr2.length) +// arr2(i) = i + + val barr1 = spark.broadcast(arr1) +// val barr2 = spark.broadcast(arr2) spark.parallelize(1 to 10, slices).foreach { - println("in task: barr = " + barr) - i => println(barr.value.size) +// i => println(barr1.value.size + barr2.value.size) + i => println(barr1.value.size) } - val time = (System.nanoTime - start) / 1e9 - println("BroadcastTest took " + time + " s") } } - diff --git a/src/examples/LocalFileLR.scala b/src/examples/LocalFileLR.scala index 988442755ad9095dfacd029388abf187e0ffcbd6..3d3bb60677abc26cc289d30e29399ea2e0e0d1dd 100644 --- a/src/examples/LocalFileLR.scala +++ b/src/examples/LocalFileLR.scala @@ -13,7 +13,7 @@ object LocalFileLR { } def main(args: Array[String]) { - val lines = scala.io.Source.fromPath(args(0)).getLines() + val lines = scala.io.Source.fromFile(args(0)).getLines() val points = lines.map(parsePoint _) val ITERATIONS = args(1).toInt diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala index 9b8c34421f64a3abf83bb6060f7209fd0b0de378..50d8e4908a8dbfb40cc8d444658debf5cdc5822b 100644 --- a/src/scala/spark/SparkContext.scala +++ b/src/scala/spark/SparkContext.scala @@ -18,8 +18,8 @@ class SparkContext(master: String, frameworkName: String) { new Accumulator(initialValue, param) // TODO: Keep around a weak hash map of values to Cached versions? - def broadcast[T](value: T) = new CentralizedHDFSBroadcast(value, local) - //def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local) + // def broadcast[T](value: T) = new CentralizedHDFSBroadcast(value, local) + def broadcast[T](value: T) = new ChainedStreamingBroadcast(value, local) def textFile(path: String) = new HdfsTextFile(this, path)