diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 9f2b0c42c74d60bd933240af9fb88d706708a72b..272d7cdad3bc213b1202b5d249015d957319181b 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -88,7 +88,7 @@ object SparkEnv extends Logging { val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer") val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal) - val blockManager = new BlockManager(blockManagerMaster, serializer) + val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer) val connectionManager = blockManager.connectionManager diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index bb6fc34f5db2d110b65c5bf3c88a533936a95eee..4e7d11996f8b2d8db3a00dbbc19e7fc8a2008cad 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -1,7 +1,9 @@ package spark.storage +import akka.actor.{ActorSystem, Cancellable} import akka.dispatch.{Await, Future} import akka.util.Duration +import akka.util.duration._ import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream @@ -12,7 +14,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} import scala.collection.JavaConversions._ -import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils} +import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils} import spark.network._ import spark.serializer.Serializer import spark.util.ByteBufferInputStream @@ -45,13 +47,13 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter } } - private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message) private[spark] -class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) +class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster, + val serializer: Serializer, maxMemory: Long) extends Logging { class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { @@ -116,28 +118,15 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - val heartBeatThread = new Thread("BlockManager heartbeat") { - setDaemon(true) - - override def run: Unit = { - while (!shuttingDown) { - heartBeat() - try { - Thread.sleep(heartBeatFrequency) - } catch { - case e: InterruptedException => {} - } - } - } - } + var heartBeatTask: Cancellable = null initialize() /** * Construct a BlockManager with a memory limit set based on system properties. */ - def this(master: BlockManagerMaster, serializer: Serializer) = { - this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties) + def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = { + this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties) } /** @@ -149,7 +138,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m RegisterBlockManager(blockManagerId, maxMemory)) BlockManagerWorker.startBlockManagerWorker(this) if (!BlockManager.getDisableHeartBeatsForTesting) { - heartBeatThread.start() + heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { + heartBeat() + } } } @@ -914,8 +905,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } def stop() { - shuttingDown = true - heartBeatThread.interrupt() + if (heartBeatTask != null) { + heartBeatTask.cancel() + } connectionManager.stop() blockInfo.clear() memoryStore.clear() diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala index e4a5b8ffdf6953f37d1ef4cb0b49aa09edab2684..5bb5a29cc42243a95c160f2eb62a65a8ba1e2792 100644 --- a/core/src/main/scala/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/spark/storage/ThreadingTest.scala @@ -74,7 +74,7 @@ private[spark] object ThreadingTest { val actorSystem = ActorSystem("test") val serializer = new KryoSerializer val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true) - val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024) + val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024) val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue)) producers.foreach(_.start) diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 14918181402d003c391ca33666ed90e5b627ed2e..ad2253596df86b4330be66814e732a4332b30734 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -64,7 +64,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("manager-master interaction") { - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -95,7 +95,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("reregistration on heart beat") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) @@ -112,7 +112,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("reregistration on block update") { - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) @@ -133,14 +133,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("deregistration on duplicate") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") - store2 = new BlockManager(master, serializer, 2000) + store2 = new BlockManager(actorSystem, master, serializer, 2000) assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master") @@ -154,7 +154,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -173,7 +173,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage with serialization") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -192,7 +192,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of same RDD") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -211,7 +211,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of multiple RDDs") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) @@ -234,7 +234,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("on-disk storage") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -247,7 +247,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -262,7 +262,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with getLocalBytes") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -277,7 +277,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -292,7 +292,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization and getLocalBytes") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -307,7 +307,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -332,7 +332,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU with streams") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -356,7 +356,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels and streams") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -402,7 +402,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("overly large block") { - store = new BlockManager(master, serializer, 500) + store = new BlockManager(actorSystem, master, serializer, 500) store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") === None, "a1 was in store") store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) @@ -413,49 +413,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("block compression") { try { System.setProperty("spark.shuffle.compress", "true") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed") store.stop() store = null System.setProperty("spark.shuffle.compress", "false") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed") store.stop() store = null System.setProperty("spark.broadcast.compress", "true") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed") store.stop() store = null System.setProperty("spark.broadcast.compress", "false") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed") store.stop() store = null System.setProperty("spark.rdd.compress", "true") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed") store.stop() store = null System.setProperty("spark.rdd.compress", "false") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed") store.stop() store = null // Check that any other block types are also kept uncompressed - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed") store.stop()