diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index fc0a7498820b5039c54615044792207bf4ca276c..3081f927cc3cb2a66a148915fb10344bae228cfa 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -49,7 +49,7 @@ object Partitioner { */ def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse - for (r <- bySize if r.partitioner != None) { + for (r <- bySize if r.partitioner.isDefined) { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 9485bfd89eb5797c21bdd06ea217c6cb0bb89508..f29a6ad2e7b92c9a876a7cf636929166718374c7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -67,7 +67,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) { <li><strong>User:</strong> {app.desc.user}</li> <li><strong>Cores:</strong> { - if (app.desc.maxCores == None) { + if (app.desc.maxCores.isEmpty) { "Unlimited (%s granted)".format(app.coresGranted) } else { "%s (%s granted, %s left)".format( diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 6f9f29969eaec85d315c964092c6c815516e75e8..e54ac0b332093b0306473c08d8871213105f7487 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -80,7 +80,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi val subProperties = new mutable.HashMap[String, Properties] import scala.collection.JavaConversions._ prop.foreach { kv => - if (regex.findPrefixOf(kv._1) != None) { + if (regex.findPrefixOf(kv._1).isDefined) { val regex(prefix, suffix) = kv._1 subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2) } diff --git a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala index d71069444a73f498eac40995755362f9afb4f9d9..423ff67a5fd43700e586b9e28930ef0f532090d8 100644 --- a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala +++ b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala @@ -71,7 +71,7 @@ private[spark] class ApproximateActionListener[T, U, R]( val finishTime = startTime + timeout while (true) { val time = System.currentTimeMillis() - if (failure != None) { + if (failure.isDefined) { throw failure.get } else if (finishedTasks == totalTasks) { return new PartialResult(evaluator.currentResult(), true) diff --git a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala index 5ce49b8100ee6ad42afa29b4add097c0bb3f3742..812368e04ac0d1115ee24e2f09ef4f790c3ff777 100644 --- a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala +++ b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala @@ -31,10 +31,10 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) { * Blocking method to wait for and return the final value. */ def getFinalValue(): R = synchronized { - while (finalValue == None && failure == None) { + while (finalValue.isEmpty && failure.isEmpty) { this.wait() } - if (finalValue != None) { + if (finalValue.isDefined) { return finalValue.get } else { throw failure.get @@ -46,11 +46,11 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) { * is supported per PartialResult. */ def onComplete(handler: R => Unit): PartialResult[R] = synchronized { - if (completionHandler != None) { + if (completionHandler.isDefined) { throw new UnsupportedOperationException("onComplete cannot be called twice") } completionHandler = Some(handler) - if (finalValue != None) { + if (finalValue.isDefined) { // We already have a final value, so let's call the handler handler(finalValue.get) } @@ -63,11 +63,11 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) { */ def onFail(handler: Exception => Unit) { synchronized { - if (failureHandler != None) { + if (failureHandler.isDefined) { throw new UnsupportedOperationException("onFail cannot be called twice") } failureHandler = Some(handler) - if (failure != None) { + if (failure.isDefined) { // We already have a failure, so let's call the handler handler(failure.get) } @@ -102,7 +102,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) { private[spark] def setFinalValue(value: R) { synchronized { - if (finalValue != None) { + if (finalValue.isDefined) { throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult") } finalValue = Some(value) @@ -117,7 +117,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) { private[spark] def setFailure(exception: Exception) { synchronized { - if (failure != None) { + if (failure.isDefined) { throw new UnsupportedOperationException("setFailure called twice on a PartialResult") } failure = Some(exception) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index cd90a1561a975e4424690491188daf007ddbf2c9..1472c92b6031d8d9123d6fa0667e246572836acf 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -666,7 +666,7 @@ abstract class RDD[T: ClassTag]( } var jobResult: Option[T] = None val mergeResult = (index: Int, taskResult: Option[T]) => { - if (taskResult != None) { + if (taskResult.isDefined) { jobResult = jobResult match { case Some(value) => Some(f(value, taskResult.get)) case None => taskResult diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7046c06d2057d68e6e16f4d6bc2e15e5cbda42db..237cbf4c0c942c139f6405407ae24affa03e5c99 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -877,7 +877,7 @@ class DAGScheduler( logInfo("running: " + running) logInfo("waiting: " + waiting) logInfo("failed: " + failed) - if (stage.shuffleDep != None) { + if (stage.shuffleDep.isDefined) { // We supply true to increment the epoch number here in case this is a // recomputation of the map outputs. In that case, some nodes may have cached // locations with holes (from when we detected the error) and will need the diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index c60e9896dee4ffe0ef2254deed52fc157bbe7b3b..520c0b29e3536c4681d2d3710ec1d5a47e07cb58 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -46,7 +46,7 @@ private[spark] class Stage( callSite: Option[String]) extends Logging { - val isShuffleMap = shuffleDep != None + val isShuffleMap = shuffleDep.isDefined val numPartitions = rdd.partitions.size val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6cc608ea5bc690ab91a91fb482beddd817a95843..83ba5840155fb6a0a87cfec4b456e461b02fe937 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -293,7 +293,7 @@ private[spark] class TaskSchedulerImpl( } } // Update the DAGScheduler without holding a lock on this, since that can deadlock - if (failedExecutor != None) { + if (failedExecutor.isDefined) { dagScheduler.executorLost(failedExecutor.get) backend.reviveOffers() } @@ -387,7 +387,7 @@ private[spark] class TaskSchedulerImpl( } } // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock - if (failedExecutor != None) { + if (failedExecutor.isDefined) { dagScheduler.executorLost(failedExecutor.get) backend.reviveOffers() } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala index 42f52d7b26a04b677a66e3d14a8f4a0cd797b598..3efe738a08f666a883c6cacb3f23a95873aeb441 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala @@ -111,7 +111,7 @@ private[spark] object BlockManagerWorker extends Logging { val blockMessageArray = new BlockMessageArray(blockMessage) val resultMessage = connectionManager.sendMessageReliablySync( toConnManagerId, blockMessageArray.toBufferMessage) - resultMessage != None + resultMessage.isDefined } def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = { diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 27f057b9f22f4df42432acefb9efd8e4cab9e3aa..eb5a18521683e2be27d4ae0fbaa2eb499e945a50 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -214,7 +214,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { val pair = iterator.next() val blockId = pair.getKey - if (rddToAdd != None && rddToAdd == getRddId(blockId)) { + if (rddToAdd.isDefined && rddToAdd == getRddId(blockId)) { logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + "block from the same RDD") return false diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 64acfbd3526f3a36a0af4f22e5f635a3a87185ab..8447773343d25a8b8731f81a5d53ff4363d7a319 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -652,7 +652,7 @@ private[spark] object Utils extends Logging { for (el <- trace) { if (!finished) { - if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName) != None) { + if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) { lastSparkMethod = if (el.getMethodName == "<init>") { // Spark method is a constructor; get its class name el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index c9f6cc5d079b5909459ec4fb843c57c9d0354aef..ecac2f79a25e26b7f62cabebeca4cd28695e7f3b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -287,7 +287,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // after the last failure. (1 to manager.maxTaskFailures).foreach { index => val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY) - assert(offerResult != None, + assert(offerResult.isDefined, "Expect resource offer on iteration %s to return a task".format(index)) assert(offerResult.get.index === 0) manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost)) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 18aa587662d245b9444ddbb9d68c2c6bc1c1046f..85011c6451777ef6b5e72a0128e0bd7fc38beafe 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -137,9 +137,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) // Checking whether blocks are in memory - assert(store.getSingle("a1") != None, "a1 was not in store") - assert(store.getSingle("a2") != None, "a2 was not in store") - assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a1").isDefined, "a1 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") + assert(store.getSingle("a3").isDefined, "a3 was not in store") // Checking whether master knows about the blocks or not assert(master.getLocations("a1").size > 0, "master was not told about a1") @@ -186,9 +186,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val memStatus = master.getMemoryStatus.head._2 assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000") assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200") - assert(store.getSingle("a1-to-remove") != None, "a1 was not in store") - assert(store.getSingle("a2-to-remove") != None, "a2 was not in store") - assert(store.getSingle("a3-to-remove") != None, "a3 was not in store") + assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store") + assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store") + assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store") // Checking whether master knows about the blocks or not assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1") @@ -259,7 +259,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) - assert(store.getSingle("a1") != None, "a1 was not in store") + assert(store.getSingle("a1").isDefined, "a1 was not in store") assert(master.getLocations("a1").size > 0, "master was not told about a1") master.removeExecutor(store.blockManagerId.executorId) @@ -333,14 +333,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY) - assert(store.getSingle("a2") != None, "a2 was not in store") - assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") + assert(store.getSingle("a3").isDefined, "a3 was not in store") assert(store.getSingle("a1") === None, "a1 was in store") - assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") // At this point a2 was gotten last, so LRU will getSingle rid of a3 store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) - assert(store.getSingle("a1") != None, "a1 was not in store") - assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a1").isDefined, "a1 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") assert(store.getSingle("a3") === None, "a3 was in store") } @@ -352,14 +352,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER) - assert(store.getSingle("a2") != None, "a2 was not in store") - assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") + assert(store.getSingle("a3").isDefined, "a3 was not in store") assert(store.getSingle("a1") === None, "a1 was in store") - assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") // At this point a2 was gotten last, so LRU will getSingle rid of a3 store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) - assert(store.getSingle("a1") != None, "a1 was not in store") - assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a1").isDefined, "a1 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") assert(store.getSingle("a3") === None, "a3 was in store") } @@ -374,8 +374,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2 // from the same RDD assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store") - assert(store.getSingle(rdd(0, 2)) != None, "rdd_0_2 was not in store") - assert(store.getSingle(rdd(0, 1)) != None, "rdd_0_1 was not in store") + assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store") + assert(store.getSingle(rdd(0, 1)).isDefined, "rdd_0_1 was not in store") // Check that rdd_0_3 doesn't replace them even after further accesses assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store") assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store") @@ -392,7 +392,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store") assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store") // Do a get() on rdd_0_2 so that it is the most recently used item - assert(store.getSingle(rdd(0, 2)) != None, "rdd_0_2 was not in store") + assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store") // Put in more partitions from RDD 0; they should replace rdd_1_1 store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY) @@ -413,9 +413,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.DISK_ONLY) store.putSingle("a2", a2, StorageLevel.DISK_ONLY) store.putSingle("a3", a3, StorageLevel.DISK_ONLY) - assert(store.getSingle("a2") != None, "a2 was in store") - assert(store.getSingle("a3") != None, "a3 was in store") - assert(store.getSingle("a1") != None, "a1 was in store") + assert(store.getSingle("a2").isDefined, "a2 was in store") + assert(store.getSingle("a3").isDefined, "a3 was in store") + assert(store.getSingle("a1").isDefined, "a1 was in store") } test("disk and memory storage") { @@ -426,11 +426,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) - assert(store.getSingle("a2") != None, "a2 was not in store") - assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") + assert(store.getSingle("a3").isDefined, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") - assert(store.getSingle("a1") != None, "a1 was not in store") - assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + assert(store.getSingle("a1").isDefined, "a1 was not in store") + assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store") } test("disk and memory storage with getLocalBytes") { @@ -441,11 +441,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) - assert(store.getLocalBytes("a2") != None, "a2 was not in store") - assert(store.getLocalBytes("a3") != None, "a3 was not in store") + assert(store.getLocalBytes("a2").isDefined, "a2 was not in store") + assert(store.getLocalBytes("a3").isDefined, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") - assert(store.getLocalBytes("a1") != None, "a1 was not in store") - assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + assert(store.getLocalBytes("a1").isDefined, "a1 was not in store") + assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store") } test("disk and memory storage with serialization") { @@ -456,11 +456,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) - assert(store.getSingle("a2") != None, "a2 was not in store") - assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") + assert(store.getSingle("a3").isDefined, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") - assert(store.getSingle("a1") != None, "a1 was not in store") - assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + assert(store.getSingle("a1").isDefined, "a1 was not in store") + assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store") } test("disk and memory storage with serialization and getLocalBytes") { @@ -471,11 +471,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) - assert(store.getLocalBytes("a2") != None, "a2 was not in store") - assert(store.getLocalBytes("a3") != None, "a3 was not in store") + assert(store.getLocalBytes("a2").isDefined, "a2 was not in store") + assert(store.getLocalBytes("a3").isDefined, "a3 was not in store") assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") - assert(store.getLocalBytes("a1") != None, "a1 was not in store") - assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + assert(store.getLocalBytes("a1").isDefined, "a1 was not in store") + assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store") } test("LRU with mixed storage levels") { @@ -489,18 +489,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) store.putSingle("a3", a3, StorageLevel.DISK_ONLY) // At this point LRU should not kick in because a3 is only on disk - assert(store.getSingle("a1") != None, "a2 was not in store") - assert(store.getSingle("a2") != None, "a3 was not in store") - assert(store.getSingle("a3") != None, "a1 was not in store") - assert(store.getSingle("a1") != None, "a2 was not in store") - assert(store.getSingle("a2") != None, "a3 was not in store") - assert(store.getSingle("a3") != None, "a1 was not in store") + assert(store.getSingle("a1").isDefined, "a2 was not in store") + assert(store.getSingle("a2").isDefined, "a3 was not in store") + assert(store.getSingle("a3").isDefined, "a1 was not in store") + assert(store.getSingle("a1").isDefined, "a2 was not in store") + assert(store.getSingle("a2").isDefined, "a3 was not in store") + assert(store.getSingle("a3").isDefined, "a1 was not in store") // Now let's add in a4, which uses both disk and memory; a1 should drop out store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER) assert(store.getSingle("a1") == None, "a1 was in store") - assert(store.getSingle("a2") != None, "a2 was not in store") - assert(store.getSingle("a3") != None, "a3 was not in store") - assert(store.getSingle("a4") != None, "a4 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") + assert(store.getSingle("a3").isDefined, "a3 was not in store") + assert(store.getSingle("a4").isDefined, "a4 was not in store") } test("in-memory LRU with streams") { @@ -511,18 +511,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - assert(store.get("list2") != None, "list2 was not in store") + assert(store.get("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.size == 2) - assert(store.get("list3") != None, "list3 was not in store") + assert(store.get("list3").isDefined, "list3 was not in store") assert(store.get("list3").get.size == 2) assert(store.get("list1") === None, "list1 was in store") - assert(store.get("list2") != None, "list2 was not in store") + assert(store.get("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.size == 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - assert(store.get("list1") != None, "list1 was not in store") + assert(store.get("list1").isDefined, "list1 was not in store") assert(store.get("list1").get.size == 2) - assert(store.get("list2") != None, "list2 was not in store") + assert(store.get("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.size == 2) assert(store.get("list3") === None, "list1 was in store") } @@ -538,26 +538,26 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true) // At this point LRU should not kick in because list3 is only on disk - assert(store.get("list1") != None, "list2 was not in store") + assert(store.get("list1").isDefined, "list2 was not in store") assert(store.get("list1").get.size === 2) - assert(store.get("list2") != None, "list3 was not in store") + assert(store.get("list2").isDefined, "list3 was not in store") assert(store.get("list2").get.size === 2) - assert(store.get("list3") != None, "list1 was not in store") + assert(store.get("list3").isDefined, "list1 was not in store") assert(store.get("list3").get.size === 2) - assert(store.get("list1") != None, "list2 was not in store") + assert(store.get("list1").isDefined, "list2 was not in store") assert(store.get("list1").get.size === 2) - assert(store.get("list2") != None, "list3 was not in store") + assert(store.get("list2").isDefined, "list3 was not in store") assert(store.get("list2").get.size === 2) - assert(store.get("list3") != None, "list1 was not in store") + assert(store.get("list3").isDefined, "list1 was not in store") assert(store.get("list3").get.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should drop out store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true) assert(store.get("list1") === None, "list1 was in store") - assert(store.get("list2") != None, "list3 was not in store") + assert(store.get("list2").isDefined, "list3 was not in store") assert(store.get("list2").get.size === 2) - assert(store.get("list3") != None, "list1 was not in store") + assert(store.get("list3").isDefined, "list1 was not in store") assert(store.get("list3").get.size === 2) - assert(store.get("list4") != None, "list4 was not in store") + assert(store.get("list4").isDefined, "list4 was not in store") assert(store.get("list4").get.size === 2) } @@ -579,7 +579,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a1") === None, "a1 was in store") store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store") - assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a2").isDefined, "a2 was not in store") } test("block compression") {