diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 811610c657b62475c378588ccd5f0d4945f0b036..315ed91f81df3a9978c8802a5c9ff1a824dfe73f 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -32,10 +32,14 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { private val loading = new HashSet[RDDBlockId]() /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */ - def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, + def getOrCompute[T]( + rdd: RDD[T], + split: Partition, + context: TaskContext, storageLevel: StorageLevel): Iterator[T] = { + val key = RDDBlockId(rdd.id, split.index) - logDebug("Looking for partition " + key) + logDebug(s"Looking for partition $key") blockManager.get(key) match { case Some(values) => // Partition is already materialized, so just return its values @@ -45,7 +49,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { // Mark the split as loading (unless someone else marks it first) loading.synchronized { if (loading.contains(key)) { - logInfo("Another thread is loading %s, waiting for it to finish...".format(key)) + logInfo(s"Another thread is loading $key, waiting for it to finish...") while (loading.contains(key)) { try { loading.wait() @@ -54,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { logWarning(s"Got an exception while waiting for another thread to load $key", e) } } - logInfo("Finished waiting for %s".format(key)) + logInfo(s"Finished waiting for $key") /* See whether someone else has successfully loaded it. The main way this would fail * is for the RDD-level cache eviction policy if someone else has loaded the same RDD * partition but we didn't want to make space for it. However, that case is unlikely @@ -64,7 +68,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { case Some(values) => return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => - logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key)) + logInfo(s"Whoever was loading $key failed; we'll try it ourselves") loading.add(key) } } else { @@ -73,7 +77,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } try { // If we got here, we have to load the split - logInfo("Partition %s not found, computing it".format(key)) + logInfo(s"Partition $key not found, computing it") val computedValues = rdd.computeOrReadCheckpoint(split, context) // Persist the result, so long as the task is not running locally @@ -97,8 +101,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { case Some(values) => values.asInstanceOf[Iterator[T]] case None => - logInfo("Failure to store %s".format(key)) - throw new Exception("Block manager failed to return persisted valued") + logInfo(s"Failure to store $key") + throw new SparkException("Block manager failed to return persisted value") } } else { // In this case the RDD is cached to an array buffer. 
This will save the results diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala index c8f397609a0b473fc6c8557b3edf7f155509b2bb..22fdf73e9d1f47270063652ba4bf9bb6069321fc 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala @@ -29,9 +29,9 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea setInitThread() private def setInitThread() { - // Set current thread as init thread - waitForReady will not block this thread - // (in case there is non trivial initialization which ends up calling waitForReady as part of - // initialization itself) + /* Set current thread as init thread - waitForReady will not block this thread + * (in case there is non trivial initialization which ends up calling waitForReady + * as part of initialization itself) */ BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread()) } @@ -42,7 +42,9 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea def waitForReady(): Boolean = { if (pending && initThread != Thread.currentThread()) { synchronized { - while (pending) this.wait() + while (pending) { + this.wait() + } } } !failed @@ -50,8 +52,8 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea /** Mark this BlockInfo as ready (i.e. block is finished writing) */ def markReady(sizeInBytes: Long) { - require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes) - assert (pending) + require(sizeInBytes >= 0, s"sizeInBytes was negative: $sizeInBytes") + assert(pending) size = sizeInBytes BlockInfo.blockInfoInitThreads.remove(this) synchronized { @@ -61,7 +63,7 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea /** Mark this BlockInfo as ready but failed */ def markFailure() { - assert (pending) + assert(pending) size = BlockInfo.BLOCK_FAILED BlockInfo.blockInfoInitThreads.remove(this) synchronized { @@ -71,9 +73,9 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea } private object BlockInfo { - // initThread is logically a BlockInfo field, but we store it here because - // it's only needed while this block is in the 'pending' state and we want - // to minimize BlockInfo's memory footprint. + /* initThread is logically a BlockInfo field, but we store it here because + * it's only needed while this block is in the 'pending' state and we want + * to minimize BlockInfo's memory footprint. 
*/ private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread] private val BLOCK_PENDING: Long = -1L diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 9cd79d262ea533c3ddee0a096ef67bcda38a70bd..f52bc7075104b0d7d4909357e29c455e4b873b8b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -28,46 +28,48 @@ import scala.util.Random import akka.actor.{ActorSystem, Cancellable, Props} import sun.nio.ch.DirectBuffer -import org.apache.spark.{Logging, MapOutputTracker, SecurityManager, SparkConf, SparkEnv, SparkException} +import org.apache.spark._ import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.serializer.Serializer import org.apache.spark.util._ -private[spark] sealed trait Values - -private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends Values -private[spark] case class IteratorValues(iterator: Iterator[Any]) extends Values -private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values +private[spark] sealed trait BlockValues +private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues +private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues +private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues private[spark] class BlockManager( executorId: String, actorSystem: ActorSystem, val master: BlockManagerMaster, - val defaultSerializer: Serializer, + defaultSerializer: Serializer, maxMemory: Long, - val _conf: SparkConf, + val conf: SparkConf, securityManager: SecurityManager, mapOutputTracker: MapOutputTracker) extends Logging { - def conf = _conf val shuffleBlockManager = new ShuffleBlockManager(this) val diskBlockManager = new DiskBlockManager(shuffleBlockManager, - conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) + conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) + val connectionManager = new ConnectionManager(0, conf, securityManager) + + implicit val futureExecContext = connectionManager.futureExecContext private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] + // Actual storage of where blocks are kept + private var tachyonInitialized = false private[storage] val memoryStore = new MemoryStore(this, maxMemory) private[storage] val diskStore = new DiskStore(this, diskBlockManager) - var tachyonInitialized = false private[storage] lazy val tachyonStore: TachyonStore = { val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon") val appFolderName = conf.get("spark.tachyonStore.folderName") - val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}" + val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}" val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998") - val tachyonBlockManager = new TachyonBlockManager( - shuffleBlockManager, tachyonStorePath, tachyonMaster) + val tachyonBlockManager = + new TachyonBlockManager(shuffleBlockManager, tachyonStorePath, tachyonMaster) tachyonInitialized = true new TachyonStore(this, tachyonBlockManager) } @@ -79,43 +81,39 @@ private[spark] class BlockManager( if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0 } - val connectionManager = new ConnectionManager(0, conf, securityManager) - implicit val futureExecContext = 
connectionManager.futureExecContext - val blockManagerId = BlockManagerId( executorId, connectionManager.id.host, connectionManager.id.port, nettyPort) // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory // for receiving shuffle outputs) - val maxBytesInFlight = - conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024 + val maxBytesInFlight = conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024 // Whether to compress broadcast variables that are stored - val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true) + private val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true) // Whether to compress shuffle output that are stored - val compressShuffle = conf.getBoolean("spark.shuffle.compress", true) + private val compressShuffle = conf.getBoolean("spark.shuffle.compress", true) // Whether to compress RDD partitions that are stored serialized - val compressRdds = conf.getBoolean("spark.rdd.compress", false) + private val compressRdds = conf.getBoolean("spark.rdd.compress", false) // Whether to compress shuffle output temporarily spilled to disk - val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true) + private val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true) - val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf) - - val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this, mapOutputTracker)), + private val slaveActor = actorSystem.actorOf( + Props(new BlockManagerSlaveActor(this, mapOutputTracker)), name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next) - // Pending re-registration action being executed asynchronously or null if none - // is pending. Accesses should synchronize on asyncReregisterLock. - var asyncReregisterTask: Future[Unit] = null - val asyncReregisterLock = new Object + // Pending re-registration action being executed asynchronously or null if none is pending. + // Accesses should synchronize on asyncReregisterLock. + private var asyncReregisterTask: Future[Unit] = null + private val asyncReregisterLock = new Object - private def heartBeat() { + private def heartBeat(): Unit = { if (!master.sendHeartBeat(blockManagerId)) { reregister() } } - var heartBeatTask: Cancellable = null + private val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf) + private var heartBeatTask: Cancellable = null private val metadataCleaner = new MetadataCleaner( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) @@ -124,11 +122,11 @@ private[spark] class BlockManager( initialize() - // The compression codec to use. Note that the "lazy" val is necessary because we want to delay - // the initialization of the compression codec until it is first used. The reason is that a Spark - // program could be using a user-defined codec in a third party jar, which is loaded in - // Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been - // loaded yet. + /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay + * the initialization of the compression codec until it is first used. The reason is that a Spark + * program could be using a user-defined codec in a third party jar, which is loaded in + * Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been + * loaded yet. 
*/ private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf) /** @@ -150,7 +148,7 @@ private[spark] class BlockManager( * Initialize the BlockManager. Register to the BlockManagerMaster, and start the * BlockManagerWorker actor. */ - private def initialize() { + private def initialize(): Unit = { master.registerBlockManager(blockManagerId, maxMemory, slaveActor) BlockManagerWorker.startBlockManagerWorker(this) if (!BlockManager.getDisableHeartBeatsForTesting(conf)) { @@ -170,12 +168,12 @@ private[spark] class BlockManager( * heart beat attempt or new block registration and another try to re-register all blocks * will be made then. */ - private def reportAllBlocks() { - logInfo("Reporting " + blockInfo.size + " blocks to the master.") + private def reportAllBlocks(): Unit = { + logInfo(s"Reporting ${blockInfo.size} blocks to the master.") for ((blockId, info) <- blockInfo) { val status = getCurrentBlockStatus(blockId, info) if (!tryToReportBlockStatus(blockId, info, status)) { - logError("Failed to report " + blockId + " to master; giving up.") + logError(s"Failed to report $blockId to master; giving up.") return } } @@ -187,7 +185,7 @@ private[spark] class BlockManager( * * Note that this method must be called without any BlockInfo locks held. */ - def reregister() { + private def reregister(): Unit = { // TODO: We might need to rate limit re-registering. logInfo("BlockManager re-registering with master") master.registerBlockManager(blockManagerId, maxMemory, slaveActor) @@ -197,7 +195,7 @@ private[spark] class BlockManager( /** * Re-register with the master sometime soon. */ - def asyncReregister() { + private def asyncReregister(): Unit = { asyncReregisterLock.synchronized { if (asyncReregisterTask == null) { asyncReregisterTask = Future[Unit] { @@ -213,7 +211,7 @@ private[spark] class BlockManager( /** * For testing. Wait for any pending asynchronous re-registration; otherwise, do nothing. */ - def waitForAsyncReregister() { + def waitForAsyncReregister(): Unit = { val task = asyncReregisterTask if (task != null) { Await.ready(task, Duration.Inf) @@ -251,18 +249,18 @@ private[spark] class BlockManager( * it is still valid). This ensures that update in master will compensate for the increase in * memory on slave. */ - def reportBlockStatus( + private def reportBlockStatus( blockId: BlockId, info: BlockInfo, status: BlockStatus, - droppedMemorySize: Long = 0L) { + droppedMemorySize: Long = 0L): Unit = { val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize) if (needReregister) { - logInfo("Got told to re-register updating block " + blockId) + logInfo(s"Got told to re-register updating block $blockId") // Re-registering will report our new block for free. asyncReregister() } - logDebug("Told master about block " + blockId) + logDebug(s"Told master about block $blockId") } /** @@ -293,10 +291,10 @@ private[spark] class BlockManager( * and the updated in-memory and on-disk sizes. 
*/ private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = { - val (newLevel, inMemSize, onDiskSize, inTachyonSize) = info.synchronized { + info.synchronized { info.level match { case null => - (StorageLevel.NONE, 0L, 0L, 0L) + BlockStatus(StorageLevel.NONE, 0L, 0L, 0L) case level => val inMem = level.useMemory && memoryStore.contains(blockId) val inTachyon = level.useOffHeap && tachyonStore.contains(blockId) @@ -307,19 +305,18 @@ private[spark] class BlockManager( val memSize = if (inMem) memoryStore.getSize(blockId) else 0L val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L - (storageLevel, memSize, diskSize, tachyonSize) + BlockStatus(storageLevel, memSize, diskSize, tachyonSize) } } - BlockStatus(newLevel, inMemSize, onDiskSize, inTachyonSize) } /** * Get locations of an array of blocks. */ - def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = { + private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = { val startTimeMs = System.currentTimeMillis val locations = master.getLocations(blockIds).toArray - logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs)) + logDebug("Got multiple block location in %s".format(Utils.getUsedTimeMs(startTimeMs))) locations } @@ -329,15 +326,16 @@ private[spark] class BlockManager( * never deletes (recent) items. */ def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = { - diskStore.getValues(blockId, serializer).orElse( - sys.error("Block " + blockId + " not found on disk, though it should be")) + diskStore.getValues(blockId, serializer).orElse { + throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be") + } } /** * Get block from local block manager. */ def getLocal(blockId: BlockId): Option[Iterator[Any]] = { - logDebug("Getting local block " + blockId) + logDebug(s"Getting local block $blockId") doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]] } @@ -345,7 +343,7 @@ private[spark] class BlockManager( * Get block from the local block manager as serialized bytes. */ def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = { - logDebug("Getting local block " + blockId + " as bytes") + logDebug(s"Getting local block $blockId as bytes") // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { @@ -353,7 +351,8 @@ private[spark] class BlockManager( case Some(bytes) => Some(bytes) case None => - throw new Exception("Block " + blockId + " not found on disk, though it should be") + throw new BlockException( + blockId, s"Block $blockId not found on disk, though it should be") } } else { doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] @@ -368,16 +367,16 @@ private[spark] class BlockManager( // If another thread is writing the block, wait for it to become ready. if (!info.waitForReady()) { // If we get here, the block write failed. 
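For readers skimming doGetLocal: the method tries the memory store first, then the Tachyon (off-heap) store, then disk, and may copy disk bytes back into memory when the storage level asks for it. Below is a minimal standalone sketch of that ordering, not part of this patch; the SimpleStore trait and readLocal helper are invented stand-ins for the real MemoryStore/TachyonStore/DiskStore.

object ReadPathSketch {
  // Invented stand-in for the real block stores, which return Option[ByteBuffer]
  // or Option[Iterator[Any]] and also handle deserialization.
  trait SimpleStore { def get(id: String): Option[Array[Byte]] }

  // Cheapest tier first: memory, then off-heap, then disk. The real doGetLocal
  // also consults the block's StorageLevel before each tier and re-inserts disk
  // bytes into the memory store when level.useMemory is set.
  def readLocal(
      id: String,
      memory: SimpleStore,
      offHeap: SimpleStore,
      disk: SimpleStore): Option[Array[Byte]] = {
    memory.get(id)
      .orElse(offHeap.get(id))
      .orElse(disk.get(id))
  }
}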
- logWarning("Block " + blockId + " was marked as failure.") + logWarning(s"Block $blockId was marked as failure.") return None } val level = info.level - logDebug("Level for block " + blockId + " is " + level) + logDebug(s"Level for block $blockId is $level") // Look for the block in memory if (level.useMemory) { - logDebug("Getting block " + blockId + " from memory") + logDebug(s"Getting block $blockId from memory") val result = if (asValues) { memoryStore.getValues(blockId) } else { @@ -387,51 +386,51 @@ private[spark] class BlockManager( case Some(values) => return Some(values) case None => - logDebug("Block " + blockId + " not found in memory") + logDebug(s"Block $blockId not found in memory") } } // Look for the block in Tachyon if (level.useOffHeap) { - logDebug("Getting block " + blockId + " from tachyon") + logDebug(s"Getting block $blockId from tachyon") if (tachyonStore.contains(blockId)) { tachyonStore.getBytes(blockId) match { - case Some(bytes) => { + case Some(bytes) => if (!asValues) { return Some(bytes) } else { return Some(dataDeserialize(blockId, bytes)) } - } case None => - logDebug("Block " + blockId + " not found in tachyon") + logDebug(s"Block $blockId not found in tachyon") } } } - // Look for block on disk, potentially storing it back into memory if required: + // Look for block on disk, potentially storing it back in memory if required if (level.useDisk) { - logDebug("Getting block " + blockId + " from disk") + logDebug(s"Getting block $blockId from disk") val bytes: ByteBuffer = diskStore.getBytes(blockId) match { - case Some(bytes) => bytes + case Some(b) => b case None => - throw new Exception("Block " + blockId + " not found on disk, though it should be") + throw new BlockException( + blockId, s"Block $blockId not found on disk, though it should be") } - assert (0 == bytes.position()) + assert(0 == bytes.position()) if (!level.useMemory) { - // If the block shouldn't be stored in memory, we can just return it: + // If the block shouldn't be stored in memory, we can just return it if (asValues) { return Some(dataDeserialize(blockId, bytes)) } else { return Some(bytes) } } else { - // Otherwise, we also have to store something in the memory store: + // Otherwise, we also have to store something in the memory store if (!level.deserialized || !asValues) { - // We'll store the bytes in memory if the block's storage level includes - // "memory serialized", or if it should be cached as objects in memory - // but we only requested its serialized bytes: + /* We'll store the bytes in memory if the block's storage level includes + * "memory serialized", or if it should be cached as objects in memory + * but we only requested its serialized bytes. */ val copyForMemory = ByteBuffer.allocate(bytes.limit) copyForMemory.put(bytes) memoryStore.putBytes(blockId, copyForMemory, level) @@ -442,16 +441,17 @@ private[spark] class BlockManager( } else { val values = dataDeserialize(blockId, bytes) if (level.deserialized) { - // Cache the values before returning them: + // Cache the values before returning them // TODO: Consider creating a putValues that also takes in a iterator? 
val valuesBuffer = new ArrayBuffer[Any] valuesBuffer ++= values - memoryStore.putValues(blockId, valuesBuffer, level, true).data match { - case Left(values2) => - return Some(values2) - case _ => - throw new Exception("Memory store did not return back an iterator") - } + memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data + match { + case Left(values2) => + return Some(values2) + case _ => + throw new SparkException("Memory store did not return an iterator") + } } else { return Some(values) } @@ -460,7 +460,7 @@ private[spark] class BlockManager( } } } else { - logDebug("Block " + blockId + " not registered locally") + logDebug(s"Block $blockId not registered locally") } None } @@ -469,7 +469,7 @@ private[spark] class BlockManager( * Get block from remote block managers. */ def getRemote(blockId: BlockId): Option[Iterator[Any]] = { - logDebug("Getting remote block " + blockId) + logDebug(s"Getting remote block $blockId") doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]] } @@ -477,7 +477,7 @@ private[spark] class BlockManager( * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { - logDebug("Getting remote block " + blockId + " as bytes") + logDebug(s"Getting remote block $blockId as bytes") doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] } @@ -485,7 +485,7 @@ private[spark] class BlockManager( require(blockId != null, "BlockId is null") val locations = Random.shuffle(master.getLocations(blockId)) for (loc <- locations) { - logDebug("Getting remote block " + blockId + " from " + loc) + logDebug(s"Getting remote block $blockId from $loc") val data = BlockManagerWorker.syncGetBlock( GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) if (data != null) { @@ -495,9 +495,9 @@ private[spark] class BlockManager( return Some(data) } } - logDebug("The value of block " + blockId + " is null") + logDebug(s"The value of block $blockId is null") } - logDebug("Block " + blockId + " not found") + logDebug(s"Block $blockId not found") None } @@ -507,12 +507,12 @@ private[spark] class BlockManager( def get(blockId: BlockId): Option[Iterator[Any]] = { val local = getLocal(blockId) if (local.isDefined) { - logInfo("Found block %s locally".format(blockId)) + logInfo(s"Found block $blockId locally") return local } val remote = getRemote(blockId) if (remote.isDefined) { - logInfo("Found block %s remotely".format(blockId)) + logInfo(s"Found block $blockId remotely") return remote } None @@ -533,7 +533,6 @@ private[spark] class BlockManager( } else { new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer) } - iter.initialize() iter } @@ -543,6 +542,7 @@ private[spark] class BlockManager( values: Iterator[Any], level: StorageLevel, tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = { + require(values != null, "Values is null") doPut(blockId, IteratorValues(values), level, tellMaster) } @@ -562,8 +562,8 @@ private[spark] class BlockManager( } /** - * Put a new block of values to the block manager. Return a list of blocks updated as a - * result of this put. + * Put a new block of values to the block manager. + * Return a list of blocks updated as a result of this put. */ def put( blockId: BlockId, @@ -575,8 +575,8 @@ private[spark] class BlockManager( } /** - * Put a new block of serialized bytes to the block manager. Return a list of blocks updated - * as a result of this put. 
+ * Put a new block of serialized bytes to the block manager. + * Return a list of blocks updated as a result of this put. */ def putBytes( blockId: BlockId, @@ -589,7 +589,7 @@ private[spark] class BlockManager( private def doPut( blockId: BlockId, - data: Values, + data: BlockValues, level: StorageLevel, tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = { @@ -599,20 +599,18 @@ private[spark] class BlockManager( // Return value val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - // Remember the block's storage level so that we can correctly drop it to disk if it needs - // to be dropped right after it got put into memory. Note, however, that other threads will - // not be able to get() this block until we call markReady on its BlockInfo. + /* Remember the block's storage level so that we can correctly drop it to disk if it needs + * to be dropped right after it got put into memory. Note, however, that other threads will + * not be able to get() this block until we call markReady on its BlockInfo. */ val putBlockInfo = { val tinfo = new BlockInfo(level, tellMaster) // Do atomically ! val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) - if (oldBlockOpt.isDefined) { if (oldBlockOpt.get.waitForReady()) { - logWarning("Block " + blockId + " already exists on this machine; not re-adding it") + logWarning(s"Block $blockId already exists on this machine; not re-adding it") return updatedBlocks } - // TODO: So the block info exists - but previous attempt to load it (?) failed. // What do we do now ? Retry on it ? oldBlockOpt.get @@ -623,10 +621,10 @@ private[spark] class BlockManager( val startTimeMs = System.currentTimeMillis - // If we're storing values and we need to replicate the data, we'll want access to the values, - // but because our put will read the whole iterator, there will be no values left. For the - // case where the put serializes data, we'll remember the bytes, above; but for the case where - // it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. + /* If we're storing values and we need to replicate the data, we'll want access to the values, + * but because our put will read the whole iterator, there will be no values left. For the + * case where the put serializes data, we'll remember the bytes, above; but for the case where + * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */ var valuesAfterPut: Iterator[Any] = null // Ditto for the bytes after the put @@ -637,78 +635,62 @@ private[spark] class BlockManager( // If we're storing bytes, then initiate the replication before storing them locally. // This is faster as data is already serialized and ready to send. 
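The `data match` that replaces the isInstanceOf checks below leans on BlockValues being a sealed trait, so the compiler can flag any unhandled case. A standalone mirror of that hierarchy and dispatch, for illustration only (the handle() helper is invented and not part of this patch):

object BlockValuesSketch {
  import java.nio.ByteBuffer
  import scala.collection.mutable.ArrayBuffer

  sealed trait BlockValues
  case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
  case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
  case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues

  // Mirrors how doPut treats the three shapes: bytes can be replicated eagerly
  // because they are already serialized, while values have to be serialized first.
  def handle(data: BlockValues, replication: Int): String = data match {
    case ByteBufferValues(bytes) if replication > 1 =>
      s"start replicating ${bytes.remaining()} serialized bytes right away"
    case ByteBufferValues(bytes)  => s"store ${bytes.remaining()} serialized bytes locally"
    case IteratorValues(_)        => "store computed values; serialize later if replicating"
    case ArrayBufferValues(array) => s"store ${array.size} buffered values"
  }
}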
- val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) { - // Duplicate doesn't copy the bytes, just creates a wrapper - val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate() - Future { - replicate(blockId, bufferView, level) - } - } else { - null + val replicationFuture = data match { + case b: ByteBufferValues if level.replication > 1 => + // Duplicate doesn't copy the bytes, but just creates a wrapper + val bufferView = b.buffer.duplicate() + Future { replicate(blockId, bufferView, level) } + case _ => null } putBlockInfo.synchronized { - logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) - + " to get into synchronized block") + logTrace("Put for block %s took %s to get into synchronized block" + .format(blockId, Utils.getUsedTimeMs(startTimeMs))) var marked = false try { - if (level.useMemory) { - // Save it just to memory first, even if it also has useDisk set to true; we will - // drop it to disk later if the memory store can't hold it. - val res = data match { - case IteratorValues(iterator) => - memoryStore.putValues(blockId, iterator, level, true) - case ArrayBufferValues(array) => - memoryStore.putValues(blockId, array, level, true) - case ByteBufferValues(bytes) => - bytes.rewind() - memoryStore.putBytes(blockId, bytes, level) - } - size = res.size - res.data match { - case Right(newBytes) => bytesAfterPut = newBytes - case Left(newIterator) => valuesAfterPut = newIterator - } - // Keep track of which blocks are dropped from memory - res.droppedBlocks.foreach { block => updatedBlocks += block } - } else if (level.useOffHeap) { - // Save to Tachyon. - val res = data match { - case IteratorValues(iterator) => - tachyonStore.putValues(blockId, iterator, level, false) - case ArrayBufferValues(array) => - tachyonStore.putValues(blockId, array, level, false) - case ByteBufferValues(bytes) => - bytes.rewind() - tachyonStore.putBytes(blockId, bytes, level) - } - size = res.size - res.data match { - case Right(newBytes) => bytesAfterPut = newBytes - case _ => - } - } else { - // Save directly to disk. - // Don't get back the bytes unless we replicate them. - val askForBytes = level.replication > 1 - - val res = data match { - case IteratorValues(iterator) => - diskStore.putValues(blockId, iterator, level, askForBytes) - case ArrayBufferValues(array) => - diskStore.putValues(blockId, array, level, askForBytes) - case ByteBufferValues(bytes) => - bytes.rewind() - diskStore.putBytes(blockId, bytes, level) - } - size = res.size - res.data match { - case Right(newBytes) => bytesAfterPut = newBytes - case _ => + // returnValues - Whether to return the values put + // blockStore - The type of storage to put these values into + val (returnValues, blockStore: BlockStore) = { + if (level.useMemory) { + // Put it in memory first, even if it also has useDisk set to true; + // We will drop it to disk later if the memory store can't hold it. 
+ (true, memoryStore) + } else if (level.useOffHeap) { + // Use tachyon for off-heap storage + (false, tachyonStore) + } else if (level.useDisk) { + // Don't get back the bytes from put unless we replicate them + (level.replication > 1, diskStore) + } else { + assert(level == StorageLevel.NONE) + throw new BlockException( + blockId, s"Attempted to put block $blockId without specifying storage level!") } } + // Actually put the values + val result = data match { + case IteratorValues(iterator) => + blockStore.putValues(blockId, iterator, level, returnValues) + case ArrayBufferValues(array) => + blockStore.putValues(blockId, array, level, returnValues) + case ByteBufferValues(bytes) => + bytes.rewind() + blockStore.putBytes(blockId, bytes, level) + } + size = result.size + result.data match { + case Left (newIterator) if level.useMemory => valuesAfterPut = newIterator + case Right (newBytes) => bytesAfterPut = newBytes + case _ => + } + + // Keep track of which blocks are dropped from memory + if (level.useMemory) { + result.droppedBlocks.foreach { updatedBlocks += _ } + } + val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) if (putBlockStatus.storageLevel != StorageLevel.NONE) { // Now that the block is in either the memory, tachyon, or disk store, @@ -728,18 +710,21 @@ private[spark] class BlockManager( // could've inserted a new BlockInfo before we remove it. blockInfo.remove(blockId) putBlockInfo.markFailure() - logWarning("Putting block " + blockId + " failed") + logWarning(s"Putting block $blockId failed") } } } - logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs)) + logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) // Either we're storing bytes and we asynchronously started replication, or we're storing // values and need to serialize and replicate them now: if (level.replication > 1) { data match { - case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf) - case _ => { + case ByteBufferValues(bytes) => + if (replicationFuture != null) { + Await.ready(replicationFuture, Duration.Inf) + } + case _ => val remoteStartTime = System.currentTimeMillis // Serialize the block if not already done if (bytesAfterPut == null) { @@ -750,20 +735,19 @@ private[spark] class BlockManager( bytesAfterPut = dataSerialize(blockId, valuesAfterPut) } replicate(blockId, bytesAfterPut, level) - logDebug("Put block " + blockId + " remotely took " + - Utils.getUsedTimeMs(remoteStartTime)) - } + logDebug("Put block %s remotely took %s" + .format(blockId, Utils.getUsedTimeMs(remoteStartTime))) } } BlockManager.dispose(bytesAfterPut) if (level.replication > 1) { - logDebug("Put for block " + blockId + " with replication took " + - Utils.getUsedTimeMs(startTimeMs)) + logDebug("Putting block %s with replication took %s" + .format(blockId, Utils.getUsedTimeMs(startTimeMs))) } else { - logDebug("Put for block " + blockId + " without replication took " + - Utils.getUsedTimeMs(startTimeMs)) + logDebug("Putting block %s without replication took %s" + .format(blockId, Utils.getUsedTimeMs(startTimeMs))) } updatedBlocks @@ -773,7 +757,7 @@ private[spark] class BlockManager( * Replicate block to another node. 
*/ @volatile var cachedPeers: Seq[BlockManagerId] = null - private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) { + private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) if (cachedPeers == null) { @@ -782,15 +766,16 @@ private[spark] class BlockManager( for (peer: BlockManagerId <- cachedPeers) { val start = System.nanoTime data.rewind() - logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is " - + data.limit() + " Bytes. To node: " + peer) - if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel), - new ConnectionManagerId(peer.host, peer.port))) { - logError("Failed to call syncPutBlock to " + peer) + logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + + s"To node: $peer") + val putBlock = PutBlock(blockId, data, tLevel) + val cmId = new ConnectionManagerId(peer.host, peer.port) + val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId) + if (!syncPutBlockSuccess) { + logError(s"Failed to call syncPutBlock to $peer") } - logDebug("Replicated BlockId " + blockId + " once used " + - (System.nanoTime - start) / 1e6 + " s; The size of the data is " + - data.limit() + " bytes.") + logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes." + .format(blockId, (System.nanoTime - start) / 1e6, data.limit())) } } @@ -822,17 +807,17 @@ private[spark] class BlockManager( blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = { - logInfo("Dropping block " + blockId + " from memory") + logInfo(s"Dropping block $blockId from memory") val info = blockInfo.get(blockId).orNull // If the block has not already been dropped - if (info != null) { + if (info != null) { info.synchronized { // required ? As of now, this will be invoked only for blocks which are ready // But in case this changes in future, adding for consistency sake. if (!info.waitForReady()) { // If we get here, the block write failed. - logWarning("Block " + blockId + " was marked as failure. Nothing to drop") + logWarning(s"Block $blockId was marked as failure. Nothing to drop") return None } @@ -841,10 +826,10 @@ private[spark] class BlockManager( // Drop to disk, if storage level requires if (level.useDisk && !diskStore.contains(blockId)) { - logInfo("Writing block " + blockId + " to disk") + logInfo(s"Writing block $blockId to disk") data match { case Left(elements) => - diskStore.putValues(blockId, elements, level, false) + diskStore.putValues(blockId, elements, level, returnValues = false) case Right(bytes) => diskStore.putBytes(blockId, bytes, level) } @@ -858,7 +843,7 @@ private[spark] class BlockManager( if (blockIsRemoved) { blockIsUpdated = true } else { - logWarning("Block " + blockId + " could not be dropped from memory as it does not exist") + logWarning(s"Block $blockId could not be dropped from memory as it does not exist") } val status = getCurrentBlockStatus(blockId, info) @@ -883,7 +868,7 @@ private[spark] class BlockManager( */ def removeRdd(rddId: Int): Int = { // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks. 
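A side note on the replicate() hunk above: it calls data.rewind() before every peer because reading a ByteBuffer advances its position, so a second send without a rewind would transmit nothing. A standalone illustration, with sendTo as an invented stand-in for the real connection-manager call:

object RewindSketch extends App {
  import java.nio.ByteBuffer

  // Pretend network send: consumes whatever is left in the buffer.
  def sendTo(peer: String, data: ByteBuffer): Int = {
    val sent = data.remaining()
    data.position(data.limit())  // reading advances the position like this
    sent
  }

  val payload = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
  for (peer <- Seq("peer-1", "peer-2")) {
    payload.rewind()  // without this, the second peer would receive 0 bytes
    println(s"$peer got ${sendTo(peer, payload)} bytes")
  }
}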
- logInfo("Removing RDD " + rddId) + logInfo(s"Removing RDD $rddId") val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId) blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) } blocksToRemove.size @@ -893,7 +878,7 @@ private[spark] class BlockManager( * Remove all blocks belonging to the given broadcast. */ def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = { - logInfo("Removing broadcast " + broadcastId) + logInfo(s"Removing broadcast $broadcastId") val blocksToRemove = blockInfo.keys.collect { case bid @ BroadcastBlockId(`broadcastId`, _) => bid } @@ -904,40 +889,42 @@ private[spark] class BlockManager( /** * Remove a block from both memory and disk. */ - def removeBlock(blockId: BlockId, tellMaster: Boolean = true) { - logInfo("Removing block " + blockId) + def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = { + logInfo(s"Removing block $blockId") val info = blockInfo.get(blockId).orNull - if (info != null) info.synchronized { - // Removals are idempotent in disk store and memory store. At worst, we get a warning. - val removedFromMemory = memoryStore.remove(blockId) - val removedFromDisk = diskStore.remove(blockId) - val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false - if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) { - logWarning("Block " + blockId + " could not be removed as it was not found in either " + - "the disk, memory, or tachyon store") - } - blockInfo.remove(blockId) - if (tellMaster && info.tellMaster) { - val status = getCurrentBlockStatus(blockId, info) - reportBlockStatus(blockId, info, status) + if (info != null) { + info.synchronized { + // Removals are idempotent in disk store and memory store. At worst, we get a warning. + val removedFromMemory = memoryStore.remove(blockId) + val removedFromDisk = diskStore.remove(blockId) + val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false + if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) { + logWarning(s"Block $blockId could not be removed as it was not found in either " + + "the disk, memory, or tachyon store") + } + blockInfo.remove(blockId) + if (tellMaster && info.tellMaster) { + val status = getCurrentBlockStatus(blockId, info) + reportBlockStatus(blockId, info, status) + } } } else { // The block has already been removed; do nothing. 
- logWarning("Asked to remove block " + blockId + ", which does not exist") + logWarning(s"Asked to remove block $blockId, which does not exist") } } - private def dropOldNonBroadcastBlocks(cleanupTime: Long) { - logInfo("Dropping non broadcast blocks older than " + cleanupTime) + private def dropOldNonBroadcastBlocks(cleanupTime: Long): Unit = { + logInfo(s"Dropping non broadcast blocks older than $cleanupTime") dropOldBlocks(cleanupTime, !_.isBroadcast) } - private def dropOldBroadcastBlocks(cleanupTime: Long) { - logInfo("Dropping broadcast blocks older than " + cleanupTime) + private def dropOldBroadcastBlocks(cleanupTime: Long): Unit = { + logInfo(s"Dropping broadcast blocks older than $cleanupTime") dropOldBlocks(cleanupTime, _.isBroadcast) } - private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)) { + private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)): Unit = { val iterator = blockInfo.getEntrySet.iterator while (iterator.hasNext) { val entry = iterator.next() @@ -945,17 +932,11 @@ private[spark] class BlockManager( if (time < cleanupTime && shouldDrop(id)) { info.synchronized { val level = info.level - if (level.useMemory) { - memoryStore.remove(id) - } - if (level.useDisk) { - diskStore.remove(id) - } - if (level.useOffHeap) { - tachyonStore.remove(id) - } + if (level.useMemory) { memoryStore.remove(id) } + if (level.useDisk) { diskStore.remove(id) } + if (level.useOffHeap) { tachyonStore.remove(id) } iterator.remove() - logInfo("Dropped block " + id) + logInfo(s"Dropped block $id") } val status = getCurrentBlockStatus(id, info) reportBlockStatus(id, info, status) @@ -963,12 +944,14 @@ private[spark] class BlockManager( } } - def shouldCompress(blockId: BlockId): Boolean = blockId match { - case ShuffleBlockId(_, _, _) => compressShuffle - case BroadcastBlockId(_, _) => compressBroadcast - case RDDBlockId(_, _) => compressRdds - case TempBlockId(_) => compressShuffleSpill - case _ => false + private def shouldCompress(blockId: BlockId): Boolean = { + blockId match { + case _: ShuffleBlockId => compressShuffle + case _: BroadcastBlockId => compressBroadcast + case _: RDDBlockId => compressRdds + case _: TempBlockId => compressShuffleSpill + case _ => false + } } /** @@ -990,7 +973,7 @@ private[spark] class BlockManager( blockId: BlockId, outputStream: OutputStream, values: Iterator[Any], - serializer: Serializer = defaultSerializer) { + serializer: Serializer = defaultSerializer): Unit = { val byteStream = new BufferedOutputStream(outputStream) val ser = serializer.newInstance() ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() @@ -1016,16 +999,16 @@ private[spark] class BlockManager( serializer: Serializer = defaultSerializer): Iterator[Any] = { bytes.rewind() - def getIterator = { + def getIterator: Iterator[Any] = { val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) serializer.newInstance().deserializeStream(stream).asIterator } if (blockId.isShuffle) { - // Reducer may need to read many local shuffle blocks and will wrap them into Iterators - // at the beginning. The wrapping will cost some memory (compression instance - // initialization, etc.). Reducer reads shuffle blocks one by one so we could do the - // wrapping lazily to save memory. + /* Reducer may need to read many local shuffle blocks and will wrap them into Iterators + * at the beginning. The wrapping will cost some memory (compression instance + * initialization, etc.). 
Reducer reads shuffle blocks one by one so we could do the + * wrapping lazily to save memory. */ class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] { lazy val proxy = f override def hasNext: Boolean = proxy.hasNext @@ -1037,7 +1020,7 @@ private[spark] class BlockManager( } } - def stop() { + def stop(): Unit = { if (heartBeatTask != null) { heartBeatTask.cancel() } @@ -1059,9 +1042,9 @@ private[spark] class BlockManager( private[spark] object BlockManager extends Logging { - val ID_GENERATOR = new IdGenerator + private val ID_GENERATOR = new IdGenerator - def getMaxMemory(conf: SparkConf): Long = { + private def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) (Runtime.getRuntime.maxMemory * memoryFraction).toLong } @@ -1078,9 +1061,9 @@ private[spark] object BlockManager extends Logging { * waiting for the GC to find it because that could lead to huge numbers of open files. There's * unfortunately no standard API to do this. */ - def dispose(buffer: ByteBuffer) { + def dispose(buffer: ByteBuffer): Unit = { if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) { - logTrace("Unmapping " + buffer) + logTrace(s"Unmapping $buffer") if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) { buffer.asInstanceOf[DirectBuffer].cleaner().clean() } @@ -1093,7 +1076,7 @@ private[spark] object BlockManager extends Logging { blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[BlockManagerId]] = { // blockManagerMaster != null is used in tests - assert (env != null || blockManagerMaster != null) + assert(env != null || blockManagerMaster != null) val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) { env.blockManager.getLocationBlockIds(blockIds) } else { diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 3a7243a1ba19ce6fec360e67ce6d62998c8a80fa..2ec46d416f37de6935801e7636f894fab6ee7a2e 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -40,9 +40,9 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 private val subDirsPerLocalDir = shuffleManager.conf.getInt("spark.diskStore.subDirectories", 64) - // Create one local directory for each path mentioned in spark.local.dir; then, inside this - // directory, create multiple subdirectories that we will hash files into, in order to avoid - // having really large inodes at the top level. + /* Create one local directory for each path mentioned in spark.local.dir; then, inside this + * directory, create multiple subdirectories that we will hash files into, in order to avoid + * having really large inodes at the top level. 
*/ private val localDirs: Array[File] = createLocalDirs() private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) private var shuffleSender : ShuffleSender = null @@ -114,7 +114,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD } private def createLocalDirs(): Array[File] = { - logDebug("Creating local directories at root dirs '" + rootDirs + "'") + logDebug(s"Creating local directories at root dirs '$rootDirs'") val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") rootDirs.split(",").map { rootDir => var foundLocalDir = false @@ -126,21 +126,20 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD tries += 1 try { localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) - localDir = new File(rootDir, "spark-local-" + localDirId) + localDir = new File(rootDir, s"spark-local-$localDirId") if (!localDir.exists) { foundLocalDir = localDir.mkdirs() } } catch { case e: Exception => - logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e) + logWarning(s"Attempt $tries to create local dir $localDir failed", e) } } if (!foundLocalDir) { - logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + - " attempts to create local dir in " + rootDir) + logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir") System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } - logInfo("Created local directory at " + localDir) + logInfo(s"Created local directory at $localDir") localDir } } @@ -163,7 +162,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) } catch { case e: Exception => - logError("Exception while deleting local spark dir: " + localDir, e) + logError(s"Exception while deleting local spark dir: $localDir", e) } } } @@ -175,7 +174,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD private[storage] def startShuffleBlockSender(port: Int): Int = { shuffleSender = new ShuffleSender(port, this) - logInfo("Created ShuffleSender binding to port : " + shuffleSender.port) + logInfo(s"Created ShuffleSender binding to port: ${shuffleSender.port}") shuffleSender.port } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 0ab9fad4227173b1a2445df9d8692e4eb5741352..ebff0cb5ba1532274312232a1c91b571e7d498c3 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -39,41 +39,39 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage diskManager.getBlockLocation(blockId).length } - override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = { + override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = { // So that we do not modify the input offsets ! 
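The two comments around this point concern ByteBuffer.duplicate(): it creates a second view with its own position and limit over the same underlying bytes, so putBytes can consume its copy without disturbing the caller's buffer. A small standalone check of that behaviour (not part of this patch):

object DuplicateSketch extends App {
  import java.nio.ByteBuffer

  val original = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
  val view = original.duplicate()  // shares the bytes, but not the position/limit

  view.get()  // advance the duplicate's position
  view.get()

  println(view.remaining())      // 2: the view has been partly consumed
  println(original.remaining())  // 4: the caller's buffer is untouched
}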
// duplicate does not copy buffer, so inexpensive val bytes = _bytes.duplicate() - logDebug("Attempting to put block " + blockId) + logDebug(s"Attempting to put block $blockId") val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) - val channel = new FileOutputStream(file).getChannel() + val channel = new FileOutputStream(file).getChannel while (bytes.remaining > 0) { channel.write(bytes) } channel.close() val finishTime = System.currentTimeMillis logDebug("Block %s stored as %s file on disk in %d ms".format( - file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime))) - return PutResult(bytes.limit(), Right(bytes.duplicate())) + file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime)) + PutResult(bytes.limit(), Right(bytes.duplicate())) } override def putValues( blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel, - returnValues: Boolean) - : PutResult = { - return putValues(blockId, values.toIterator, level, returnValues) + returnValues: Boolean): PutResult = { + putValues(blockId, values.toIterator, level, returnValues) } override def putValues( blockId: BlockId, values: Iterator[Any], level: StorageLevel, - returnValues: Boolean) - : PutResult = { + returnValues: Boolean): PutResult = { - logDebug("Attempting to write values for block " + blockId) + logDebug(s"Attempting to write values for block $blockId") val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) val outputStream = new FileOutputStream(file) @@ -95,7 +93,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage override def getBytes(blockId: BlockId): Option[ByteBuffer] = { val segment = diskManager.getBlockLocation(blockId) - val channel = new RandomAccessFile(segment.file, "r").getChannel() + val channel = new RandomAccessFile(segment.file, "r").getChannel try { // For small files, directly read rather than memory map @@ -131,7 +129,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage file.delete() } else { if (fileSegment.length < file.length()) { - logWarning("Could not delete block associated with only a part of a file: " + blockId) + logWarning(s"Could not delete block associated with only a part of a file: $blockId") } false } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 488f1ea9628f5289b89f9a09b57596417b8ebaf1..084a566c485606f573c21dd4f8d91f1db3291ac7 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -24,6 +24,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.util.{SizeEstimator, Utils} +private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) + /** * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as * serialized ByteBuffers. @@ -31,15 +33,13 @@ import org.apache.spark.util.{SizeEstimator, Utils} private class MemoryStore(blockManager: BlockManager, maxMemory: Long) extends BlockStore(blockManager) { - case class Entry(value: Any, size: Long, deserialized: Boolean) - - private val entries = new LinkedHashMap[BlockId, Entry](32, 0.75f, true) + private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true) @volatile private var currentMemory = 0L // Object used to ensure that only one thread is putting blocks and if necessary, dropping // blocks from the memory store. 
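The `true` passed to the LinkedHashMap constructor above enables access ordering: iteration starts at the least-recently-used entry, which is what the eviction scan in ensureFreeSpace leans on when it walks `entries` looking for blocks to drop. A standalone demonstration, with plain strings standing in for block ids:

object AccessOrderSketch extends App {
  import java.util.LinkedHashMap

  // Same constructor arguments as MemoryStore's `entries` map.
  val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
  entries.put("rdd_0_0", 10L)
  entries.put("rdd_0_1", 20L)
  entries.put("rdd_0_2", 30L)

  entries.get("rdd_0_0")  // a lookup moves the entry to the most-recently-used end

  val iterator = entries.entrySet().iterator()
  while (iterator.hasNext) {
    println(iterator.next().getKey)  // prints rdd_0_1, rdd_0_2, rdd_0_0 (LRU first)
  }
}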
private val putLock = new Object() - logInfo("MemoryStore started with capacity %s.".format(Utils.bytesToString(maxMemory))) + logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory))) def freeMemory: Long = maxMemory - currentMemory @@ -101,7 +101,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } else if (entry.deserialized) { Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)) } else { - Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data + Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data } } @@ -124,8 +124,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) val entry = entries.remove(blockId) if (entry != null) { currentMemory -= entry.size - logInfo("Block %s of size %d dropped from memory (free %d)".format( - blockId, entry.size, freeMemory)) + logInfo(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)") true } else { false @@ -181,18 +180,14 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) droppedBlocks ++= freeSpaceResult.droppedBlocks if (enoughFreeSpace) { - val entry = new Entry(value, size, deserialized) + val entry = new MemoryEntry(value, size, deserialized) entries.synchronized { entries.put(blockId, entry) currentMemory += size } - if (deserialized) { - logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) - } else { - logInfo("Block %s stored as bytes to memory (size %s, free %s)".format( - blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) - } + val valuesOrBytes = if (deserialized) "values" else "bytes" + logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( + blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) putSuccess = true } else { // Tell the block manager that we couldn't put it in memory so that it can drop it to @@ -221,13 +216,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * Return whether there is enough free space, along with the blocks dropped in the process. 
*/ private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = { - logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( - space, currentMemory, maxMemory)) + logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory") val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] if (space > maxMemory) { - logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit") + logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit") return ResultWithDroppedBlocks(success = false, droppedBlocks) } @@ -252,7 +246,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } if (maxMemory - (currentMemory - selectedMemory) >= space) { - logInfo(selectedBlocks.size + " blocks selected for dropping") + logInfo(s"${selectedBlocks.size} blocks selected for dropping") for (blockId <- selectedBlocks) { val entry = entries.synchronized { entries.get(blockId) } // This should never be null as only one thread should be dropping diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 2d8ff1194a5dc29a6507608a8be224a6800911cf..1e35abaab53534318b3911dc0eace91043af3916 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -34,11 +34,11 @@ import org.apache.spark.annotation.DeveloperApi */ @DeveloperApi class StorageLevel private( - private var useDisk_ : Boolean, - private var useMemory_ : Boolean, - private var useOffHeap_ : Boolean, - private var deserialized_ : Boolean, - private var replication_ : Int = 1) + private var _useDisk: Boolean, + private var _useMemory: Boolean, + private var _useOffHeap: Boolean, + private var _deserialized: Boolean, + private var _replication: Int = 1) extends Externalizable { // TODO: Also add fields for caching priority, dataset ID, and flushing. 
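For context on the toInt/readExternal hunks that follow: the four booleans are packed into a single byte using the bit values 8 (disk), 4 (memory), 2 (off-heap) and 1 (deserialized), with replication written as a separate byte. A standalone round-trip sketch using plain tuples instead of StorageLevel; the encode/decode helpers are invented for illustration:

object StorageFlagsSketch extends App {
  def encode(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
      deserialized: Boolean): Int = {
    var flags = 0
    if (useDisk)      flags |= 8
    if (useMemory)    flags |= 4
    if (useOffHeap)   flags |= 2
    if (deserialized) flags |= 1
    flags
  }

  def decode(flags: Int): (Boolean, Boolean, Boolean, Boolean) =
    ((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0)

  // A level with disk, memory, and deserialized set encodes to 8 | 4 | 1 = 13.
  val flags = encode(useDisk = true, useMemory = true, useOffHeap = false, deserialized = true)
  assert(flags == 13)
  val (disk, memory, offHeap, deser) = decode(flags)
  assert(disk && memory && !offHeap && deser)
}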
@@ -48,13 +48,13 @@ class StorageLevel private(
 
   def this() = this(false, true, false, false) // For deserialization
 
-  def useDisk = useDisk_
-  def useMemory = useMemory_
-  def useOffHeap = useOffHeap_
-  def deserialized = deserialized_
-  def replication = replication_
+  def useDisk = _useDisk
+  def useMemory = _useMemory
+  def useOffHeap = _useOffHeap
+  def deserialized = _deserialized
+  def replication = _replication
 
-  assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
+  assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")
 
   if (useOffHeap) {
     require(!useDisk, "Off-heap storage level does not support using disk")
@@ -63,8 +63,9 @@ class StorageLevel private(
     require(replication == 1, "Off-heap storage level does not support multiple replication")
   }
 
-  override def clone(): StorageLevel = new StorageLevel(
-    this.useDisk, this.useMemory, this.useOffHeap, this.deserialized, this.replication)
+  override def clone(): StorageLevel = {
+    new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
+  }
 
   override def equals(other: Any): Boolean = other match {
     case s: StorageLevel =>
@@ -77,20 +78,20 @@ class StorageLevel private(
       false
   }
 
-  def isValid = ((useMemory || useDisk || useOffHeap) && (replication > 0))
+  def isValid = (useMemory || useDisk || useOffHeap) && (replication > 0)
 
   def toInt: Int = {
     var ret = 0
-    if (useDisk_) {
+    if (_useDisk) {
       ret |= 8
     }
-    if (useMemory_) {
+    if (_useMemory) {
       ret |= 4
     }
-    if (useOffHeap_) {
+    if (_useOffHeap) {
       ret |= 2
     }
-    if (deserialized_) {
+    if (_deserialized) {
       ret |= 1
     }
     ret
@@ -98,32 +99,34 @@ class StorageLevel private(
   override def writeExternal(out: ObjectOutput) {
     out.writeByte(toInt)
-    out.writeByte(replication_)
+    out.writeByte(_replication)
   }
 
   override def readExternal(in: ObjectInput) {
     val flags = in.readByte()
-    useDisk_ = (flags & 8) != 0
-    useMemory_ = (flags & 4) != 0
-    useOffHeap_ = (flags & 2) != 0
-    deserialized_ = (flags & 1) != 0
-    replication_ = in.readByte()
+    _useDisk = (flags & 8) != 0
+    _useMemory = (flags & 4) != 0
+    _useOffHeap = (flags & 2) != 0
+    _deserialized = (flags & 1) != 0
+    _replication = in.readByte()
   }
 
   @throws(classOf[IOException])
   private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
 
-  override def toString: String = "StorageLevel(%b, %b, %b, %b, %d)".format(
-    useDisk, useMemory, useOffHeap, deserialized, replication)
+  override def toString: String = {
+    s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, $replication)"
+  }
 
   override def hashCode(): Int = toInt * 41 + replication
-  def description : String = {
+
+  def description: String = {
     var result = ""
     result += (if (useDisk) "Disk " else "")
     result += (if (useMemory) "Memory " else "")
     result += (if (useOffHeap) "Tachyon " else "")
     result += (if (deserialized) "Deserialized " else "Serialized ")
-    result += "%sx Replicated".format(replication)
+    result += s"${replication}x Replicated"
     result
   }
 }
@@ -165,7 +168,7 @@ object StorageLevel {
     case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
     case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
     case "OFF_HEAP" => OFF_HEAP
-    case _ => throw new IllegalArgumentException("Invalid StorageLevel: " + s)
+    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
   }
 
   /**
@@ -173,26 +176,37 @@ object StorageLevel {
   * Create a new StorageLevel object without setting useOffHeap.
   */
  @DeveloperApi
-  def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
-      deserialized: Boolean, replication: Int) = getCachedStorageLevel(
+  def apply(
+      useDisk: Boolean,
+      useMemory: Boolean,
+      useOffHeap: Boolean,
+      deserialized: Boolean,
+      replication: Int) = {
+    getCachedStorageLevel(
       new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
+  }
 
   /**
    * :: DeveloperApi ::
    * Create a new StorageLevel object.
    */
   @DeveloperApi
-  def apply(useDisk: Boolean, useMemory: Boolean,
-      deserialized: Boolean, replication: Int = 1) = getCachedStorageLevel(
-    new StorageLevel(useDisk, useMemory, false, deserialized, replication))
+  def apply(
+      useDisk: Boolean,
+      useMemory: Boolean,
+      deserialized: Boolean,
+      replication: Int = 1) = {
+    getCachedStorageLevel(new StorageLevel(useDisk, useMemory, false, deserialized, replication))
+  }
 
   /**
    * :: DeveloperApi ::
    * Create a new StorageLevel object from its integer representation.
    */
   @DeveloperApi
-  def apply(flags: Int, replication: Int): StorageLevel =
+  def apply(flags: Int, replication: Int): StorageLevel = {
     getCachedStorageLevel(new StorageLevel(flags, replication))
+  }
 
   /**
    * :: DeveloperApi ::
@@ -205,8 +219,8 @@ object StorageLevel {
     getCachedStorageLevel(obj)
   }
 
-  private[spark]
-  val storageLevelCache = new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
+  private[spark] val storageLevelCache =
+    new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
 
   private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
     storageLevelCache.putIfAbsent(level, level)
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index c37e76f89360580bba5d37b3843a52474d37f68b..d8ff4ff6bd42c7e0a02f777285b2a21569018eca 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -22,15 +22,10 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 
-import tachyon.client.{WriteType, ReadType}
+import tachyon.client.{ReadType, WriteType}
 
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
-import org.apache.spark.serializer.Serializer
-
-
-private class Entry(val size: Long)
-
 /**
  * Stores BlockManager blocks on Tachyon.
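As a side note to the StorageLevel hunks above: toInt, writeExternal and readExternal all rely on the same one-byte bit layout (disk = 8, memory = 4, off-heap = 2, deserialized = 1), with the replication count written as a second byte. The following is a minimal standalone sketch of that flag round-trip; the object name is illustrative only and nothing here is a Spark API.

object StorageFlagsSketch {
  /** Pack the four boolean flags into the low bits of an Int, mirroring toInt above. */
  def encode(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, deserialized: Boolean): Int = {
    var ret = 0
    if (useDisk) ret |= 8
    if (useMemory) ret |= 4
    if (useOffHeap) ret |= 2
    if (deserialized) ret |= 1
    ret
  }

  /** Unpack the flags again, mirroring the mask-and-test in readExternal above. */
  def decode(flags: Int): (Boolean, Boolean, Boolean, Boolean) =
    ((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0)

  def main(args: Array[String]): Unit = {
    // MEMORY_ONLY-style level: in memory, deserialized, no disk, no off-heap.
    val flags = encode(useDisk = false, useMemory = true, useOffHeap = false, deserialized = true)
    assert(flags == 5)
    assert(decode(flags) == (false, true, false, true))
  }
}

Because the entire level fits in two bytes, readResolve can cheaply swap the deserialized instance for the canonical one held in storageLevelCache (putIfAbsent keeps a single shared object per distinct level).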
@@ -46,8 +41,8 @@ private class TachyonStore(
     tachyonManager.getFile(blockId.name).length
   }
 
-  override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
-    putToTachyonStore(blockId, bytes, true)
+  override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
+    putIntoTachyonStore(blockId, bytes, returnValues = true)
   }
 
   override def putValues(
@@ -55,7 +50,7 @@ private class TachyonStore(
       values: ArrayBuffer[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    return putValues(blockId, values.toIterator, level, returnValues)
+    putValues(blockId, values.toIterator, level, returnValues)
   }
 
   override def putValues(
@@ -63,12 +58,12 @@ private class TachyonStore(
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    logDebug("Attempting to write values for block " + blockId)
-    val _bytes = blockManager.dataSerialize(blockId, values)
-    putToTachyonStore(blockId, _bytes, returnValues)
+    logDebug(s"Attempting to write values for block $blockId")
+    val bytes = blockManager.dataSerialize(blockId, values)
+    putIntoTachyonStore(blockId, bytes, returnValues)
   }
 
-  private def putToTachyonStore(
+  private def putIntoTachyonStore(
       blockId: BlockId,
       bytes: ByteBuffer,
       returnValues: Boolean): PutResult = {
@@ -76,7 +71,7 @@ private class TachyonStore(
     // duplicate does not copy buffer, so inexpensive
     val byteBuffer = bytes.duplicate()
     byteBuffer.rewind()
-    logDebug("Attempting to put block " + blockId + " into Tachyon")
+    logDebug(s"Attempting to put block $blockId into Tachyon")
     val startTime = System.currentTimeMillis
     val file = tachyonManager.getFile(blockId)
     val os = file.getOutStream(WriteType.TRY_CACHE)
@@ -84,7 +79,7 @@ private class TachyonStore(
     os.close()
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file in Tachyon in %d ms".format(
-      blockId, Utils.bytesToString(byteBuffer.limit), (finishTime - startTime)))
+      blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
 
     if (returnValues) {
       PutResult(bytes.limit(), Right(bytes.duplicate()))
@@ -106,10 +101,9 @@ private class TachyonStore(
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }
 
-
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
     val file = tachyonManager.getFile(blockId)
-    if (file == null || file.getLocationHosts().size == 0) {
+    if (file == null || file.getLocationHosts.size == 0) {
       return None
     }
     val is = file.getInStream(ReadType.CACHE)
@@ -121,16 +115,15 @@ private class TachyonStore(
         val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
         buffer = ByteBuffer.wrap(bs)
         if (fetchSize != size) {
-          logWarning("Failed to fetch the block " + blockId + " from Tachyon : Size " + size +
-            " is not equal to fetched size " + fetchSize)
+          logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " +
+            s"is not equal to fetched size $fetchSize")
           return None
         }
       }
     } catch {
-      case ioe: IOException => {
-        logWarning("Failed to fetch the block " + blockId + " from Tachyon", ioe)
-        return None
-      }
+      case ioe: IOException =>
+        logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
+        return None
     }
     Some(buffer)
   }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ee629794f60ad01230c3aba1191e7aede09f6fc7..042fdfcc47261d73b08a1dd56472394c5c206cac 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -53,6 +53,8 @@ object MimaExcludes {
"org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.storage.MemoryStore.Entry"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$" + "createZero$1") @@ -67,7 +69,10 @@ object MimaExcludes { ) ++ MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++ MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++ - MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") + MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++ + MimaBuild.excludeSparkClass("storage.Values") ++ + MimaBuild.excludeSparkClass("storage.Entry") ++ + MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") case v if v.startsWith("1.0") => Seq( MimaBuild.excludeSparkPackage("api.java"),