From 60abc252545ec7a5d59957a32e764cd18f6c16b4 Mon Sep 17 00:00:00 2001
From: Prashant Sharma <prashant.s@imaginea.com>
Date: Fri, 28 Mar 2014 00:21:49 -0700
Subject: [PATCH] SPARK-1096: Add a space-after-comment-start style checker.

Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #124 from ScrapCodes/SPARK-1096/scalastyle-comment-check and squashes the following commits:

214135a [Prashant Sharma] Review feedback.
5eba88c [Prashant Sharma] Fixed style checks for ///+ comments.
e54b2f8 [Prashant Sharma] improved message, work around.
83e7144 [Prashant Sharma] removed dependency on scalastyle in plugin, since the scalastyle sbt plugin already depends on the right version. In case we update the plugin we will have to adjust our spark-style project to depend on the right scalastyle version.
810a1d6 [Prashant Sharma] SPARK-1096, a space after comment style checker.
ba33193 [Prashant Sharma] scala style as a project
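
For illustration only (these example lines are not part of the patch), the new
SparkSpaceAfterCommentStartChecker treats Scala comments as follows:

    //flagged: no space after the comment start
    // accepted: a single space after the comment start
    /*flagged: no space after the block-comment start */
    /* accepted: a space after the block-comment start */
    // accepted: divider lines consisting only of slashes are also exempt, e.g.
    //////////

The check is enabled through the new entry in scalastyle-config.xml.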
---
 .../scala/org/apache/spark/SparkEnv.scala     |  2 +-
 .../spark/broadcast/TorrentBroadcast.scala    |  2 +-
 .../spark/deploy/LocalSparkCluster.scala      |  4 +-
 .../deploy/master/LeaderElectionAgent.scala   |  2 +-
 .../org/apache/spark/executor/Executor.scala  |  1 -
 .../apache/spark/metrics/MetricsConfig.scala  |  2 +-
 .../org/apache/spark/network/Connection.scala | 18 +++---
 .../spark/network/ConnectionManager.scala     | 18 +++---
 .../spark/network/ConnectionManagerTest.scala |  4 +-
 .../apache/spark/network/ReceiverTest.scala   |  2 +-
 .../org/apache/spark/network/SenderTest.scala |  2 +-
 .../spark/network/netty/FileHeader.scala      |  2 +-
 .../apache/spark/scheduler/DAGScheduler.scala |  2 +-
 .../CoarseGrainedSchedulerBackend.scala       |  2 +-
 .../spark/storage/BlockFetcherIterator.scala  |  2 +-
 .../apache/spark/util/ClosureCleaner.scala    | 10 ++--
 .../util/IndestructibleActorSystem.scala      |  2 +-
 .../org/apache/spark/util/MutablePair.scala   |  4 +-
 .../org/apache/spark/AccumulatorSuite.scala   |  6 +-
 .../org/apache/spark/CheckpointSuite.scala    |  1 -
 .../org/apache/spark/PartitioningSuite.scala  |  2 +-
 .../spark/scheduler/SparkListenerSuite.scala  |  2 +-
 .../org/apache/spark/util/UtilsSuite.scala    |  2 +-
 .../org/apache/spark/examples/LocalALS.scala  |  1 -
 .../examples/SimpleSkewedGroupByTest.scala    |  2 +-
 .../org/apache/spark/examples/SparkALS.scala  |  1 -
 .../apache/spark/examples/SparkHdfsLR.scala   |  2 -
 .../streaming/examples/ActorWordCount.scala   |  2 +-
 .../streaming/examples/ZeroMQWordCount.scala  |  2 +-
 .../streaming/zeromq/ZeroMQReceiver.scala     |  2 +-
 .../org/apache/spark/graphx/EdgeTriplet.scala |  4 +-
 .../apache/spark/graphx/impl/GraphImpl.scala  |  4 +-
 .../graphx/impl/MessageToPartition.scala      |  2 +-
 .../spark/graphx/impl/Serializers.scala       |  2 -
 .../spark/graphx/util/BytecodeUtils.scala     |  2 +-
 .../spark/graphx/util/GraphGenerators.scala   |  2 +-
 project/SparkBuild.scala                      |  6 +-
 project/plugins.sbt                           |  1 +
 project/project/SparkPluginBuild.scala        | 44 +++++++++++++++
 .../SparkSpaceAfterCommentStyleCheck.scala    | 56 +++++++++++++++++++
 .../spark/repl/ExecutorClassLoader.scala      |  4 +-
 .../org/apache/spark/repl/SparkIMain.scala    |  2 +-
 scalastyle-config.xml                         |  1 +
 .../org/apache/spark/sql/hive/HiveQl.scala    |  2 +-
 .../apache/spark/sql/hive/TableReader.scala   |  2 +-
 .../apache/spark/streaming/DStreamGraph.scala |  2 +-
 .../dstream/NetworkInputDStream.scala         |  3 +-
 .../dstream/ReducedWindowedDStream.scala      |  4 +-
 .../streaming/dstream/StateDStream.scala      |  5 +-
 .../scheduler/NetworkInputTracker.scala       |  2 +-
 .../spark/streaming/CheckpointSuite.scala     |  2 +-
 .../spark/streaming/InputStreamsSuite.scala   |  3 +-
 .../apache/spark/deploy/yarn/ClientBase.scala |  1 -
 .../yarn/ClientDistributedCacheManager.scala  |  4 +-
 .../ClientDistributedCacheManagerSuite.scala  |  2 +-
 55 files changed, 180 insertions(+), 88 deletions(-)
 create mode 100644 project/project/SparkPluginBuild.scala
 create mode 100644 project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceAfterCommentStyleCheck.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index a1af63fa4a..5ceac28fe7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -81,7 +81,7 @@ class SparkEnv private[spark] (
     // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
     // down, but let's call it anyway in case it gets fixed in a later release
     // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
-    //actorSystem.awaitTermination()
+    // actorSystem.awaitTermination()
   }
 
   private[spark]
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 3cd7121376..2595c15104 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -167,7 +167,7 @@ extends Logging {
   private var initialized = false
   private var conf: SparkConf = null
   def initialize(_isDriver: Boolean, conf: SparkConf) {
-    TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
+    TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
     synchronized {
       if (!initialized) {
         initialized = true
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index a73b459c3c..9a7a113c95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -66,9 +66,9 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
     // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
     //       This is unfortunate, but for now we just comment it out.
     workerActorSystems.foreach(_.shutdown())
-    //workerActorSystems.foreach(_.awaitTermination())
+    // workerActorSystems.foreach(_.awaitTermination())
     masterActorSystems.foreach(_.shutdown())
-    //masterActorSystems.foreach(_.awaitTermination())
+    // masterActorSystems.foreach(_.awaitTermination())
     masterActorSystems.clear()
     workerActorSystems.clear()
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index a730fe1f59..4433a2ec29 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
  * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
  */
 private[spark] trait LeaderElectionAgent extends Actor {
-  //TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
+  // TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
   val masterActor: ActorRef
 }
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 13e2e29242..aecb069e42 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -275,7 +275,6 @@ private[spark] class Executor(
           // have left some weird state around depending on when the exception was thrown, but on
           // the other hand, maybe we could detect that when future tasks fail and exit then.
           logError("Exception in task ID " + taskId, t)
-          //System.exit(1)
         }
       } finally {
         // TODO: Unregister shuffle memory only for ResultTask
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 6883a54494..3e3e18c353 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -42,7 +42,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
   }
 
   def initialize() {
-    //Add default properties in case there's no properties file
+    // Add default properties in case there's no properties file
     setDefaultProperties(properties)
 
     // If spark.metrics.conf is not set, try to get file in class path
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index 8fd9c2b87d..16bd00fd18 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -48,7 +48,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
   channel.socket.setTcpNoDelay(true)
   channel.socket.setReuseAddress(true)
   channel.socket.setKeepAlive(true)
-  /*channel.socket.setReceiveBufferSize(32768) */
+  /* channel.socket.setReceiveBufferSize(32768) */
 
   @volatile private var closed = false
   var onCloseCallback: Connection => Unit = null
@@ -206,12 +206,12 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
 
   private class Outbox {
     val messages = new Queue[Message]()
-    val defaultChunkSize = 65536  //32768 //16384
+    val defaultChunkSize = 65536
     var nextMessageToBeUsed = 0
 
     def addMessage(message: Message) {
       messages.synchronized{
-        /*messages += message*/
+        /* messages += message*/
         messages.enqueue(message)
         logDebug("Added [" + message + "] to outbox for sending to " +
           "[" + getRemoteConnectionManagerId() + "]")
@@ -221,8 +221,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     def getChunk(): Option[MessageChunk] = {
       messages.synchronized {
         while (!messages.isEmpty) {
-          /*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
-          /*val message = messages(nextMessageToBeUsed)*/
+          /* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
+          /* val message = messages(nextMessageToBeUsed)*/
           val message = messages.dequeue
           val chunk = message.getChunkForSending(defaultChunkSize)
           if (chunk.isDefined) {
@@ -262,7 +262,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
 
   val currentBuffers = new ArrayBuffer[ByteBuffer]()
 
-  /*channel.socket.setSendBufferSize(256 * 1024)*/
+  /* channel.socket.setSendBufferSize(256 * 1024)*/
 
   override def getRemoteAddress() = address
 
@@ -355,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
               }
               case None => {
                 // changeConnectionKeyInterest(0)
-                /*key.interestOps(0)*/
+                /* key.interestOps(0)*/
                 return false
               }
             }
@@ -540,10 +540,10 @@ private[spark] class ReceivingConnection(
           return false
         }
 
-        /*logDebug("Read " + bytesRead + " bytes for the buffer")*/
+        /* logDebug("Read " + bytesRead + " bytes for the buffer")*/
 
         if (currentChunk.buffer.remaining == 0) {
-          /*println("Filled buffer at " + System.currentTimeMillis)*/
+          /* println("Filled buffer at " + System.currentTimeMillis)*/
           val bufferMessage = inbox.getMessageForChunk(currentChunk).get
           if (bufferMessage.isCompletelyReceived) {
             bufferMessage.flip
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index a75130cba2..2682f9d0ed 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -505,7 +505,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
       }
     }
     handleMessageExecutor.execute(runnable)
-    /*handleMessage(connection, message)*/
+    /* handleMessage(connection, message)*/
   }
 
   private def handleClientAuthentication(
@@ -733,7 +733,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
     logTrace("Sending Security [" + message + "] to [" + connManagerId + "]")
     val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection())
 
-    //send security message until going connection has been authenticated
+    // send security message until going connection has been authenticated
     connection.send(message)
 
     wakeupSelector()
@@ -859,14 +859,14 @@ private[spark] object ConnectionManager {
       None
     })
 
-    /*testSequentialSending(manager)*/
-    /*System.gc()*/
+    /* testSequentialSending(manager)*/
+    /* System.gc()*/
 
-    /*testParallelSending(manager)*/
-    /*System.gc()*/
+    /* testParallelSending(manager)*/
+    /* System.gc()*/
 
-    /*testParallelDecreasingSending(manager)*/
-    /*System.gc()*/
+    /* testParallelDecreasingSending(manager)*/
+    /* System.gc()*/
 
     testContinuousSending(manager)
     System.gc()
@@ -948,7 +948,7 @@ private[spark] object ConnectionManager {
     val ms = finishTime - startTime
     val tput = mb * 1000.0 / ms
     println("--------------------------")
-    /*println("Started at " + startTime + ", finished at " + finishTime) */
+    /* println("Started at " + startTime + ", finished at " + finishTime) */
     println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
     println("--------------------------")
     println()
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
index 35f64134b0..e5745d7daa 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
@@ -47,8 +47,8 @@ private[spark] object ConnectionManagerTest extends Logging{
     val slaves = slavesFile.mkString.split("\n")
     slavesFile.close()
 
-    /*println("Slaves")*/
-    /*slaves.foreach(println)*/
+    /* println("Slaves")*/
+    /* slaves.foreach(println)*/
     val tasknum = if (args.length > 2) args(2).toInt else slaves.length
     val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024 
     val count = if (args.length > 4) args(4).toInt else 3
diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
index 3c09a713c6..17fd931c9f 100644
--- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
@@ -27,7 +27,7 @@ private[spark] object ReceiverTest {
     println("Started connection manager with id = " + manager.id)
 
     manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      /*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
+      /* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
       val buffer = ByteBuffer.wrap("response".getBytes)
       Some(Message.createBufferMessage(buffer, msg.id))
     })
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index aac2c24a46..905eddfbb9 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -50,7 +50,7 @@ private[spark] object SenderTest {
     (0 until count).foreach(i => {
       val dataMessage = Message.createBufferMessage(buffer.duplicate)
       val startTime = System.currentTimeMillis
-      /*println("Started timer at " + startTime)*/
+      /* println("Started timer at " + startTime)*/
       val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
         .map { response =>
           val buffer = response.asInstanceOf[BufferMessage].buffers(0)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
index f9082ffb91..4164e81d3a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
@@ -32,7 +32,7 @@ private[spark] class FileHeader (
     buf.writeInt(fileLen)
     buf.writeInt(blockId.name.length)
     blockId.name.foreach((x: Char) => buf.writeByte(x))
-    //padding the rest of header
+    // padding the rest of header
     if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
       buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
     } else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 77c558ac46..4fce47e1ee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -753,7 +753,7 @@ class DAGScheduler(
     val properties = if (stageIdToActiveJob.contains(jobId)) {
       stageIdToActiveJob(stage.jobId).properties
     } else {
-      //this stage will be assigned to "default" pool
+      // this stage will be assigned to "default" pool
       null
     }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 990e01a3e7..7bfc30b420 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -172,7 +172,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         properties += ((key, value))
       }
     }
-    //TODO (prashant) send conf instead of properties
+    // TODO (prashant) send conf instead of properties
     driverActor = actorSystem.actorOf(
       Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index bcfc39146a..2fbbda5b76 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -284,7 +284,7 @@ object BlockFetcherIterator {
               }
             } catch {
               case x: InterruptedException => logInfo("Copier Interrupted")
-              //case _ => throw new SparkException("Exception Throw in Shuffle Copier")
+              // case _ => throw new SparkException("Exception Throw in Shuffle Copier")
             }
           }
         }
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index a8d20ee332..cdbbc65292 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -112,7 +112,7 @@ private[spark] object ClosureCleaner extends Logging {
       accessedFields(cls) = Set[String]()
     for (cls <- func.getClass :: innerClasses)
       getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0)
-    //logInfo("accessedFields: " + accessedFields)
+    // logInfo("accessedFields: " + accessedFields)
 
     val inInterpreter = {
       try {
@@ -139,13 +139,13 @@ private[spark] object ClosureCleaner extends Logging {
         val field = cls.getDeclaredField(fieldName)
         field.setAccessible(true)
         val value = field.get(obj)
-        //logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
+        // logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
         field.set(outer, value)
       }
     }
     
     if (outer != null) {
-      //logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
+      // logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
       val field = func.getClass.getDeclaredField("$outer")
       field.setAccessible(true)
       field.set(func, outer)
@@ -153,7 +153,7 @@ private[spark] object ClosureCleaner extends Logging {
   }
   
   private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
-    //logInfo("Creating a " + cls + " with outer = " + outer)
+    // logInfo("Creating a " + cls + " with outer = " + outer)
     if (!inInterpreter) {
       // This is a bona fide closure class, whose constructor has no effects
       // other than to set its fields, so use its constructor
@@ -170,7 +170,7 @@ private[spark] object ClosureCleaner extends Logging {
       val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
       val obj = newCtor.newInstance().asInstanceOf[AnyRef]
       if (outer != null) {
-        //logInfo("3: Setting $outer on " + cls + " to " + outer);
+        // logInfo("3: Setting $outer on " + cls + " to " + outer);
         val field = cls.getDeclaredField("$outer")
         field.setAccessible(true)
         field.set(obj, outer)
diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
index c539d2f708..4188a869c1 100644
--- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@ -49,7 +49,7 @@ private[akka] class IndestructibleActorSystemImpl(
         if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
           log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
             "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
-          //shutdown()                 //TODO make it configurable
+          // shutdown()                 //TODO make it configurable
         } else {
           fallbackHandler.uncaughtException(thread, cause)
         }
diff --git a/core/src/main/scala/org/apache/spark/util/MutablePair.scala b/core/src/main/scala/org/apache/spark/util/MutablePair.scala
index 2c1a6f8fd0..a898824cff 100644
--- a/core/src/main/scala/org/apache/spark/util/MutablePair.scala
+++ b/core/src/main/scala/org/apache/spark/util/MutablePair.scala
@@ -24,8 +24,8 @@ package org.apache.spark.util
  * @param  _1   Element 1 of this MutablePair
  * @param  _2   Element 2 of this MutablePair
  */
-case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1,
-                       @specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2]
+case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T1,
+                       @specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T2]
   (var _1: T1, var _2: T2)
   extends Product2[T1, T2]
 {
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 6c73ea6949..4e7c34e6d1 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -66,7 +66,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
 
   test ("add value to collection accumulators") {
     val maxI = 1000
-    for (nThreads <- List(1, 10)) { //test single & multi-threaded
+    for (nThreads <- List(1, 10)) { // test single & multi-threaded
       sc = new SparkContext("local[" + nThreads + "]", "test")
       val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
       val d = sc.parallelize(1 to maxI)
@@ -83,7 +83,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
 
   test ("value not readable in tasks") {
     val maxI = 1000
-    for (nThreads <- List(1, 10)) { //test single & multi-threaded
+    for (nThreads <- List(1, 10)) { // test single & multi-threaded
       sc = new SparkContext("local[" + nThreads + "]", "test")
       val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
       val d = sc.parallelize(1 to maxI)
@@ -124,7 +124,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
 
   test ("localValue readable in tasks") {
     val maxI = 1000
-    for (nThreads <- List(1, 10)) { //test single & multi-threaded
+    for (nThreads <- List(1, 10)) { // test single & multi-threaded
       sc = new SparkContext("local[" + nThreads + "]", "test")
       val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
       val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index d2e29f20f0..d2555b7c05 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -432,7 +432,6 @@ object CheckpointSuite {
   // This is a custom cogroup function that does not use mapValues like
   // the PairRDDFunctions.cogroup()
   def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
-    //println("First = " + first + ", second = " + second)
     new CoGroupedRDD[K](
       Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
       part
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 996db70809..7c30626a0c 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -146,7 +146,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
 
     assert(intercept[SparkException]{ arrs.distinct() }.getMessage.contains("array"))
     // We can't catch all usages of arrays, since they might occur inside other collections:
-    //assert(fails { arrPairs.distinct() })
+    // assert(fails { arrPairs.distinct() })
     assert(intercept[SparkException]{ arrPairs.partitionBy(new HashPartitioner(2)) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index a25ce35736..7c843772bc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -111,7 +111,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     sc.addSparkListener(new StatsReportListener)
-    //just to make sure some of the tasks take a noticeable amount of time
+    // just to make sure some of the tasks take a noticeable amount of time
     val w = {i:Int =>
       if (i == 0)
         Thread.sleep(100)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index eb8f591560..616214fb5e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -39,7 +39,7 @@ class UtilsSuite extends FunSuite {
   }
 
   test("copyStream") {
-    //input array initialization
+    // input array initialization
     val bytes = Array.ofDim[Byte](9000)
     Random.nextBytes(bytes)
 
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
index c8ecbb8e41..0095cb8425 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
@@ -53,7 +53,6 @@ object LocalALS {
     for (i <- 0 until M; j <- 0 until U) {
       r.set(i, j, blas.ddot(ms(i), us(j)))
     }
-    //println("R: " + r)
     blas.daxpy(-1, targetR, r)
     val sumSqs = r.aggregate(Functions.plus, Functions.square)
     sqrt(sumSqs / (M * U))
diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
index 73b0e216ca..1fdb324b89 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
@@ -61,7 +61,7 @@ object SimpleSkewedGroupByTest {
     
     println("RESULT: " + pairs1.groupByKey(numReducers).count)
     // Print how many keys each reducer got (for debugging)
-    //println("RESULT: " + pairs1.groupByKey(numReducers)
+    // println("RESULT: " + pairs1.groupByKey(numReducers)
     //                           .map{case (k,v) => (k, v.size)}
     //                           .collectAsMap)
 
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
index ce4b3c8451..f59ab7e7cc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
@@ -54,7 +54,6 @@ object SparkALS {
     for (i <- 0 until M; j <- 0 until U) {
       r.set(i, j, blas.ddot(ms(i), us(j)))
     }
-    //println("R: " + r)
     blas.daxpy(-1, targetR, r)
     val sumSqs = r.aggregate(Functions.plus, Functions.square)
     sqrt(sumSqs / (M * U))
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index cf1fc3e808..e698b9bf37 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -34,8 +34,6 @@ object SparkHdfsLR {
   case class DataPoint(x: Vector, y: Double)
 
   def parsePoint(line: String): DataPoint = {
-    //val nums = line.split(' ').map(_.toDouble)
-    //return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
     val tok = new java.util.StringTokenizer(line, " ")
     var y = tok.nextToken.toDouble
     var x = new Array[Double](D)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 62d3a52615..a22e64ca3c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -168,7 +168,7 @@ object ActorWordCount {
       Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
         host, port.toInt))), "SampleReceiver")
 
-    //compute wordcount
+    // compute wordcount
     lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
 
     ssc.start()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 35be7ffa1e..35f8f885f8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -88,7 +88,7 @@ object ZeroMQWordCount {
 
     def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
 
-    //For this stream, a zeroMQ publisher should be running.
+    // For this stream, a zeroMQ publisher should be running.
     val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 6acba25f44..a538c38dc4 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -44,7 +44,7 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
     case m: ZMQMessage =>
       logDebug("Received message for:" + m.frame(0))
 
-      //We ignore first frame for processing as it is the topic
+      // We ignore first frame for processing as it is the topic
       val bytes = m.frames.tail
       pushBlock(bytesToObjects(bytes))
 
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index fea43c3b2b..dfc6a80158 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -27,12 +27,12 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
   /**
    * The source vertex attribute
    */
-  var srcAttr: VD = _ //nullValue[VD]
+  var srcAttr: VD = _ // nullValue[VD]
 
   /**
    * The destination vertex attribute
    */
-  var dstAttr: VD = _ //nullValue[VD]
+  var dstAttr: VD = _ // nullValue[VD]
 
   /**
    * Set the edge properties of this triplet.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 43ac11d895..c2b510a31e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -190,9 +190,9 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
   }
 
-  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // ///////////////////////////////////////////////////////////////////////////////////////////////
   // Lower level transformation methods
-  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // ///////////////////////////////////////////////////////////////////////////////////////////////
 
   override def mapReduceTriplets[A: ClassTag](
       mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
index fe6fe76def..bebe3740bc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -45,7 +45,7 @@ class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
  * @param data value to send
  */
 private[graphx]
-class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T](
+class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T](
     @transient var partition: PartitionID,
     var data: T)
   extends Product2[PartitionID, T] with Serializable {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
index 34a145e018..2f2c524df6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -298,7 +298,6 @@ abstract class ShuffleSerializationStream(s: OutputStream) extends Serialization
     s.write(v.toInt)
   }
 
-  //def writeDouble(v: Double): Unit = writeUnsignedVarLong(java.lang.Double.doubleToLongBits(v))
   def writeDouble(v: Double): Unit = writeLong(java.lang.Double.doubleToLongBits(v))
 
   override def flush(): Unit = s.flush()
@@ -391,7 +390,6 @@ abstract class ShuffleDeserializationStream(s: InputStream) extends Deserializat
       (s.read() & 0xFF)
   }
 
-  //def readDouble(): Double = java.lang.Double.longBitsToDouble(readUnsignedVarLong())
   def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong())
 
   override def close(): Unit = s.close()
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
index 014a7335f8..087b1156f6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
@@ -65,7 +65,7 @@ private[graphx] object BytecodeUtils {
       val finder = new MethodInvocationFinder(c.getName, m)
       getClassReader(c).accept(finder, 0)
       for (classMethod <- finder.methodsInvoked) {
-        //println(classMethod)
+        // println(classMethod)
         if (classMethod._1 == targetClass && classMethod._2 == targetMethod) {
           return true
         } else if (!seen.contains(classMethod)) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index f841846c0e..a3c8de3f90 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -123,7 +123,7 @@ object GraphGenerators {
    * the dimensions of the adjacency matrix
    */
   private def addEdge(numVertices: Int): Edge[Int] = {
-    //val (src, dst) = chooseCell(numVertices/2.0, numVertices/2.0, numVertices/2.0)
+    // val (src, dst) = chooseCell(numVertices/2.0, numVertices/2.0, numVertices/2.0)
     val v = math.round(numVertices.toFloat/2.0).toInt
 
     val (src, dst) = chooseCell(v, v, v)
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 9e269e6551..2549bc9710 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -17,7 +17,7 @@
 
 import sbt._
 import sbt.Classpaths.publishTask
-import Keys._
+import sbt.Keys._
 import sbtassembly.Plugin._
 import AssemblyKeys._
 import scala.util.Properties
@@ -27,7 +27,7 @@ import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact
 import scala.collection.JavaConversions._
 
 // For Sonatype publishing
-//import com.jsuereth.pgp.sbtplugin.PgpKeys._
+// import com.jsuereth.pgp.sbtplugin.PgpKeys._
 
 object SparkBuild extends Build {
   val SPARK_VERSION = "1.0.0-SNAPSHOT" 
@@ -200,7 +200,7 @@ object SparkBuild extends Build {
 
     publishMavenStyle := true,
 
-    //useGpg in Global := true,
+    // useGpg in Global := true,
 
     pomExtra := (
       <parent>
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 4ff6f67af4..5aa8a1ec24 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -22,3 +22,4 @@ addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0")
 addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")
 
 addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.0")
+
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
new file mode 100644
index 0000000000..43361aa2b4
--- /dev/null
+++ b/project/project/SparkPluginBuild.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import sbt._
+import sbt.Keys._
+
+/**
+ * This plugin project is there to define new scala style rules for spark. This is
+ * a plugin project so that this gets compiled first and is put on the classpath and
+ * becomes available for scalastyle sbt plugin.
+ */
+object SparkPluginDef extends Build {
+  lazy val root = Project("plugins", file(".")) dependsOn(sparkStyle)
+  lazy val sparkStyle = Project("spark-style", file("spark-style"), settings = styleSettings)
+  val sparkVersion = "1.0.0-SNAPSHOT"
+  // There is actually no need to publish this artifact.
+  def styleSettings = Defaults.defaultSettings ++ Seq (
+    name                 :=  "spark-style",
+    organization         :=  "org.apache.spark",
+    version              :=  sparkVersion,
+    scalaVersion         :=  "2.10.3",
+    scalacOptions        :=  Seq("-unchecked", "-deprecation"),
+    libraryDependencies  ++= Dependencies.scalaStyle,
+    sbtPlugin            :=  true
+  )
+
+  object Dependencies {
+    val scalaStyle = Seq("org.scalastyle" %% "scalastyle" % "0.4.0")
+  }
+}
diff --git a/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceAfterCommentStyleCheck.scala b/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceAfterCommentStyleCheck.scala
new file mode 100644
index 0000000000..2f3c1a1828
--- /dev/null
+++ b/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceAfterCommentStyleCheck.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.scalastyle
+
+import java.util.regex.Pattern
+
+import org.scalastyle.{PositionError, ScalariformChecker, ScalastyleError}
+import scalariform.lexer.{MultiLineComment, ScalaDocComment, SingleLineComment, Token}
+import scalariform.parser.CompilationUnit
+
+class SparkSpaceAfterCommentStartChecker extends ScalariformChecker {
+  val errorKey: String = "insert.a.single.space.after.comment.start"
+
+  private def multiLineCommentRegex(comment: Token) =
+    Pattern.compile( """/\*\S+.*""", Pattern.DOTALL).matcher(comment.text.trim).matches()
+
+  private def scalaDocPatternRegex(comment: Token) =
+    Pattern.compile( """/\*\*\S+.*""", Pattern.DOTALL).matcher(comment.text.trim).matches()
+
+  private def singleLineCommentRegex(comment: Token): Boolean =
+    comment.text.trim.matches( """//\S+.*""") && !comment.text.trim.matches( """///+""")
+
+  override def verify(ast: CompilationUnit): List[ScalastyleError] = {
+    ast.tokens
+      .filter(hasComment)
+      .map {
+      _.associatedWhitespaceAndComments.comments.map {
+        case x: SingleLineComment if singleLineCommentRegex(x.token) => Some(x.token.offset)
+        case x: MultiLineComment if multiLineCommentRegex(x.token) => Some(x.token.offset)
+        case x: ScalaDocComment if scalaDocPatternRegex(x.token) => Some(x.token.offset)
+        case _ => None
+      }.flatten
+    }.flatten.map(PositionError(_))
+  }
+
+
+  private def hasComment(x: Token) =
+    x.associatedWhitespaceAndComments != null && !x.associatedWhitespaceAndComments.comments.isEmpty
+
+}
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index ee972887fe..bf73800388 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -124,8 +124,8 @@ extends ClassVisitor(ASM4, cv) {
       mv.visitVarInsn(ALOAD, 0) // load this
       mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V")
       mv.visitVarInsn(ALOAD, 0) // load this
-      //val classType = className.replace('.', '/')
-      //mv.visitFieldInsn(PUTSTATIC, classType, "MODULE$", "L" + classType + ";")
+      // val classType = className.replace('.', '/')
+      // mv.visitFieldInsn(PUTSTATIC, classType, "MODULE$", "L" + classType + ";")
       mv.visitInsn(RETURN)
       mv.visitMaxs(-1, -1) // stack size and local vars will be auto-computed
       mv.visitEnd()
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 90a96ad383..fa2f1a88c4 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -834,7 +834,7 @@ import org.apache.spark.util.Utils
                                     }
         ((pos, msg)) :: loop(filtered)
       }
-     //PRASHANT: This leads to a NoSuchMethodError for _.warnings. Yet to figure out its purpose.
+     // PRASHANT: This leads to a NoSuchMethodError for _.warnings. Yet to figure out its purpose.
       // val warnings = loop(run.allConditionalWarnings flatMap (_.warnings))
       // if (warnings.nonEmpty)
       //   mostRecentWarnings = warnings
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index ee968c53b3..76ba1ecca3 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -140,4 +140,5 @@
  <!-- <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> -->
  <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
  <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
+ <check level="error" class="org.apache.spark.scalastyle.SparkSpaceAfterCommentStartChecker" enabled="true"></check>
 </scalastyle>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index f4b61381f9..b70ec897e4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -662,7 +662,7 @@ object HiveQl {
       // worth the number of hacks that will be required to implement it.  Namely, we need to add
       // some sort of mapped star expansion that would expand all child output row to be similarly
       // named output expressions where some aggregate expression has been applied (i.e. First).
-      ??? /// Aggregate(groups, Star(None, First(_)) :: Nil, joinedResult)
+      ??? // Aggregate(groups, Star(None, First(_)) :: Nil, joinedResult)
 
     case Token(allJoinTokens(joinToken),
            relation1 ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index ca53113446..0da5eb754c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -94,7 +94,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
     val tablePath = hiveTable.getPath
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
 
-    //logDebug("Table input: %s".format(tablePath))
+    // logDebug("Table input: %s".format(tablePath))
     val ifc = hiveTable.getInputFormatClass
       .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
     val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index fde46705d8..d3339063cc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -153,7 +153,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   def validate() {
     this.synchronized {
       assert(batchDuration != null, "Batch duration has not been set")
-      //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
+      // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
       // " is very low")
       assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 0dc6704603..72ad0bae75 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -128,7 +128,6 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
     } catch {
       case ie: InterruptedException =>
         logInfo("Receiving thread interrupted")
-        //println("Receiving thread interrupted")
       case e: Exception =>
         stopOnError(e)
     }
@@ -142,7 +141,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   def stop() {
     receivingThread.interrupt()
     onStop()
-    //TODO: terminate the actor
+    // TODO: terminate the actor
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index ca0a8ae478..b334d68bf9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -78,7 +78,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
 
   override def checkpoint(interval: Duration): DStream[(K, V)] = {
     super.checkpoint(interval)
-    //reducedStream.checkpoint(interval)
+    // reducedStream.checkpoint(interval)
     this
   }
 
@@ -128,7 +128,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     // Cogroup the reduced RDDs and merge the reduced values
     val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],
       partitioner)
-    //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+    // val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
 
     val numOldValues = oldRDDs.size
     val numNewValues = newRDDs.size
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 9d8889b655..5f7d3ba26c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -64,7 +64,6 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
             }
             val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
             val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
-            //logDebug("Generating state RDD for time " + validTime)
             Some(stateRDD)
           }
           case None => {    // If parent RDD does not exist
@@ -97,11 +96,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
 
             val groupedRDD = parentRDD.groupByKey(partitioner)
             val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
-            //logDebug("Generating state RDD for time " + validTime + " (first)")
+            // logDebug("Generating state RDD for time " + validTime + " (first)")
             Some(sessionRDD)
           }
           case None => { // If parent RDD does not exist, then nothing to do!
-            //logDebug("Not generating state RDD (no previous state, no parent)")
+            // logDebug("Not generating state RDD (no previous state, no parent)")
             None
           }
         }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index e4fa163f2e..cad68e248a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -126,7 +126,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
         receiverInfo -= streamId
         logError("De-registered receiver for network stream " + streamId
           + " with message " + msg)
-        //TODO: Do something about the corresponding NetworkInputDStream
+        // TODO: Do something about the corresponding NetworkInputDStream
       }
     }
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 0784e562ac..25739956cb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -252,7 +252,7 @@ class CheckpointSuite extends TestSuiteBase {
     ssc.start()
 
     // Create files and advance manual clock to process them
-    //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     Thread.sleep(1000)
     for (i <- Seq(1, 2, 3)) {
       Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 74e73ebb34..7df206241b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -154,7 +154,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
-      StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope
+      // Had to pass the local value of port to prevent from closing over entire scope
+      StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
     def output = outputBuffer.flatMap(x => x)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 57e5761cba..6568003bf1 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -139,7 +139,6 @@ trait ClientBase extends Logging {
     } else if (srcHost != null && dstHost == null) {
       return false
     }
-    //check for ports
     if (srcUri.getPort() != dstUri.getPort()) {
       false
     } else {
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 68cda0f1c9..9b7f1fca96 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -157,7 +157,7 @@ class ClientDistributedCacheManager() extends Logging {
   def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
     val fs = FileSystem.get(uri, conf)
     val current = new Path(uri.getPath())
-    //the leaf level file should be readable by others
+    // the leaf level file should be readable by others
     if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
       return false
     }
@@ -177,7 +177,7 @@ class ClientDistributedCacheManager() extends Logging {
       statCache: Map[URI, FileStatus]): Boolean =  {
     var current = path
     while (current != null) {
-      //the subdirs in the path should have execute permissions for others
+      // the subdirs in the path should have execute permissions for others
       if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
         return false
       }
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index 458df4fa3c..80b57d1355 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -99,7 +99,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
     assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
     assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
 
-    //add another one and verify both there and order correct
+    // add another one and verify both there and order correct
     val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
       null, new Path("/tmp/testing2"))
     val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
-- 
GitLab