diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 86ca8445a112458b6a9d1c94907ae8bdb5ae0367..f33c2e065a2008030b0d7ce1c40e9407e908febe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -67,11 +67,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S extends SparkListenerEvent @DeveloperApi -case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long) +case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long) extends SparkListenerEvent @DeveloperApi -case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId) +case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId) extends SparkListenerEvent @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 3ab07703b6f85bb269bc6ca08fa6dc6423097ef9..1a6c7cb24f9ac5af7c4a380a43e18c8493818e77 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -203,7 +203,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockLocations.remove(blockId) } } - listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId)) + listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId)) } private def expireDeadHosts() { @@ -325,6 +325,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { + val time = System.currentTimeMillis() if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { case Some(manager) => @@ -340,9 +341,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus id.hostPort, Utils.bytesToString(maxMemSize))) blockManagerInfo(id) = - new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor) + new BlockManagerInfo(id, time, maxMemSize, slaveActor) } - listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize)) + listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize)) } private def updateBlockInfo( diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index a7543454eca1ff6b5f00878724cd3b8a94698821..1fc536b0969960efd01619b901d213befa2d7662 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -152,13 +152,15 @@ private[spark] object JsonProtocol { val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId) ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~ ("Block Manager ID" -> blockManagerId) ~ - ("Maximum Memory" -> blockManagerAdded.maxMem) + ("Maximum Memory" -> blockManagerAdded.maxMem) ~ + ("Timestamp" -> blockManagerAdded.time) } def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = { val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId) ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~ - ("Block Manager ID" -> blockManagerId) + ("Block Manager ID" -> blockManagerId) ~ + ("Timestamp" -> blockManagerRemoved.time) } def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = { @@ -466,12 +468,14 @@ private[spark] object JsonProtocol { def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = { val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") val maxMem = (json \ "Maximum Memory").extract[Long] - SparkListenerBlockManagerAdded(blockManagerId, maxMem) + val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L) + SparkListenerBlockManagerAdded(time, blockManagerId, maxMem) } def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = { val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") - SparkListenerBlockManagerRemoved(blockManagerId) + val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L) + SparkListenerBlockManagerRemoved(time, blockManagerId) } def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = { diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala index 4e022a69c8212db83261902f7c1989c08afa2961..3a45875391e29b15b1f25c549736171d183967a9 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -36,13 +36,13 @@ class StorageStatusListenerSuite extends FunSuite { // Block manager add assert(listener.executorIdToStorageStatus.size === 0) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) assert(listener.executorIdToStorageStatus.size === 1) assert(listener.executorIdToStorageStatus.get("big").isDefined) assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1) assert(listener.executorIdToStorageStatus("big").maxMem === 1000L) assert(listener.executorIdToStorageStatus("big").numBlocks === 0) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) assert(listener.executorIdToStorageStatus.size === 2) assert(listener.executorIdToStorageStatus.get("fat").isDefined) assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2) @@ -50,11 +50,11 @@ class StorageStatusListenerSuite extends FunSuite { assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) // Block manager remove - listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1)) + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm1)) assert(listener.executorIdToStorageStatus.size === 1) assert(!listener.executorIdToStorageStatus.get("big").isDefined) assert(listener.executorIdToStorageStatus.get("fat").isDefined) - listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2)) + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2)) assert(listener.executorIdToStorageStatus.size === 0) assert(!listener.executorIdToStorageStatus.get("big").isDefined) assert(!listener.executorIdToStorageStatus.get("fat").isDefined) @@ -62,8 +62,8 @@ class StorageStatusListenerSuite extends FunSuite { test("task end without updated blocks") { val listener = new StorageStatusListener - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) val taskMetrics = new TaskMetrics // Task end with no updated blocks @@ -79,8 +79,8 @@ class StorageStatusListenerSuite extends FunSuite { test("task end with updated blocks") { val listener = new StorageStatusListener - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) val taskMetrics1 = new TaskMetrics val taskMetrics2 = new TaskMetrics val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L)) @@ -128,7 +128,7 @@ class StorageStatusListenerSuite extends FunSuite { test("unpersist RDD") { val listener = new StorageStatusListener - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) val taskMetrics1 = new TaskMetrics val taskMetrics2 = new TaskMetrics val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L)) diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index d9e9c70a8a9e784d5aae5df21d519e040aee662c..e1bc1379b5d803a2939890fb8fb94e727448efc1 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -108,7 +108,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val myRddInfo1 = rddInfo1 val myRddInfo2 = rddInfo2 val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details") - bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L)) + bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 3) assert(storageListener.rddInfoList.size === 0) // not cached @@ -175,7 +175,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L)) taskMetrics0.updatedBlocks = Some(Seq(block0)) taskMetrics1.updatedBlocks = Some(Seq(block1)) - bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L)) + bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener.rddInfoList.size === 0) bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0)) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 66a17de9ec9ce8dcfa4641480a1d6147bb2256a7..c84bafce37f70bda06ed5ea2a77aeccd0310739a 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -21,6 +21,9 @@ import java.util.Properties import scala.collection.Map +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ import org.scalatest.FunSuite @@ -52,9 +55,9 @@ class JsonProtocolSuite extends FunSuite { "System Properties" -> Seq(("Username", "guest"), ("Password", "guest")), "Classpath Entries" -> Seq(("Super library", "/tmp/super_library")) )) - val blockManagerAdded = SparkListenerBlockManagerAdded( + val blockManagerAdded = SparkListenerBlockManagerAdded(1L, BlockManagerId("Stars", "In your multitude...", 300), 500) - val blockManagerRemoved = SparkListenerBlockManagerRemoved( + val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L, BlockManagerId("Scarce", "to be counted...", 100)) val unpersistRdd = SparkListenerUnpersistRDD(12345) val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield") @@ -151,6 +154,28 @@ class JsonProtocolSuite extends FunSuite { assert(newMetrics.inputMetrics.isEmpty) } + test("BlockManager events backward compatibility") { + // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property. + val blockManagerAdded = SparkListenerBlockManagerAdded(1L, + BlockManagerId("Stars", "In your multitude...", 300), 500) + val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L, + BlockManagerId("Scarce", "to be counted...", 100)) + + val oldBmAdded = JsonProtocol.blockManagerAddedToJson(blockManagerAdded) + .removeField({ _._1 == "Timestamp" }) + + val deserializedBmAdded = JsonProtocol.blockManagerAddedFromJson(oldBmAdded) + assert(SparkListenerBlockManagerAdded(-1L, blockManagerAdded.blockManagerId, + blockManagerAdded.maxMem) === deserializedBmAdded) + + val oldBmRemoved = JsonProtocol.blockManagerRemovedToJson(blockManagerRemoved) + .removeField({ _._1 == "Timestamp" }) + + val deserializedBmRemoved = JsonProtocol.blockManagerRemovedFromJson(oldBmRemoved) + assert(SparkListenerBlockManagerRemoved(-1L, blockManagerRemoved.blockManagerId) === + deserializedBmRemoved) + } + /** -------------------------- * | Helper test running methods | @@ -242,8 +267,10 @@ class JsonProtocolSuite extends FunSuite { assertEquals(e1.environmentDetails, e2.environmentDetails) case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) => assert(e1.maxMem === e2.maxMem) + assert(e1.time === e2.time) assertEquals(e1.blockManagerId, e2.blockManagerId) case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) => + assert(e1.time === e2.time) assertEquals(e1.blockManagerId, e2.blockManagerId) case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) => assert(e1.rddId == e2.rddId) @@ -945,7 +972,8 @@ class JsonProtocolSuite extends FunSuite { | "Host": "In your multitude...", | "Port": 300 | }, - | "Maximum Memory": 500 + | "Maximum Memory": 500, + | "Timestamp": 1 |} """ @@ -957,7 +985,8 @@ class JsonProtocolSuite extends FunSuite { | "Executor ID": "Scarce", | "Host": "to be counted...", | "Port": 100 - | } + | }, + | "Timestamp": 2 |} """