diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index ca60ec78b62eecfbc63e8bce12aad5d01cdcd4b3..4ab8ec8f0ff3b53814180b0575b29eb151c6a8ac 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -33,16 +33,8 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.util.Utils /** - * A block fetcher iterator interface. There are two implementations: - * - * BasicBlockFetcherIterator: uses a custom-built NIO communication layer. - * NettyBlockFetcherIterator: uses Netty (OIO) as the communication layer. - * - * Eventually we would like the two to converge and use a single NIO-based communication layer, - * but extensive tests show that under some circumstances (e.g. large shuffles with lots of cores), - * NIO would perform poorly and thus the need for the Netty OIO one. + * A block fetcher iterator interface for fetching shuffle blocks. */ - private[storage] trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging { def initialize() @@ -262,67 +254,4 @@ object BlockFetcherIterator { } } // End of BasicBlockFetcherIterator - - class NettyBlockFetcherIterator( - blockManager: BlockManager, - blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])], - serializer: Serializer, - readMetrics: ShuffleReadMetrics) - extends BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer, readMetrics) { - - override protected def sendRequest(req: FetchRequest) { - logDebug("Sending request for %d blocks (%s) from %s".format( - req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort)) - val cmId = new ConnectionManagerId(req.address.host, req.address.port) - - bytesInFlight += req.size - val sizeMap = req.blocks.toMap // so we can look up the size of each blockID - - // This could throw a TimeoutException. In that case we will just retry the task. - val client = blockManager.nettyBlockClientFactory.createClient( - cmId.host, req.address.nettyPort) - val blocks = req.blocks.map(_._1.toString) - - client.fetchBlocks( - blocks, - new BlockClientListener { - override def onFetchFailure(blockId: String, errorMsg: String): Unit = { - logError(s"Could not get block(s) from $cmId with error: $errorMsg") - for ((blockId, size) <- req.blocks) { - results.put(new FetchResult(blockId, -1, null)) - } - } - - override def onFetchSuccess(blockId: String, data: ReferenceCountedBuffer): Unit = { - // Increment the reference count so the buffer won't be recycled. - // TODO: This could result in memory leaks when the task is stopped due to exception - // before the iterator is exhausted. - data.retain() - val buf = data.byteBuffer() - val blockSize = buf.remaining() - val bid = BlockId(blockId) - - // TODO: remove code duplication between here and BlockManager.dataDeserialization. - results.put(new FetchResult(bid, sizeMap(bid), () => { - def createIterator: Iterator[Any] = { - val stream = blockManager.wrapForCompression(bid, data.inputStream()) - serializer.newInstance().deserializeStream(stream).asIterator - } - new LazyInitIterator(createIterator) { - // Release the buffer when we are done traversing it. - override def close(): Unit = data.release() - } - })) - - readMetrics.synchronized { - readMetrics.remoteBytesRead += blockSize - readMetrics.remoteBlocksFetched += 1 - } - logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) - } - } - ) - } - } - // End of NettyBlockFetcherIterator } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 12a92d44f4c36ea9200d7c9e2754cb5a1896843c..1eb622c12a79f8ee19c00c4be9cb91c2249c85dc 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -32,8 +32,6 @@ import org.apache.spark._ import org.apache.spark.executor._ import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ -import org.apache.spark.network.netty.client.BlockFetchingClientFactory -import org.apache.spark.network.netty.server.BlockServer import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.util._ @@ -90,27 +88,8 @@ private[spark] class BlockManager( new TachyonStore(this, tachyonBlockManager) } - private val useNetty = conf.getBoolean("spark.shuffle.use.netty", false) - - // If we use Netty for shuffle, start a new Netty-based shuffle sender service. - private[storage] val nettyBlockClientFactory: BlockFetchingClientFactory = { - if (useNetty) new BlockFetchingClientFactory(conf) else null - } - - private val nettyBlockServer: BlockServer = { - if (useNetty) { - val server = new BlockServer(conf, this) - logInfo(s"Created NettyBlockServer binding to port: ${server.port}") - server - } else { - null - } - } - - private val nettyPort: Int = if (useNetty) nettyBlockServer.port else 0 - val blockManagerId = BlockManagerId( - executorId, connectionManager.id.host, connectionManager.id.port, nettyPort) + executorId, connectionManager.id.host, connectionManager.id.port) // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory // for receiving shuffle outputs) @@ -572,14 +551,8 @@ private[spark] class BlockManager( blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])], serializer: Serializer, readMetrics: ShuffleReadMetrics): BlockFetcherIterator = { - val iter = - if (conf.getBoolean("spark.shuffle.use.netty", false)) { - new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer, - readMetrics) - } else { - new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer, - readMetrics) - } + val iter = new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer, + readMetrics) iter.initialize() iter } @@ -1092,14 +1065,6 @@ private[spark] class BlockManager( connectionManager.stop() shuffleBlockManager.stop() diskBlockManager.stop() - - if (nettyBlockClientFactory != null) { - nettyBlockClientFactory.stop() - } - if (nettyBlockServer != null) { - nettyBlockServer.stop() - } - actorSystem.stop(slaveActor) blockInfo.clear() memoryStore.clear() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index b1585bd8199d1f6a858ea14f977e3bb9d0541ca1..b7bcb2d85d0eed8ff62906b15ae5179a210f053a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -36,11 +36,10 @@ import org.apache.spark.util.Utils class BlockManagerId private ( private var executorId_ : String, private var host_ : String, - private var port_ : Int, - private var nettyPort_ : Int + private var port_ : Int ) extends Externalizable { - private def this() = this(null, null, 0, 0) // For deserialization only + private def this() = this(null, null, 0) // For deserialization only def executorId: String = executorId_ @@ -60,32 +59,28 @@ class BlockManagerId private ( def port: Int = port_ - def nettyPort: Int = nettyPort_ - override def writeExternal(out: ObjectOutput) { out.writeUTF(executorId_) out.writeUTF(host_) out.writeInt(port_) - out.writeInt(nettyPort_) } override def readExternal(in: ObjectInput) { executorId_ = in.readUTF() host_ = in.readUTF() port_ = in.readInt() - nettyPort_ = in.readInt() } @throws(classOf[IOException]) private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this) - override def toString = "BlockManagerId(%s, %s, %d, %d)".format(executorId, host, port, nettyPort) + override def toString = s"BlockManagerId($executorId, $host, $port)" - override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + nettyPort + override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port override def equals(that: Any) = that match { case id: BlockManagerId => - executorId == id.executorId && port == id.port && host == id.host && nettyPort == id.nettyPort + executorId == id.executorId && port == id.port && host == id.host case _ => false } @@ -100,11 +95,10 @@ private[spark] object BlockManagerId { * @param execId ID of the executor. * @param host Host name of the block manager. * @param port Port of the block manager. - * @param nettyPort Optional port for the Netty-based shuffle sender. * @return A new [[org.apache.spark.storage.BlockManagerId]]. */ - def apply(execId: String, host: String, port: Int, nettyPort: Int) = - getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort)) + def apply(execId: String, host: String, port: Int) = + getCachedBlockManagerId(new BlockManagerId(execId, host, port)) def apply(in: ObjectInput) = { val obj = new BlockManagerId() diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index db7384705fc1b33c2a4ce239c1502cc4d52f46c1..a7543454eca1ff6b5f00878724cd3b8a94698821 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -295,8 +295,7 @@ private[spark] object JsonProtocol { def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = { ("Executor ID" -> blockManagerId.executorId) ~ ("Host" -> blockManagerId.host) ~ - ("Port" -> blockManagerId.port) ~ - ("Netty Port" -> blockManagerId.nettyPort) + ("Port" -> blockManagerId.port) } def jobResultToJson(jobResult: JobResult): JValue = { @@ -644,8 +643,7 @@ private[spark] object JsonProtocol { val executorId = (json \ "Executor ID").extract[String] val host = (json \ "Host").extract[String] val port = (json \ "Port").extract[Int] - val nettyPort = (json \ "Netty Port").extract[Int] - BlockManagerId(executorId, host, port, nettyPort) + BlockManagerId(executorId, host, port) } def jobResultFromJson(json: JValue): JobResult = { diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 9702838085627b7fd555071eff872d8a25867a94..5369169811f81ec23f82c3698d51521fc80cd288 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -69,13 +69,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val compressedSize10000 = MapOutputTracker.compressSize(10000L) val size1000 = MapOutputTracker.decompressSize(compressedSize1000) val size10000 = MapOutputTracker.decompressSize(compressedSize10000) - tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0), + tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000), Array(compressedSize1000, compressedSize10000))) - tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0), + tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000), Array(compressedSize10000, compressedSize1000))) val statuses = tracker.getServerStatuses(10, 0) - assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000, 0), size1000), - (BlockManagerId("b", "hostB", 1000, 0), size10000))) + assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000), + (BlockManagerId("b", "hostB", 1000), size10000))) tracker.stop() } @@ -86,9 +86,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { tracker.registerShuffle(10, 2) val compressedSize1000 = MapOutputTracker.compressSize(1000L) val compressedSize10000 = MapOutputTracker.compressSize(10000L) - tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0), + tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000), Array(compressedSize1000, compressedSize10000))) - tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0), + tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000), Array(compressedSize10000, compressedSize1000))) assert(tracker.containsShuffle(10)) assert(tracker.getServerStatuses(10, 0).nonEmpty) @@ -105,14 +105,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { tracker.registerShuffle(10, 2) val compressedSize1000 = MapOutputTracker.compressSize(1000L) val compressedSize10000 = MapOutputTracker.compressSize(10000L) - tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0), + tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000), Array(compressedSize1000, compressedSize1000, compressedSize1000))) - tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0), + tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000), Array(compressedSize10000, compressedSize1000, compressedSize1000))) // As if we had two simultaneous fetch failures - tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0)) - tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0)) + tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) + tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) // The remaining reduce task might try to grab the output despite the shuffle failure; // this should cause it to fail, and the scheduler will ignore the failure due to the @@ -145,13 +145,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val compressedSize1000 = MapOutputTracker.compressSize(1000L) val size1000 = MapOutputTracker.decompressSize(compressedSize1000) masterTracker.registerMapOutput(10, 0, new MapStatus( - BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000))) + BlockManagerId("a", "hostA", 1000), Array(compressedSize1000))) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) assert(slaveTracker.getServerStatuses(10, 0).toSeq === - Seq((BlockManagerId("a", "hostA", 1000, 0), size1000))) + Seq((BlockManagerId("a", "hostA", 1000), size1000))) - masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0)) + masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } @@ -174,7 +174,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { // Frame size should be ~123B, and no exception should be thrown masterTracker.registerShuffle(10, 1) masterTracker.registerMapOutput(10, 0, new MapStatus( - BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0))) + BlockManagerId("88", "mph", 1000), Array.fill[Byte](10)(0))) masterActor.receive(GetMapOutputStatuses(10)) } @@ -195,7 +195,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { masterTracker.registerShuffle(20, 100) (0 until 100).foreach { i => masterTracker.registerMapOutput(20, i, new MapStatus( - BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0))) + BlockManagerId("999", "mps", 1000), Array.fill[Byte](4000000)(0))) } intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index f5fed988ade24bdd72738b649d819e8f18fdd19d..1a42fc1b233ba1f3fdd5f64fd79ba447dd7ef57f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -736,7 +736,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2)) private def makeBlockManagerId(host: String): BlockManagerId = - BlockManagerId("exec-" + host, host, 12345, 0) + BlockManagerId("exec-" + host, host, 12345) private def assertDataStructuresEmpty = { assert(scheduler.activeJobs.isEmpty) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala index bcbfe8baf36ada29c64814686e39889fcf5e979b..159128438390dd72bd899f6e29746b488aed4a5d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala @@ -41,7 +41,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { val blockManager = mock(classOf[BlockManager]) val connManager = mock(classOf[ConnectionManager]) doReturn(connManager).when(blockManager).connectionManager - doReturn(BlockManagerId("test-client", "test-client", 1, 0)).when(blockManager).blockManagerId + doReturn(BlockManagerId("test-client", "test-client", 1)).when(blockManager).blockManagerId doReturn((48 * 1024 * 1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight @@ -66,7 +66,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any()) doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any()) - val bmId = BlockManagerId("test-client", "test-client",1 , 0) + val bmId = BlockManagerId("test-client", "test-client", 1) val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq) ) @@ -97,7 +97,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { val blockManager = mock(classOf[BlockManager]) val connManager = mock(classOf[ConnectionManager]) doReturn(connManager).when(blockManager).connectionManager - doReturn(BlockManagerId("test-client", "test-client", 1, 0)).when(blockManager).blockManagerId + doReturn(BlockManagerId("test-client", "test-client", 1)).when(blockManager).blockManagerId doReturn((48 * 1024 * 1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight @@ -117,7 +117,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any()) doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any()) - val bmId = BlockManagerId("test-client", "test-client",1 , 0) + val bmId = BlockManagerId("test-client", "test-client", 1) val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq) ) @@ -155,12 +155,12 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { when(blockManager.futureExecContext).thenReturn(global) when(blockManager.blockManagerId).thenReturn( - BlockManagerId("test-client", "test-client", 1, 0)) + BlockManagerId("test-client", "test-client", 1)) when(blockManager.maxBytesInFlight).thenReturn(48 * 1024 * 1024) val blId1 = ShuffleBlockId(0,0,0) val blId2 = ShuffleBlockId(0,1,0) - val bmId = BlockManagerId("test-server", "test-server",1 , 0) + val bmId = BlockManagerId("test-server", "test-server", 1) val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( (bmId, Seq((blId1, 1L), (blId2, 1L))) ) @@ -211,10 +211,10 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { when(blockManager.futureExecContext).thenReturn(global) when(blockManager.blockManagerId).thenReturn( - BlockManagerId("test-client", "test-client", 1, 0)) + BlockManagerId("test-client", "test-client", 1)) when(blockManager.maxBytesInFlight).thenReturn(48 * 1024 * 1024) - val bmId = BlockManagerId("test-server", "test-server",1 , 0) + val bmId = BlockManagerId("test-server", "test-server", 1) val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( (bmId, Seq((blId1, 1L), (blId2, 1L))) ) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index f32ce6f9fcc7f75f1b5838cf62c25e0e57c0f431..bdcea07e5714f4bf48fd4e8cf58c4b8345bfa4cc 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -139,9 +139,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("BlockManagerId object caching") { - val id1 = BlockManagerId("e1", "XXX", 1, 0) - val id2 = BlockManagerId("e1", "XXX", 1, 0) // this should return the same object as id1 - val id3 = BlockManagerId("e1", "XXX", 2, 0) // this should return a different object + val id1 = BlockManagerId("e1", "XXX", 1) + val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1 + val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object assert(id2 === id1, "id2 is not same as id1") assert(id2.eq(id1), "id2 is not the same object as id1") assert(id3 != id1, "id3 is same as id1") diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala index 7671cb969a26b1e0d554b5efc57a73f55668246d..4e022a69c8212db83261902f7c1989c08afa2961 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -26,8 +26,8 @@ import org.apache.spark.scheduler._ * Test the behavior of StorageStatusListener in response to all relevant events. */ class StorageStatusListenerSuite extends FunSuite { - private val bm1 = BlockManagerId("big", "dog", 1, 1) - private val bm2 = BlockManagerId("fat", "duck", 2, 2) + private val bm1 = BlockManagerId("big", "dog", 1) + private val bm2 = BlockManagerId("fat", "duck", 2) private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false) private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false) diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala index 38678bbd1dd28c3749b32e67ca7c652662d56866..ef5c55f91c39a5757b71d653ca89cedbe01935ac 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala @@ -27,7 +27,7 @@ class StorageSuite extends FunSuite { // For testing add, update, and remove (for non-RDD blocks) private def storageStatus1: StorageStatus = { - val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L) + val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L) assert(status.blocks.isEmpty) assert(status.rddBlocks.isEmpty) assert(status.memUsed === 0L) @@ -78,7 +78,7 @@ class StorageSuite extends FunSuite { // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks private def storageStatus2: StorageStatus = { - val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L) + val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L) assert(status.rddBlocks.isEmpty) status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L, 0L)) status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L, 0L)) @@ -271,9 +271,9 @@ class StorageSuite extends FunSuite { // For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations private def stockStorageStatuses: Seq[StorageStatus] = { - val status1 = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L) - val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2, 2), 2000L) - val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3, 3), 3000L) + val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L) + val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L) + val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L) status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L)) status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L, 0L)) status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L, 0L)) diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index a537c72ce7ab549e6cefd6188185e5af9a9135cb..d9e9c70a8a9e784d5aae5df21d519e040aee662c 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -39,7 +39,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly) private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk) private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk) - private val bm1 = BlockManagerId("big", "dog", 1, 1) + private val bm1 = BlockManagerId("big", "dog", 1) before { bus = new LiveListenerBus diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index c4765e53de17be4fa7c5aa599d21acb0a2271a80..76bf4cfd112678d356d061cd649414d22c0ddf3d 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -17,13 +17,16 @@ package org.apache.spark.util +import scala.concurrent.Await + import akka.actor._ + +import org.scalatest.FunSuite + import org.apache.spark._ import org.apache.spark.scheduler.MapStatus import org.apache.spark.storage.BlockManagerId -import org.scalatest.FunSuite -import scala.concurrent.Await /** * Test the AkkaUtils with various security settings. @@ -35,7 +38,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { conf.set("spark.authenticate", "true") conf.set("spark.authenticate.secret", "good") - val securityManager = new SecurityManager(conf); + val securityManager = new SecurityManager(conf) val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) @@ -106,13 +109,13 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val compressedSize1000 = MapOutputTracker.compressSize(1000L) val size1000 = MapOutputTracker.decompressSize(compressedSize1000) masterTracker.registerMapOutput(10, 0, new MapStatus( - BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000))) + BlockManagerId("a", "hostA", 1000), Array(compressedSize1000))) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) // this should succeed since security off assert(slaveTracker.getServerStatuses(10, 0).toSeq === - Seq((BlockManagerId("a", "hostA", 1000, 0), size1000))) + Seq((BlockManagerId("a", "hostA", 1000), size1000))) actorSystem.shutdown() slaveSystem.shutdown() @@ -157,13 +160,13 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val compressedSize1000 = MapOutputTracker.compressSize(1000L) val size1000 = MapOutputTracker.decompressSize(compressedSize1000) masterTracker.registerMapOutput(10, 0, new MapStatus( - BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000))) + BlockManagerId("a", "hostA", 1000), Array(compressedSize1000))) masterTracker.incrementEpoch() slaveTracker.updateEpoch(masterTracker.getEpoch) // this should succeed since security on and passwords match assert(slaveTracker.getServerStatuses(10, 0).toSeq === - Seq((BlockManagerId("a", "hostA", 1000, 0), size1000))) + Seq((BlockManagerId("a", "hostA", 1000), size1000))) actorSystem.shutdown() slaveSystem.shutdown() diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 2fd3b9cfd221ace4fdaa193459f7862a25ea9e28..66a17de9ec9ce8dcfa4641480a1d6147bb2256a7 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -53,9 +53,9 @@ class JsonProtocolSuite extends FunSuite { "Classpath Entries" -> Seq(("Super library", "/tmp/super_library")) )) val blockManagerAdded = SparkListenerBlockManagerAdded( - BlockManagerId("Stars", "In your multitude...", 300, 400), 500) + BlockManagerId("Stars", "In your multitude...", 300), 500) val blockManagerRemoved = SparkListenerBlockManagerRemoved( - BlockManagerId("Scarce", "to be counted...", 100, 200)) + BlockManagerId("Scarce", "to be counted...", 100)) val unpersistRdd = SparkListenerUnpersistRDD(12345) val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield") val applicationEnd = SparkListenerApplicationEnd(42L) @@ -81,7 +81,7 @@ class JsonProtocolSuite extends FunSuite { testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false)) - testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000)) + testBlockManagerId(BlockManagerId("Hong", "Kong", 500)) // StorageLevel testStorageLevel(StorageLevel.NONE) @@ -104,7 +104,7 @@ class JsonProtocolSuite extends FunSuite { testJobResult(jobFailed) // TaskEndReason - val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15, 16), 17, 18, 19) + val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19) val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None) testTaskEndReason(Success) testTaskEndReason(Resubmitted) @@ -343,7 +343,6 @@ class JsonProtocolSuite extends FunSuite { assert(bm1.executorId === bm2.executorId) assert(bm1.host === bm2.host) assert(bm1.port === bm2.port) - assert(bm1.nettyPort === bm2.nettyPort) } private def assertEquals(result1: JobResult, result2: JobResult) { @@ -944,8 +943,7 @@ class JsonProtocolSuite extends FunSuite { | "Block Manager ID": { | "Executor ID": "Stars", | "Host": "In your multitude...", - | "Port": 300, - | "Netty Port": 400 + | "Port": 300 | }, | "Maximum Memory": 500 |} @@ -958,8 +956,7 @@ class JsonProtocolSuite extends FunSuite { | "Block Manager ID": { | "Executor ID": "Scarce", | "Host": "to be counted...", - | "Port": 100, - | "Netty Port": 200 + | "Port": 100 | } |} """