Commit 5a8abfb7 authored by Henry Saputra

Address code review concerns and comments.

parent f1c5eca4
@@ -189,7 +189,7 @@ object SparkHadoopWriter {
     if (path == null) {
       throw new IllegalArgumentException("Output path is null")
     }
-    var outputPath = new Path(path)
+    val outputPath = new Path(path)
     val fs = outputPath.getFileSystem(conf)
     if (outputPath == null || fs == null) {
       throw new IllegalArgumentException("Incorrectly formatted output path")
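The hunk above swaps `var` for `val` on a reference that is never reassigned, a standard Scala review fix: `val` documents the intent and lets the compiler reject accidental reassignment. A minimal, self-contained sketch of the idiom (the names are illustrative, not from the commit):

    object ValVsVar {
      def main(args: Array[String]): Unit = {
        val greeting = "hello" // immutable reference; `greeting = "bye"` would not compile
        var counter = 0        // keep var only where the reference really changes
        counter += 1
        println(s"$greeting $counter") // prints: hello 1
      }
    }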
...
@@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag](
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
-    new Iterator[Array[Byte]] {
+    val stdoutIterator = new Iterator[Array[Byte]] {
       def next(): Array[Byte] = {
         val obj = _nextObj
         if (hasNext) {
@@ -156,6 +156,7 @@ private[spark] class PythonRDD[T: ClassTag](
       def hasNext = _nextObj.length != 0
     }
+    stdoutIterator
   }
 
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
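These two PythonRDD hunks implement a single review request: bind the anonymous Iterator to the name stdoutIterator, then return that name on its own line, so the result of a long method body is visible at a glance. A sketch of the same idiom on a toy method (all names hypothetical):

    // Name the anonymous instance, then return it as the last expression.
    def lettersOf(s: String): Iterator[Char] = {
      val letterIterator = new Iterator[Char] {
        private var i = 0
        def hasNext: Boolean = i < s.length
        def next(): Char = { val c = s.charAt(i); i += 1; c }
      }
      letterIterator // the method's result is now explicit at a glance
    }

For example, lettersOf("spark").mkString yields "spark".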
...
@@ -295,9 +295,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
       val prefPartActual = prefPart.get
-      if (minPowerOfTwo.size + slack <= prefPartActual.size)  // more imbalance than the slack allows
+      if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows
         minPowerOfTwo // prefer balance over locality
-      else {
+      } else {
         prefPartActual // prefer locality over balance
       }
     }
@@ -331,7 +331,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
    */
   def run(): Array[PartitionGroup] = {
     setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
     throwBalls() // assign partitions (balls) to each group (bins)
     getPartitions
   }
 }
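The first PartitionCoalescer hunk adds braces so that both branches of a multi-line if/else expression are braced symmetrically, evidently in response to review. A small sketch of the pattern, with made-up names:

    // if/else is an expression in Scala; bracing both multi-line branches
    // keeps the structure unambiguous while the branch value still flows out.
    def pick(balanced: Int, local: Int, slack: Int): Int =
      if (balanced + slack <= local) { // more imbalance than the slack allows
        balanced // prefer balance over locality
      } else {
        local // prefer locality over balance
      }

Here pick(4, 9, 2) evaluates to 4: the imbalance exceeds the slack, so balance wins.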
...
@@ -45,7 +45,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
           Some(new BlockMessageArray(responseMessages).toBufferMessage)
         } catch {
           case e: Exception => logError("Exception handling buffer message", e)
-          return None
+          None
         }
       }
       case otherMessage: Any => {
@@ -111,7 +111,7 @@ private[spark] object BlockManagerWorker extends Logging {
     val blockMessageArray = new BlockMessageArray(blockMessage)
     val resultMessage = connectionManager.sendMessageReliablySync(
       toConnManagerId, blockMessageArray.toBufferMessage)
-    return (resultMessage != None)
+    resultMessage != None
   }
 
   def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
@@ -130,7 +130,7 @@ private[spark] object BlockManagerWorker extends Logging {
             return blockMessage.getData
           })
       }
-      case None => logDebug("No response message received"); return null
+      case None => logDebug("No response message received")
     }
     null
   }
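All three BlockManagerWorker hunks drop an explicit return: the last expression of a Scala block is its value, so a trailing return is redundant and is generally discouraged (inside a closure, return is even implemented by throwing NonLocalReturnControl). A toy sketch with invented names:

    // The match is the method's last expression, so its value is the result;
    // no `return` keyword is needed in either branch.
    def describe(response: Option[String]): String =
      response match {
        case Some(msg) => "got: " + msg
        case None      => "no response"
      }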
...
@@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
         locations: Seq[Seq[String]] = Nil
       ): MyRDD = {
     val maxPartition = numPartitions - 1
-    new MyRDD(sc, dependencies) {
+    val newRDD = new MyRDD(sc, dependencies) {
       override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
         throw new RuntimeException("should not be reached")
       override def getPartitions = (0 to maxPartition).map(i => new Partition {
@@ -135,6 +135,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
           Nil
       override def toString: String = "DAGSchedulerSuiteRDD " + id
     }
+    newRDD
   }
 
   /**
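The same naming convention is applied to this test helper: the anonymous MyRDD subclass is bound to newRDD, which the two hunks then return explicitly. A sketch of an analogous test stub (the Source trait and all names are invented for illustration):

    // A test stub overrides only what the suite exercises; the named
    // instance is returned as the method's last expression.
    trait Source { def read(): String }

    def makeStub(id: Int): Source = {
      val stub = new Source {
        def read(): String = throw new RuntimeException("should not be reached")
        override def toString: String = "StubSource " + id
      }
      stub
    }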
...
@@ -42,10 +42,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage)
   }
   type MyRDD = RDD[(Int, Int)]
-  def makeRdd(
-        numPartitions: Int,
-        dependencies: List[Dependency[_]]
-      ): MyRDD = {
+  def makeRdd(numPartitions: Int, dependencies: List[Dependency[_]]): MyRDD = {
     val maxPartition = numPartitions - 1
     new MyRDD(sc, dependencies) {
       override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
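Here a wrapped parameter list is collapsed onto one line: the signature fits within Spark's 100-character line limit, so the multi-line form only added noise. A sketch of the guideline on a standalone method (types simplified, names hypothetical):

    // A short signature reads best on one line; wrap parameters only when
    // the line would exceed the project's length limit.
    def makeRdd(numPartitions: Int, dependencies: List[Int]): Seq[(Int, Int)] =
      (0 until numPartitions).map(i => (i, dependencies.size))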
...
@@ -191,10 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
     //check for ports
     if (srcUri.getPort() != dstUri.getPort()) {
-      false
-    } else {
-      true
+      return false
     }
+    true
   }
 
   /** Copy the file into HDFS if needed. */
...
@@ -206,10 +206,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
     //check for ports
     if (srcUri.getPort() != dstUri.getPort()) {
-      false
-    } else {
-      true
+      return false
     }
+    true
   }
 
   /** Copy the file into HDFS if needed. */
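Both yarn Client hunks replace an if/else whose branches merely yield false and true with a guard clause: bail out early on the mismatch, then fall through to true. Note the contrast with the BlockManagerWorker hunks: a trailing return is redundant, but an early guard return is the one place Scala style commonly tolerates the keyword, since with several checks in a row it keeps the method flat instead of nesting else blocks. A self-contained sketch (the helper and its checks are invented, not the commit's code):

    import java.net.URI

    // Guard-clause comparison: each mismatch exits immediately; reaching
    // the end means every check passed.
    def sameHostAndPort(src: URI, dst: URI): Boolean = {
      if (src.getHost != dst.getHost) {
        return false
      }
      if (src.getPort != dst.getPort) {
        return false
      }
      true
    }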
...