diff --git a/core/src/main/scala/spark/BitTorrentBroadcast.scala b/core/src/main/scala/spark/BitTorrentBroadcast.scala
index dba9d39abfae7b49b1ee604fd76cd31de264643d..126e61dc7da2b7d148cb9f13cdc6296e55704751 100644
--- a/core/src/main/scala/spark/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/BitTorrentBroadcast.scala
@@ -113,7 +113,7 @@ extends Broadcast[T] with Logging {
     }
 
     // In the beginning, this is the only known source to Guide
-    listOfSources = listOfSources + masterSource
+    listOfSources += masterSource
 
     // Register with the Tracker
     BitTorrentBroadcast.registerValue (uuid,
@@ -203,7 +203,7 @@ extends Broadcast[T] with Logging {
     var blockID = 0
 
     for (i <- 0 until (byteArray.length, blockSize)) {
-      val thisBlockSize = Math.min (blockSize, byteArray.length - i)
+      val thisBlockSize = math.min (blockSize, byteArray.length - i)
       var tempByteArray = new Array[Byte] (thisBlockSize)
       val hasRead = bais.read (tempByteArray, 0, thisBlockSize)
@@ -268,7 +268,7 @@ extends Broadcast[T] with Logging {
       if (listOfSources.contains(newSourceInfo)) {
         listOfSources = listOfSources - newSourceInfo
       }
-      listOfSources = listOfSources + newSourceInfo
+      listOfSources += newSourceInfo
     }
   }
@@ -435,7 +435,7 @@ extends Broadcast[T] with Logging {
       while (hasBlocks < totalBlocks) {
         var numThreadsToCreate =
-          Math.min (listOfSources.size, BitTorrentBroadcast.MaxTxPeers) -
+          math.min (listOfSources.size, BitTorrentBroadcast.MaxTxPeers) -
           threadPool.getActiveCount
 
         while (hasBlocks < totalBlocks && numThreadsToCreate > 0) {
@@ -446,7 +446,7 @@ extends Broadcast[T] with Logging {
           // Add to peersNowTalking. Remove in the thread. We have to do this
          // ASAP, otherwise pickPeerToTalkTo picks the same peer more than once
           peersNowTalking.synchronized {
-            peersNowTalking = peersNowTalking + peerToTalkTo
+            peersNowTalking += peerToTalkTo
           }
         }
@@ -878,7 +878,7 @@ extends Broadcast[T] with Logging {
         i = i - 1
       }
 
-      selectedSources = selectedSources + curPeer
+      selectedSources += curPeer
       alreadyPicked.set (i)
       picksLeft = picksLeft - 1
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 7040d4e147434e627e181ecda742697f73b589b6..6826c7897cae6a907ac17b88c117d2e44a336bac 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -38,7 +38,7 @@ class CacheTrackerActor extends DaemonActor with Logging {
 
     case DroppedFromCache(rddId, partition, host) =>
       logInfo("Cache entry removed: (%s, %s) on %s".format(rddId, partition, host))
-      locs(rddId)(partition) -= host
+      locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host)
 
     case MemoryCacheLost(host) =>
       logInfo("Memory cache lost on " + host)
@@ -111,7 +111,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
     if (cachedVal != null) {
       // Split is in cache, so just return its values
       logInfo("Found partition in cache!")
-      return Iterator.fromArray(cachedVal.asInstanceOf[Array[T]])
+      return cachedVal.asInstanceOf[Array[T]].iterator
     } else {
       // Mark the split as loading (unless someone else marks it first)
       loading.synchronized {
@@ -119,7 +119,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
         while (loading.contains(key)) {
           try {loading.wait()} catch {case _ =>}
         }
-        return Iterator.fromArray(cache.get(key).asInstanceOf[Array[T]])
+        return cache.get(key).asInstanceOf[Array[T]].iterator
       } else {
         loading.add(key)
       }
@@ -138,7 +138,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
         loading.notifyAll()
       }
       future.apply() // Wait for the reply from the cache tracker
-      return Iterator.fromArray(array)
+      return array.iterator
     }
   }
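Note on the collection and iterator changes above: for a var holding an immutable collection, Scala desugars xs += x into xs = xs + x whenever the collection itself defines no += method, so the rewritten lines are equivalent to the old ones and simply use the idiomatic spelling. Iterator.fromArray, deprecated since Scala 2.8, gives way to the iterator method provided by the array wrappers, and the capitalized Math object is deprecated in favor of the scala.math package object. A minimal sketch of the three idioms (standalone names, not the ones from the patch):

    var sources = Set("host1")     // immutable Set held in a var
    sources += "host2"             // desugars to: sources = sources + "host2"

    val cached = Array(1, 2, 3)
    val it = cached.iterator       // replaces Iterator.fromArray(cached)

    val blockSize = math.min(1024, cached.length)  // scala.math, not scala.Math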
diff --git a/core/src/main/scala/spark/ChainedBroadcast.scala b/core/src/main/scala/spark/ChainedBroadcast.scala
index 8021f5da068a81d0e186661a68a6dfbf1062617c..6f2cc3f6f0809641a1858ac63331e60835a684e9 100644
--- a/core/src/main/scala/spark/ChainedBroadcast.scala
+++ b/core/src/main/scala/spark/ChainedBroadcast.scala
@@ -166,7 +166,7 @@ extends Broadcast[T] with Logging {
     var blockID = 0
 
     for (i <- 0 until (byteArray.length, blockSize)) {
-      val thisBlockSize = Math.min (blockSize, byteArray.length - i)
+      val thisBlockSize = math.min (blockSize, byteArray.length - i)
       var tempByteArray = new Array[Byte] (thisBlockSize)
       val hasRead = bais.read (tempByteArray, 0, thisBlockSize)
diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
index dbd5d451577786d127658656e5b1923cf6b2af3b..ea9e2d38a9a6b083f18c8084be52217842870d0a 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -26,7 +26,7 @@ class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] (
 )
 
 class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner)
-extends RDD[(K, Seq[Seq[_]])](rdds.first.context) with Logging {
+extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging {
   val aggr = new CoGroupAggregator
 
   override val dependencies = {
@@ -45,7 +45,7 @@ extends RDD[(K, Seq[Seq[_]])](rdds.first.context) with Logging {
   }
 
   @transient val splits_ : Array[Split] = {
-    val firstRdd = rdds.first
+    val firstRdd = rdds.head
     val array = new Array[Split](part.numPartitions)
     for (i <- 0 until array.size) {
       array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
diff --git a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala
index d49047d74a19005bcd500be90878a91e253f31d0..a970fb65262a2a0b4461ab9f000df0fead651690 100644
--- a/core/src/main/scala/spark/DAGScheduler.scala
+++ b/core/src/main/scala/spark/DAGScheduler.scala
@@ -237,7 +237,7 @@ private trait DAGScheduler extends Scheduler with Logging {
         if (stage.shuffleDep != None) {
           mapOutputTracker.registerMapOutputs(
             stage.shuffleDep.get.shuffleId,
-            stage.outputLocs.map(_.first).toArray)
+            stage.outputLocs.map(_.head).toArray)
         }
         updateCacheLocs()
         val newlyRunnable = new ArrayBuffer[Stage]
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 6334896cb6354bf591305ca99bc5c20ff5e7c74f..590106388a4e11ae57d4984c4509e5072a8da1cd 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -178,7 +178,7 @@ class SplitRDD[T: ClassManifest](prev: RDD[T])
 extends RDD[Array[T]](prev.context) {
   override def splits = prev.splits
   override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray))
+  override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
 }
diff --git a/core/src/main/scala/spark/Stage.scala b/core/src/main/scala/spark/Stage.scala
index 82b70ce60d78cfcfc081f616bae1393efc5995bc..401b33bd1629927a68c52ee5fb8ff7b5ab24dc38 100644
--- a/core/src/main/scala/spark/Stage.scala
+++ b/core/src/main/scala/spark/Stage.scala
@@ -22,7 +22,7 @@ class Stage(val id: Int, val rdd: RDD[_], val shuffleDep: Option[ShuffleDependen
   def removeOutputLoc(partition: Int, host: String) {
     val prevList = outputLocs(partition)
-    val newList = prevList - host
+    val newList = prevList.filterNot(_ == host)
     outputLocs(partition) = newList
     if (prevList != Nil && newList == Nil)
       numAvailableOutputs -= 1
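Two further deprecations are handled in the files above: Seq#first is superseded by head, and the element-removal operator - on List is gone, so removal is now spelled with filterNot. One nuance: the old operator dropped only the first matching element, whereas filterNot drops every occurrence; the host lists here presumably hold unique entries, so the two agree. A minimal sketch:

    val hosts = List("node1", "node2", "node2")
    hosts.head                     // "node1"; replaces the deprecated hosts.first
    hosts.filterNot(_ == "node2")  // List("node1"): drops all matches, where the
                                   // removed List.- dropped only the first one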
diff --git a/core/src/main/scala/spark/repl/SparkCompletion.scala b/core/src/main/scala/spark/repl/SparkCompletion.scala
index 9fa41736f3c281cc3c9e70ade81b587932298e1e..c6ed1860f019e54f9a2dd68831f2991f47a8c1aa 100644
--- a/core/src/main/scala/spark/repl/SparkCompletion.scala
+++ b/core/src/main/scala/spark/repl/SparkCompletion.scala
@@ -107,7 +107,7 @@ class SparkCompletion(val repl: SparkInterpreter) extends SparkCompletionOutput
   class TypeMemberCompletion(val tp: Type) extends CompletionAware with CompilerCompletion {
     def excludeEndsWith: List[String] = Nil
     def excludeStartsWith: List[String] = List("<") // <byname>, <repeated>, etc.
-    def excludeNames: List[String] = anyref.methodNames -- anyRefMethodsToShow ++ List("_root_")
+    def excludeNames: List[String] = anyref.methodNames.filterNot(anyRefMethodsToShow.contains) ++ List("_root_")
 
     def methodSignatureString(sym: Symbol) = {
       def asString = new MethodSymbolOutput(sym).methodString()
diff --git a/examples/src/main/scala/spark/examples/LocalALS.scala b/examples/src/main/scala/spark/examples/LocalALS.scala
index 10360dab3dfe3b1298d9f1bc1563a87f208f1b52..10e03359c9596a49c5ebaf29b3e735e85c0394b7 100644
--- a/examples/src/main/scala/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/spark/examples/LocalALS.scala
@@ -106,8 +106,8 @@ object LocalALS {
     val R = generateR()
 
     // Initialize m and u randomly
-    var ms = Array.fromFunction(_ => factory1D.random(F))(M)
-    var us = Array.fromFunction(_ => factory1D.random(F))(U)
+    var ms = Array.fill(M)(factory1D.random(F))
+    var us = Array.fill(U)(factory1D.random(F))
 
     // Iteratively update movies then users
     for (iter <- 1 to ITERATIONS) {
diff --git a/examples/src/main/scala/spark/examples/LocalFileLR.scala b/examples/src/main/scala/spark/examples/LocalFileLR.scala
index cc14aa7090308f060cfaa3edbc744b3871a2206c..2e94ccbec2c6f4f573640a7cc09c0470b2227bb4 100644
--- a/examples/src/main/scala/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/spark/examples/LocalFileLR.scala
@@ -27,7 +27,7 @@ object LocalFileLR {
       println("On iteration " + i)
       var gradient = Vector.zeros(D)
       for (p <- points) {
-        val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y
+        val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
         gradient += scale * p.x
       }
       w -= gradient
diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala
index 3fd3f88fa8384a0d1efde0d2401b88225e0ce331..72c50091095f7448e552397aa08ad1760c822eaa 100644
--- a/examples/src/main/scala/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/spark/examples/LocalLR.scala
@@ -18,7 +18,7 @@ object LocalLR {
       val x = Vector(D, _ => rand.nextGaussian + y * R)
       DataPoint(x, y)
     }
-    Array.fromFunction(generatePoint _)(N)
+    Array.tabulate(N)(generatePoint)
   }
 
   def main(args: Array[String]) {
@@ -32,7 +32,7 @@ object LocalLR {
       println("On iteration " + i)
       var gradient = Vector.zeros(D)
       for (p <- data) {
-        val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y
+        val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
         gradient += scale * p.x
       }
       w -= gradient
diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala
index 08e042037168f8f6059e1471a32e2fbb32f9b7f2..8ee14180679b21a095cc355f83f808a5b3ad39ad 100644
--- a/examples/src/main/scala/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/spark/examples/SparkALS.scala
@@ -117,8 +117,8 @@ object SparkALS {
     val R = generateR()
 
     // Initialize m and u randomly
-    var ms = Array.fromFunction(_ => factory1D.random(F))(M)
-    var us = Array.fromFunction(_ => factory1D.random(F))(U)
+    var ms = Array.fill(M)(factory1D.random(F))
+    var us = Array.fill(U)(factory1D.random(F))
 
     // Iteratively update movies then users
     val Rc = spark.broadcast(R)
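Array.fromFunction splits into two more precise factories, both used above: Array.fill(n)(expr) re-evaluates an index-independent expression for each slot (the random feature vectors in the ALS examples), while Array.tabulate(n)(f) passes each index to f (the numbered data points in the LR examples). The sequence difference operator -- is likewise gone, replaced in SparkCompletion.scala by an explicit filterNot. A minimal sketch with standalone names:

    val rand = new scala.util.Random
    val ms  = Array.fill(3)(rand.nextDouble)   // expression evaluated once per slot
    val pts = Array.tabulate(3)(i => i * 2)    // Array(0, 2, 4): f receives the index

    val names = List("toString", "map", "equals")
    val shown = List("toString")
    names.filterNot(shown.contains)            // List("map", "equals"); replaces names -- shown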
diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala
index d08f5d3f015d538eaa0a3b67215e1298334052fc..faa8471824150aeb82b74005ee2866213709e64f 100644
--- a/examples/src/main/scala/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkLR.scala
@@ -20,7 +20,7 @@ object SparkLR {
       val x = Vector(D, _ => rand.nextGaussian + y * R)
       DataPoint(x, y)
     }
-    Array.fromFunction(generatePoint _)(N)
+    Array.tabulate(N)(generatePoint)
   }
 
   def main(args: Array[String]) {
diff --git a/project/plugins/SparkProjectPlugins.scala b/project/plugins/SparkProjectPlugins.scala
index b07dfafcfb1a3b16db3b4627882974b507b28587..565f16082926ca35c7cce3968bacc82d6eb524b4 100644
--- a/project/plugins/SparkProjectPlugins.scala
+++ b/project/plugins/SparkProjectPlugins.scala
@@ -4,7 +4,7 @@ class SparkProjectPlugins(info: ProjectInfo) extends PluginDefinition(info) {
   val eclipse = "de.element34" % "sbt-eclipsify" % "0.7.0"
 
   val sbtIdeaRepo = "sbt-idea-repo" at "http://mpeltonen.github.com/maven/"
-  val sbtIdea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.2.0"
+  val sbtIdea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.4.0"
 
   val codaRepo = "Coda Hale's Repository" at "http://repo.codahale.com/"
   val assemblySBT = "com.codahale" % "assembly-sbt" % "0.1.1"
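For context on the Math.exp lines touched in the LR examples: the scale expression is the gradient coefficient of the logistic loss, so each pass accumulates scale * x into the gradient and then subtracts it from the weights. A standalone sketch of one such step, with plain Array[Double] standing in for the examples' Vector class (an assumption made for brevity):

    // One logistic-regression gradient step, mirroring the loop in LocalLR.
    def dot(a: Array[Double], b: Array[Double]): Double =
      a.zip(b).map { case (ai, bi) => ai * bi }.sum

    def step(w: Array[Double], x: Array[Double], y: Double): Array[Double] = {
      // scale = (1 / (1 + e^(-y * (w . x))) - 1) * y, as in the patched code
      val scale = (1 / (1 + math.exp(-y * dot(w, x))) - 1) * y
      w.zip(x).map { case (wi, xi) => wi - scale * xi }   // w -= scale * x
    }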