From 5a8abfb70efd89ec4120c7f78596d8b32a9f4f3d Mon Sep 17 00:00:00 2001
From: Henry Saputra <hsaputra@apache.org>
Date: Sun, 12 Jan 2014 19:15:09 -0800
Subject: [PATCH] Address code review concerns and comments.

---
 .../src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 2 +-
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala  | 3 ++-
 core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 6 +++---
 .../scala/org/apache/spark/storage/BlockManagerWorker.scala | 6 +++---
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala      | 3 ++-
 .../scala/org/apache/spark/scheduler/JobLoggerSuite.scala   | 5 +----
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala    | 6 +++---
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala    | 6 +++---
 8 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index bba873a0b6..4e63117a51 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -189,7 +189,7 @@ object SparkHadoopWriter {
     if (path == null) {
       throw new IllegalArgumentException("Output path is null")
     }
-    var outputPath = new Path(path)
+    val outputPath = new Path(path)
     val fs = outputPath.getFileSystem(conf)
     if (outputPath == null || fs == null) {
       throw new IllegalArgumentException("Incorrectly formatted output path")
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8830de7273..82527fe663 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag](
 
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
-    new Iterator[Array[Byte]] {
+    val stdoutIterator = new Iterator[Array[Byte]] {
       def next(): Array[Byte] = {
         val obj = _nextObj
         if (hasNext) {
@@ -156,6 +156,7 @@ private[spark] class PythonRDD[T: ClassTag](
 
       def hasNext = _nextObj.length != 0
     }
+    stdoutIterator
   }
 
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index a5394a28e0..cefcc3d2d9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -295,9 +295,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
 
     val prefPartActual = prefPart.get
 
-    if (minPowerOfTwo.size + slack <= prefPartActual.size)  // more imbalance than the slack allows
+    if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows
       minPowerOfTwo  // prefer balance over locality
-    else {
+    } else {
       prefPartActual // prefer locality over balance
     }
   }
@@ -331,7 +331,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
    */
   def run(): Array[PartitionGroup] = {
     setupGroups(math.min(prev.partitions.length, maxPartitions))   // setup the groups (bins)
-    throwBalls()             // assign partitions (balls) to each group (bins)
+    throwBalls() // assign partitions (balls) to each group (bins)
     getPartitions
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index a36abe0670..42f52d7b26 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -45,7 +45,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
           Some(new BlockMessageArray(responseMessages).toBufferMessage)
         } catch {
           case e: Exception => logError("Exception handling buffer message", e)
-          return None
+          None
         }
       }
       case otherMessage: Any => {
@@ -111,7 +111,7 @@ private[spark] object BlockManagerWorker extends Logging {
     val blockMessageArray = new BlockMessageArray(blockMessage)
     val resultMessage = connectionManager.sendMessageReliablySync(
         toConnManagerId, blockMessageArray.toBufferMessage)
-    return (resultMessage != None)
+    resultMessage != None
   }
 
   def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
@@ -130,7 +130,7 @@ private[spark] object BlockManagerWorker extends Logging {
             return blockMessage.getData
           })
       }
-      case None => logDebug("No response message received"); return null
+      case None => logDebug("No response message received")
     }
     null
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 14f89d50b7..f0236ef1e9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
         locations: Seq[Seq[String]] = Nil
       ): MyRDD = {
     val maxPartition = numPartitions - 1
-    new MyRDD(sc, dependencies) {
+    val newRDD = new MyRDD(sc, dependencies) {
       override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
         throw new RuntimeException("should not be reached")
       override def getPartitions = (0 to maxPartition).map(i => new Partition {
@@ -135,6 +135,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
           Nil
       override def toString: String = "DAGSchedulerSuiteRDD " + id
     }
+    newRDD
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 3880e68725..29102913c7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -42,10 +42,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
       def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage) 
     }
     type MyRDD = RDD[(Int, Int)]
-    def makeRdd(
-        numPartitions: Int,
-        dependencies: List[Dependency[_]]
-      ): MyRDD = {
+    def makeRdd(numPartitions: Int, dependencies: List[Dependency[_]]): MyRDD = {
       val maxPartition = numPartitions - 1
       new MyRDD(sc, dependencies) {
         override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e1fe09e3e2..e56bc02897 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -191,10 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
     //check for ports
     if (srcUri.getPort() != dstUri.getPort()) {
-      false
-    } else {
-      true
+      return false
     }
+
+    true
   }
 
   /** Copy the file into HDFS if needed. */
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c084485734..51d9adb9d4 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -206,10 +206,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
     //check for ports
     if (srcUri.getPort() != dstUri.getPort()) {
-      false
-    } else {
-      true
+      return false
     }
+
+    true
   }
 
   /** Copy the file into HDFS if needed. */
-- 
GitLab