diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index a1af63fa4a391b4e2a73f310a2009b7a6c5ce404..5ceac28fe7afb27fa9c196275deb72b3753da17e 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -81,7 +81,7 @@ class SparkEnv private[spark] ( // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut // down, but let's call it anyway in case it gets fixed in a later release // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it. - //actorSystem.awaitTermination() + // actorSystem.awaitTermination() } private[spark] diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 3cd71213769b7e4ec8a9eb9de79a631c50f269e4..2595c15104e87e9ad4c2de847a201ef536414280 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -167,7 +167,7 @@ extends Logging { private var initialized = false private var conf: SparkConf = null def initialize(_isDriver: Boolean, conf: SparkConf) { - TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests + TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests synchronized { if (!initialized) { initialized = true diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index a73b459c3cea1ed2f1505d881d85e48b1844d33e..9a7a113c9571521c6e9a51719144c4d9bcc5fbce 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -66,9 +66,9 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors! // This is unfortunate, but for now we just comment it out. workerActorSystems.foreach(_.shutdown()) - //workerActorSystems.foreach(_.awaitTermination()) + // workerActorSystems.foreach(_.awaitTermination()) masterActorSystems.foreach(_.shutdown()) - //masterActorSystems.foreach(_.awaitTermination()) + // masterActorSystems.foreach(_.awaitTermination()) masterActorSystems.clear() workerActorSystems.clear() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala index a730fe1f599af4ca03e5f14cb6134741c0b2dcba..4433a2ec29be6de3ae84c8d1e695ab9c37590863 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala @@ -30,7 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]] */ private[spark] trait LeaderElectionAgent extends Actor { - //TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring. + // TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring. val masterActor: ActorRef } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 13e2e292428b44dd85d594e2b48d48fd6a1af1ab..aecb069e4202b06daf71be72acbef86d6c014c66 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -275,7 +275,6 @@ private[spark] class Executor( // have left some weird state around depending on when the exception was thrown, but on // the other hand, maybe we could detect that when future tasks fail and exit then. logError("Exception in task ID " + taskId, t) - //System.exit(1) } } finally { // TODO: Unregister shuffle memory only for ResultTask diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 6883a544945984505bb9ebf330ff135f106d6605..3e3e18c3537d05e1569b8fca2607498e46885f91 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -42,7 +42,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi } def initialize() { - //Add default properties in case there's no properties file + // Add default properties in case there's no properties file setDefaultProperties(properties) // If spark.metrics.conf is not set, try to get file in class path diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index 8fd9c2b87d2564a7811c8860ec08cccaa1cf4b2a..16bd00fd189ff33579fdfe1b3695ba0bfa4ed5a1 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -48,7 +48,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, channel.socket.setTcpNoDelay(true) channel.socket.setReuseAddress(true) channel.socket.setKeepAlive(true) - /*channel.socket.setReceiveBufferSize(32768) */ + /* channel.socket.setReceiveBufferSize(32768) */ @volatile private var closed = false var onCloseCallback: Connection => Unit = null @@ -206,12 +206,12 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, private class Outbox { val messages = new Queue[Message]() - val defaultChunkSize = 65536 //32768 //16384 + val defaultChunkSize = 65536 var nextMessageToBeUsed = 0 def addMessage(message: Message) { messages.synchronized{ - /*messages += message*/ + /* messages += message*/ messages.enqueue(message) logDebug("Added [" + message + "] to outbox for sending to " + "[" + getRemoteConnectionManagerId() + "]") @@ -221,8 +221,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, def getChunk(): Option[MessageChunk] = { messages.synchronized { while (!messages.isEmpty) { - /*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */ - /*val message = messages(nextMessageToBeUsed)*/ + /* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */ + /* val message = messages(nextMessageToBeUsed)*/ val message = messages.dequeue val chunk = message.getChunkForSending(defaultChunkSize) if (chunk.isDefined) { @@ -262,7 +262,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, val currentBuffers = new ArrayBuffer[ByteBuffer]() - /*channel.socket.setSendBufferSize(256 * 1024)*/ + /* channel.socket.setSendBufferSize(256 * 1024)*/ override def getRemoteAddress() = address @@ -355,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } case None => { // changeConnectionKeyInterest(0) - /*key.interestOps(0)*/ + /* key.interestOps(0)*/ return false } } @@ -540,10 +540,10 @@ private[spark] class ReceivingConnection( return false } - /*logDebug("Read " + bytesRead + " bytes for the buffer")*/ + /* logDebug("Read " + bytesRead + " bytes for the buffer")*/ if (currentChunk.buffer.remaining == 0) { - /*println("Filled buffer at " + System.currentTimeMillis)*/ + /* println("Filled buffer at " + System.currentTimeMillis)*/ val bufferMessage = inbox.getMessageForChunk(currentChunk).get if (bufferMessage.isCompletelyReceived) { bufferMessage.flip diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index a75130cba2a2e17f74fae2c5a0b27e06d1e13e3b..2682f9d0ed7f0ed639f2a9afaf6cc6233853c991 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -505,7 +505,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, } } handleMessageExecutor.execute(runnable) - /*handleMessage(connection, message)*/ + /* handleMessage(connection, message)*/ } private def handleClientAuthentication( @@ -733,7 +733,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, logTrace("Sending Security [" + message + "] to [" + connManagerId + "]") val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection()) - //send security message until going connection has been authenticated + // send security message until going connection has been authenticated connection.send(message) wakeupSelector() @@ -859,14 +859,14 @@ private[spark] object ConnectionManager { None }) - /*testSequentialSending(manager)*/ - /*System.gc()*/ + /* testSequentialSending(manager)*/ + /* System.gc()*/ - /*testParallelSending(manager)*/ - /*System.gc()*/ + /* testParallelSending(manager)*/ + /* System.gc()*/ - /*testParallelDecreasingSending(manager)*/ - /*System.gc()*/ + /* testParallelDecreasingSending(manager)*/ + /* System.gc()*/ testContinuousSending(manager) System.gc() @@ -948,7 +948,7 @@ private[spark] object ConnectionManager { val ms = finishTime - startTime val tput = mb * 1000.0 / ms println("--------------------------") - /*println("Started at " + startTime + ", finished at " + finishTime) */ + /* println("Started at " + startTime + ", finished at " + finishTime) */ println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)") println("--------------------------") println() diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala index 35f64134b073aa3e16f338d3e173fb8eb6a7b490..e5745d7daa1538ce0e302c392c0ed8202eefa7d1 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala @@ -47,8 +47,8 @@ private[spark] object ConnectionManagerTest extends Logging{ val slaves = slavesFile.mkString.split("\n") slavesFile.close() - /*println("Slaves")*/ - /*slaves.foreach(println)*/ + /* println("Slaves")*/ + /* slaves.foreach(println)*/ val tasknum = if (args.length > 2) args(2).toInt else slaves.length val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024 val count = if (args.length > 4) args(4).toInt else 3 diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala index 3c09a713c6fe08f755dd1bbb357c5f0baaf25f0e..17fd931c9f075638af89fa5cd0e414be0eff70ab 100644 --- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala +++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala @@ -27,7 +27,7 @@ private[spark] object ReceiverTest { println("Started connection manager with id = " + manager.id) manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { - /*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/ + /* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/ val buffer = ByteBuffer.wrap("response".getBytes) Some(Message.createBufferMessage(buffer, msg.id)) }) diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala index aac2c24a46faa808815d4baf499c61598fd32e8b..905eddfbb9450422975d0fcb2d4934cfd11a9d92 100644 --- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala +++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala @@ -50,7 +50,7 @@ private[spark] object SenderTest { (0 until count).foreach(i => { val dataMessage = Message.createBufferMessage(buffer.duplicate) val startTime = System.currentTimeMillis - /*println("Started timer at " + startTime)*/ + /* println("Started timer at " + startTime)*/ val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) .map { response => val buffer = response.asInstanceOf[BufferMessage].buffers(0) diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala index f9082ffb9141ab273417ddc51f647e2440b0249b..4164e81d3a8aea897d9c5ab6d62366effa906dda 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala @@ -32,7 +32,7 @@ private[spark] class FileHeader ( buf.writeInt(fileLen) buf.writeInt(blockId.name.length) blockId.name.foreach((x: Char) => buf.writeByte(x)) - //padding the rest of header + // padding the rest of header if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) { buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes) } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 77c558ac46f6f647124d312a50e2ecc2ebd93534..4fce47e1ee8de42276f5db09262f9e2fc06da0f1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -753,7 +753,7 @@ class DAGScheduler( val properties = if (stageIdToActiveJob.contains(jobId)) { stageIdToActiveJob(stage.jobId).properties } else { - //this stage will be assigned to "default" pool + // this stage will be assigned to "default" pool null } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 990e01a3e79592685c9c52b05481cba13bc79542..7bfc30b4208a31cc2b8063b289f6eac65004fc9a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -172,7 +172,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A properties += ((key, value)) } } - //TODO (prashant) send conf instead of properties + // TODO (prashant) send conf instead of properties driverActor = actorSystem.actorOf( Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index bcfc39146a61e3ef6f20a7e409b75802b0053e6d..2fbbda5b76c74b42e7d87020c6fa4d59d7ec8765 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -284,7 +284,7 @@ object BlockFetcherIterator { } } catch { case x: InterruptedException => logInfo("Copier Interrupted") - //case _ => throw new SparkException("Exception Throw in Shuffle Copier") + // case _ => throw new SparkException("Exception Throw in Shuffle Copier") } } } diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index a8d20ee332355bdc576fd5b470b0920f9d606489..cdbbc65292188813fb78a4f0c25f68360812a64f 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -112,7 +112,7 @@ private[spark] object ClosureCleaner extends Logging { accessedFields(cls) = Set[String]() for (cls <- func.getClass :: innerClasses) getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0) - //logInfo("accessedFields: " + accessedFields) + // logInfo("accessedFields: " + accessedFields) val inInterpreter = { try { @@ -139,13 +139,13 @@ private[spark] object ClosureCleaner extends Logging { val field = cls.getDeclaredField(fieldName) field.setAccessible(true) val value = field.get(obj) - //logInfo("1: Setting " + fieldName + " on " + cls + " to " + value); + // logInfo("1: Setting " + fieldName + " on " + cls + " to " + value); field.set(outer, value) } } if (outer != null) { - //logInfo("2: Setting $outer on " + func.getClass + " to " + outer); + // logInfo("2: Setting $outer on " + func.getClass + " to " + outer); val field = func.getClass.getDeclaredField("$outer") field.setAccessible(true) field.set(func, outer) @@ -153,7 +153,7 @@ private[spark] object ClosureCleaner extends Logging { } private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = { - //logInfo("Creating a " + cls + " with outer = " + outer) + // logInfo("Creating a " + cls + " with outer = " + outer) if (!inInterpreter) { // This is a bona fide closure class, whose constructor has no effects // other than to set its fields, so use its constructor @@ -170,7 +170,7 @@ private[spark] object ClosureCleaner extends Logging { val newCtor = rf.newConstructorForSerialization(cls, parentCtor) val obj = newCtor.newInstance().asInstanceOf[AnyRef] if (outer != null) { - //logInfo("3: Setting $outer on " + cls + " to " + outer); + // logInfo("3: Setting $outer on " + cls + " to " + outer); val field = cls.getDeclaredField("$outer") field.setAccessible(true) field.set(obj, outer) diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala index c539d2f708f95347475f429ca429660c6f40cfc6..4188a869c13da5aabf4649d6dbb35d66465c1233 100644 --- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala +++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala @@ -49,7 +49,7 @@ private[akka] class IndestructibleActorSystemImpl( if (isFatalError(cause) && !settings.JvmExitOnFatalError) { log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " + "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name) - //shutdown() //TODO make it configurable + // shutdown() //TODO make it configurable } else { fallbackHandler.uncaughtException(thread, cause) } diff --git a/core/src/main/scala/org/apache/spark/util/MutablePair.scala b/core/src/main/scala/org/apache/spark/util/MutablePair.scala index 2c1a6f8fd0a449b5ea970658221cfc8818a024ff..a898824cff0ca3327d294d6c8350b1c5557b1040 100644 --- a/core/src/main/scala/org/apache/spark/util/MutablePair.scala +++ b/core/src/main/scala/org/apache/spark/util/MutablePair.scala @@ -24,8 +24,8 @@ package org.apache.spark.util * @param _1 Element 1 of this MutablePair * @param _2 Element 2 of this MutablePair */ -case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1, - @specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2] +case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T1, + @specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T2] (var _1: T1, var _2: T2) extends Product2[T1, T2] { diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 6c73ea6949dd24413acdc77cf116b5c1e3e5eabf..4e7c34e6d1ada1d7a214c00feaa6312f81fb8d4f 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -66,7 +66,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte test ("add value to collection accumulators") { val maxI = 1000 - for (nThreads <- List(1, 10)) { //test single & multi-threaded + for (nThreads <- List(1, 10)) { // test single & multi-threaded sc = new SparkContext("local[" + nThreads + "]", "test") val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]()) val d = sc.parallelize(1 to maxI) @@ -83,7 +83,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte test ("value not readable in tasks") { val maxI = 1000 - for (nThreads <- List(1, 10)) { //test single & multi-threaded + for (nThreads <- List(1, 10)) { // test single & multi-threaded sc = new SparkContext("local[" + nThreads + "]", "test") val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]()) val d = sc.parallelize(1 to maxI) @@ -124,7 +124,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte test ("localValue readable in tasks") { val maxI = 1000 - for (nThreads <- List(1, 10)) { //test single & multi-threaded + for (nThreads <- List(1, 10)) { // test single & multi-threaded sc = new SparkContext("local[" + nThreads + "]", "test") val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]()) val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet} diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index d2e29f20f0b08b3ac4043f0d7563ce45f099a207..d2555b7c052c11d6c9a458ba1563a498c0ff6d00 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -432,7 +432,6 @@ object CheckpointSuite { // This is a custom cogroup function that does not use mapValues like // the PairRDDFunctions.cogroup() def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = { - //println("First = " + first + ", second = " + second) new CoGroupedRDD[K]( Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]), part diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 996db70809320f025eed6fab2dc15578701ca43f..7c30626a0c421dcd215e7774f7bd283a1385cff0 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -146,7 +146,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet assert(intercept[SparkException]{ arrs.distinct() }.getMessage.contains("array")) // We can't catch all usages of arrays, since they might occur inside other collections: - //assert(fails { arrPairs.distinct() }) + // assert(fails { arrPairs.distinct() }) assert(intercept[SparkException]{ arrPairs.partitionBy(new HashPartitioner(2)) }.getMessage.contains("array")) assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array")) assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array")) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index a25ce35736146547bca4ec3f08218c9a96c60f30..7c843772bc2e0ea80cf96c1d08a9287eea670891 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -111,7 +111,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) sc.addSparkListener(new StatsReportListener) - //just to make sure some of the tasks take a noticeable amount of time + // just to make sure some of the tasks take a noticeable amount of time val w = {i:Int => if (i == 0) Thread.sleep(100) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index eb8f5915605debe6760de904bc837b9f39a17f77..616214fb5e3a6be44e1ab6282cf95483838c97ae 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -39,7 +39,7 @@ class UtilsSuite extends FunSuite { } test("copyStream") { - //input array initialization + // input array initialization val bytes = Array.ofDim[Byte](9000) Random.nextBytes(bytes) diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala index c8ecbb8e41a8689a2d10421cae74f2ceb45f7f70..0095cb8425456045d1287b428b7312fca636daca 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala @@ -53,7 +53,6 @@ object LocalALS { for (i <- 0 until M; j <- 0 until U) { r.set(i, j, blas.ddot(ms(i), us(j))) } - //println("R: " + r) blas.daxpy(-1, targetR, r) val sumSqs = r.aggregate(Functions.plus, Functions.square) sqrt(sumSqs / (M * U)) diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala index 73b0e216cac9875300ea69c97587a4f8ed270f95..1fdb324b89f3ac43bc9a93b09a6896bfee49fdd3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala @@ -61,7 +61,7 @@ object SimpleSkewedGroupByTest { println("RESULT: " + pairs1.groupByKey(numReducers).count) // Print how many keys each reducer got (for debugging) - //println("RESULT: " + pairs1.groupByKey(numReducers) + // println("RESULT: " + pairs1.groupByKey(numReducers) // .map{case (k,v) => (k, v.size)} // .collectAsMap) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala index ce4b3c8451e00b3913b77cbf1c3360190f32754a..f59ab7e7cc24a6ab333eef7dac14fc6b41226812 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala @@ -54,7 +54,6 @@ object SparkALS { for (i <- 0 until M; j <- 0 until U) { r.set(i, j, blas.ddot(ms(i), us(j))) } - //println("R: " + r) blas.daxpy(-1, targetR, r) val sumSqs = r.aggregate(Functions.plus, Functions.square) sqrt(sumSqs / (M * U)) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index cf1fc3e808c766d682d685d493c11af98967788a..e698b9bf376e169a328bdff5d619a7b4b4768ed3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -34,8 +34,6 @@ object SparkHdfsLR { case class DataPoint(x: Vector, y: Double) def parsePoint(line: String): DataPoint = { - //val nums = line.split(' ').map(_.toDouble) - //return DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) val tok = new java.util.StringTokenizer(line, " ") var y = tok.nextToken.toDouble var x = new Array[Double](D) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index 62d3a526155840f53137d62a9138a4725bf51991..a22e64ca3ce457431a6f3375ac63a5aef2050c60 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -168,7 +168,7 @@ object ActorWordCount { Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format( host, port.toInt))), "SampleReceiver") - //compute wordcount + // compute wordcount lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() ssc.start() diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index 35be7ffa1e872e43abfbdf0b7df6eb45f56310eb..35f8f885f8f0efbcafc0ab73bc02eda2d043dd1e 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -88,7 +88,7 @@ object ZeroMQWordCount { def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator - //For this stream, a zeroMQ publisher should be running. + // For this stream, a zeroMQ publisher should be running. val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala index 6acba25f44c0a5278cff9d1519d50f54455347df..a538c38dc4d6fdce9670c9de0ace9de7cd8f70c7 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala @@ -44,7 +44,7 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, case m: ZMQMessage => logDebug("Received message for:" + m.frame(0)) - //We ignore first frame for processing as it is the topic + // We ignore first frame for processing as it is the topic val bytes = m.frames.tail pushBlock(bytesToObjects(bytes)) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala index fea43c3b2bbf1c01d8cd71d588a5a1cf1d9437a6..dfc6a801587d229d7f2e66078eaa87a8a395e80f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala @@ -27,12 +27,12 @@ class EdgeTriplet[VD, ED] extends Edge[ED] { /** * The source vertex attribute */ - var srcAttr: VD = _ //nullValue[VD] + var srcAttr: VD = _ // nullValue[VD] /** * The destination vertex attribute */ - var dstAttr: VD = _ //nullValue[VD] + var dstAttr: VD = _ // nullValue[VD] /** * Set the edge properties of this triplet. diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 43ac11d8957f67eff8ff0aff85069dc1c50b7cf2..c2b510a31ee3fdb0523cef2dd91432395fad748d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -190,9 +190,9 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) } - ////////////////////////////////////////////////////////////////////////////////////////////////// + // /////////////////////////////////////////////////////////////////////////////////////////////// // Lower level transformation methods - ////////////////////////////////////////////////////////////////////////////////////////////////// + // /////////////////////////////////////////////////////////////////////////////////////////////// override def mapReduceTriplets[A: ClassTag]( mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala index fe6fe76defdc51be17acb6924a4387c719b9d8e5..bebe3740bc6c043559497f08bbde82c9df851321 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala @@ -45,7 +45,7 @@ class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T]( * @param data value to send */ private[graphx] -class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T]( +class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T]( @transient var partition: PartitionID, var data: T) extends Product2[PartitionID, T] with Serializable { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala index 34a145e01818feedafdfe5000e9f1c298cd2fc60..2f2c524df6394129f9ef676d5ca17b90d9798f07 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala @@ -298,7 +298,6 @@ abstract class ShuffleSerializationStream(s: OutputStream) extends Serialization s.write(v.toInt) } - //def writeDouble(v: Double): Unit = writeUnsignedVarLong(java.lang.Double.doubleToLongBits(v)) def writeDouble(v: Double): Unit = writeLong(java.lang.Double.doubleToLongBits(v)) override def flush(): Unit = s.flush() @@ -391,7 +390,6 @@ abstract class ShuffleDeserializationStream(s: InputStream) extends Deserializat (s.read() & 0xFF) } - //def readDouble(): Double = java.lang.Double.longBitsToDouble(readUnsignedVarLong()) def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong()) override def close(): Unit = s.close() diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala index 014a7335f85cc3a57a262345c3a9b337b632aa2a..087b1156f690b8588a6ec6cc1b1bf2881a31f5e1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala @@ -65,7 +65,7 @@ private[graphx] object BytecodeUtils { val finder = new MethodInvocationFinder(c.getName, m) getClassReader(c).accept(finder, 0) for (classMethod <- finder.methodsInvoked) { - //println(classMethod) + // println(classMethod) if (classMethod._1 == targetClass && classMethod._2 == targetMethod) { return true } else if (!seen.contains(classMethod)) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala index f841846c0e5100ee38f09c9ae1342147ab35b1bb..a3c8de3f9068f3657420e350abc6fd22559b2bc7 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -123,7 +123,7 @@ object GraphGenerators { * the dimensions of the adjacency matrix */ private def addEdge(numVertices: Int): Edge[Int] = { - //val (src, dst) = chooseCell(numVertices/2.0, numVertices/2.0, numVertices/2.0) + // val (src, dst) = chooseCell(numVertices/2.0, numVertices/2.0, numVertices/2.0) val v = math.round(numVertices.toFloat/2.0).toInt val (src, dst) = chooseCell(v, v, v) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 9e269e655134115a9f1d79d95a2e36f8f48a4651..2549bc9710f1fcc0fb6bebe9506f0794652c615d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -17,7 +17,7 @@ import sbt._ import sbt.Classpaths.publishTask -import Keys._ +import sbt.Keys._ import sbtassembly.Plugin._ import AssemblyKeys._ import scala.util.Properties @@ -27,7 +27,7 @@ import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact import scala.collection.JavaConversions._ // For Sonatype publishing -//import com.jsuereth.pgp.sbtplugin.PgpKeys._ +// import com.jsuereth.pgp.sbtplugin.PgpKeys._ object SparkBuild extends Build { val SPARK_VERSION = "1.0.0-SNAPSHOT" @@ -200,7 +200,7 @@ object SparkBuild extends Build { publishMavenStyle := true, - //useGpg in Global := true, + // useGpg in Global := true, pomExtra := ( <parent> diff --git a/project/plugins.sbt b/project/plugins.sbt index 4ff6f67af45c0084ca7db1ddb6ac62f39f4e3362..5aa8a1ec2409b2a100407330e296b9b77f46a52c 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -22,3 +22,4 @@ addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0") addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6") addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.0") + diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala new file mode 100644 index 0000000000000000000000000000000000000000..43361aa2b4c412950b3f350705b7d93708fc77d9 --- /dev/null +++ b/project/project/SparkPluginBuild.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import sbt._ +import sbt.Keys._ + +/** + * This plugin project is there to define new scala style rules for spark. This is + * a plugin project so that this gets compiled first and is put on the classpath and + * becomes available for scalastyle sbt plugin. + */ +object SparkPluginDef extends Build { + lazy val root = Project("plugins", file(".")) dependsOn(sparkStyle) + lazy val sparkStyle = Project("spark-style", file("spark-style"), settings = styleSettings) + val sparkVersion = "1.0.0-SNAPSHOT" + // There is actually no need to publish this artifact. + def styleSettings = Defaults.defaultSettings ++ Seq ( + name := "spark-style", + organization := "org.apache.spark", + version := sparkVersion, + scalaVersion := "2.10.3", + scalacOptions := Seq("-unchecked", "-deprecation"), + libraryDependencies ++= Dependencies.scalaStyle, + sbtPlugin := true + ) + + object Dependencies { + val scalaStyle = Seq("org.scalastyle" %% "scalastyle" % "0.4.0") + } +} diff --git a/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceAfterCommentStyleCheck.scala b/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceAfterCommentStyleCheck.scala new file mode 100644 index 0000000000000000000000000000000000000000..2f3c1a182814d5e43a8d6c5349cc16447b7b4f40 --- /dev/null +++ b/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceAfterCommentStyleCheck.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.scalastyle + +import java.util.regex.Pattern + +import org.scalastyle.{PositionError, ScalariformChecker, ScalastyleError} +import scalariform.lexer.{MultiLineComment, ScalaDocComment, SingleLineComment, Token} +import scalariform.parser.CompilationUnit + +class SparkSpaceAfterCommentStartChecker extends ScalariformChecker { + val errorKey: String = "insert.a.single.space.after.comment.start" + + private def multiLineCommentRegex(comment: Token) = + Pattern.compile( """/\*\S+.*""", Pattern.DOTALL).matcher(comment.text.trim).matches() + + private def scalaDocPatternRegex(comment: Token) = + Pattern.compile( """/\*\*\S+.*""", Pattern.DOTALL).matcher(comment.text.trim).matches() + + private def singleLineCommentRegex(comment: Token): Boolean = + comment.text.trim.matches( """//\S+.*""") && !comment.text.trim.matches( """///+""") + + override def verify(ast: CompilationUnit): List[ScalastyleError] = { + ast.tokens + .filter(hasComment) + .map { + _.associatedWhitespaceAndComments.comments.map { + case x: SingleLineComment if singleLineCommentRegex(x.token) => Some(x.token.offset) + case x: MultiLineComment if multiLineCommentRegex(x.token) => Some(x.token.offset) + case x: ScalaDocComment if scalaDocPatternRegex(x.token) => Some(x.token.offset) + case _ => None + }.flatten + }.flatten.map(PositionError(_)) + } + + + private def hasComment(x: Token) = + x.associatedWhitespaceAndComments != null && !x.associatedWhitespaceAndComments.comments.isEmpty + +} diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index ee972887feda61adc82b9536c43fdde87c45dbf2..bf73800388ebfb18d0426d7056907494484662be 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -124,8 +124,8 @@ extends ClassVisitor(ASM4, cv) { mv.visitVarInsn(ALOAD, 0) // load this mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V") mv.visitVarInsn(ALOAD, 0) // load this - //val classType = className.replace('.', '/') - //mv.visitFieldInsn(PUTSTATIC, classType, "MODULE$", "L" + classType + ";") + // val classType = className.replace('.', '/') + // mv.visitFieldInsn(PUTSTATIC, classType, "MODULE$", "L" + classType + ";") mv.visitInsn(RETURN) mv.visitMaxs(-1, -1) // stack size and local vars will be auto-computed mv.visitEnd() diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 90a96ad38381e2ef6363a7dc92a4dc07bfa377f9..fa2f1a88c4eb5e169b2a1592c14d387f69c8a1fc 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -834,7 +834,7 @@ import org.apache.spark.util.Utils } ((pos, msg)) :: loop(filtered) } - //PRASHANT: This leads to a NoSuchMethodError for _.warnings. Yet to figure out its purpose. + // PRASHANT: This leads to a NoSuchMethodError for _.warnings. Yet to figure out its purpose. // val warnings = loop(run.allConditionalWarnings flatMap (_.warnings)) // if (warnings.nonEmpty) // mostRecentWarnings = warnings diff --git a/scalastyle-config.xml b/scalastyle-config.xml index ee968c53b3e4b7b456a3c025a00e4b69045d8a74..76ba1ecca33abfae4ecdebeb1955428bd67e4731 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -140,4 +140,5 @@ <!-- <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> --> <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check> <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check> + <check level="error" class="org.apache.spark.scalastyle.SparkSpaceAfterCommentStartChecker" enabled="true"></check> </scalastyle> diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index f4b61381f9a27883339281a3b2a4f11f35ece2cd..b70ec897e43e73df864b67d463d0476b1c795489 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -662,7 +662,7 @@ object HiveQl { // worth the number of hacks that will be required to implement it. Namely, we need to add // some sort of mapped star expansion that would expand all child output row to be similarly // named output expressions where some aggregate expression has been applied (i.e. First). - ??? /// Aggregate(groups, Star(None, First(_)) :: Nil, joinedResult) + ??? // Aggregate(groups, Star(None, First(_)) :: Nil, joinedResult) case Token(allJoinTokens(joinToken), relation1 :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index ca5311344615f7dd5901f0c404c00aa3e500f8cc..0da5eb754cb3fe1b5f7f36dbded6fc7d08a4a827 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -94,7 +94,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon val tablePath = hiveTable.getPath val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt) - //logDebug("Table input: %s".format(tablePath)) + // logDebug("Table input: %s".format(tablePath)) val ifc = hiveTable.getInputFormatClass .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index fde46705d89fb1f05a27e250cd998555bbf1d5a6..d3339063cc079223e674a5cffb202d11a6869acd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -153,7 +153,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def validate() { this.synchronized { assert(batchDuration != null, "Batch duration has not been set") - //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + + // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + // " is very low") assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute") } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 0dc6704603f8222b8278fa3d74a1ad3a15958326..72ad0bae75bfbd8392bccbfc16f4aefd73e6aa00 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -128,7 +128,6 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging } catch { case ie: InterruptedException => logInfo("Receiving thread interrupted") - //println("Receiving thread interrupted") case e: Exception => stopOnError(e) } @@ -142,7 +141,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging def stop() { receivingThread.interrupt() onStop() - //TODO: terminate the actor + // TODO: terminate the actor } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index ca0a8ae47864d21f13d56268345af398ee2e29a0..b334d68bf99108ea2f121e3f39e397e8c9483c41 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -78,7 +78,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( override def checkpoint(interval: Duration): DStream[(K, V)] = { super.checkpoint(interval) - //reducedStream.checkpoint(interval) + // reducedStream.checkpoint(interval) this } @@ -128,7 +128,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( // Cogroup the reduced RDDs and merge the reduced values val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner) - //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ + // val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ val numOldValues = oldRDDs.size val numNewValues = newRDDs.size diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index 9d8889b6553566c1249400d0dae09ce1e3146e00..5f7d3ba26c65694aeb5a7fb640e1b6cd9fd19291 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -64,7 +64,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( } val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) - //logDebug("Generating state RDD for time " + validTime) Some(stateRDD) } case None => { // If parent RDD does not exist @@ -97,11 +96,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( val groupedRDD = parentRDD.groupByKey(partitioner) val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) - //logDebug("Generating state RDD for time " + validTime + " (first)") + // logDebug("Generating state RDD for time " + validTime + " (first)") Some(sessionRDD) } case None => { // If parent RDD does not exist, then nothing to do! - //logDebug("Not generating state RDD (no previous state, no parent)") + // logDebug("Not generating state RDD (no previous state, no parent)") None } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index e4fa163f2e069b8889405e90d5ba77340a675cbe..cad68e248ab2933e8b51db78c78da547b655a407 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -126,7 +126,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { receiverInfo -= streamId logError("De-registered receiver for network stream " + streamId + " with message " + msg) - //TODO: Do something about the corresponding NetworkInputDStream + // TODO: Do something about the corresponding NetworkInputDStream } } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 0784e562ac71937dabf0c066e2cb8862f5dda4dc..25739956cb889333e30f20c9925801e4286f63a9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -252,7 +252,7 @@ class CheckpointSuite extends TestSuiteBase { ssc.start() // Create files and advance manual clock to process them - //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] Thread.sleep(1000) for (i <- Seq(1, 2, 3)) { Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8")) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 74e73ebb342fe446eaac610001e7c362819997af..7df206241beb6a757f7d0fff588a50de8bff9bcb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -154,7 +154,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor", - StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope + // Had to pass the local value of port to prevent from closing over entire scope + StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 57e5761cba8967af6ffb8c579ebd0d396cba7c0b..6568003bf10080776e386ebe696fe0b8006fad40 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -139,7 +139,6 @@ trait ClientBase extends Logging { } else if (srcHost != null && dstHost == null) { return false } - //check for ports if (srcUri.getPort() != dstUri.getPort()) { false } else { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 68cda0f1c9f8b5844d2a7fcb281654c3d95f76d7..9b7f1fca96c6d1eb4c94785db76da4a606b4e09a 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -157,7 +157,7 @@ class ClientDistributedCacheManager() extends Logging { def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = { val fs = FileSystem.get(uri, conf) val current = new Path(uri.getPath()) - //the leaf level file should be readable by others + // the leaf level file should be readable by others if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { return false } @@ -177,7 +177,7 @@ class ClientDistributedCacheManager() extends Logging { statCache: Map[URI, FileStatus]): Boolean = { var current = path while (current != null) { - //the subdirs in the path should have execute permissions for others + // the subdirs in the path should have execute permissions for others if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) { return false } diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala index 458df4fa3cd9943fd67504b83566dcb1a1776cc7..80b57d1355a3a883174a9ed928fa85e25c8058b6 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala @@ -99,7 +99,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar { assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None) assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None) - //add another one and verify both there and order correct + // add another one and verify both there and order correct val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", null, new Path("/tmp/testing2")) val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")