diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 4964060b1c46c4fa24e90795d9d88b3368592086..5849045a55f505b50b2933a068bba7ba72aeab94 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -40,21 +40,36 @@ class BlockManager(
   class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
     var pending: Boolean = true
     var size: Long = -1L
+    var failed: Boolean = false
 
-    /** Wait for this BlockInfo to be marked as ready (i.e. block is finished writing) */
-    def waitForReady() {
+    /**
+     * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
+     * Return true if the block is available, false otherwise.
+     */
+    def waitForReady(): Boolean = {
       if (pending) {
         synchronized {
           while (pending) this.wait()
         }
       }
+      !failed
     }
 
     /** Mark this BlockInfo as ready (i.e. block is finished writing) */
     def markReady(sizeInBytes: Long) {
-      pending = false
-      size = sizeInBytes
       synchronized {
+        pending = false
+        failed = false
+        size = sizeInBytes
+        this.notifyAll()
+      }
+    }
+
+    /** Mark this BlockInfo as ready but failed */
+    def markFailure() {
+      synchronized {
+        failed = true
+        pending = false
         this.notifyAll()
       }
     }
@@ -277,7 +292,14 @@ class BlockManager(
     val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
-        info.waitForReady() // In case the block is still being put() by another thread
+
+        // If another thread is writing the block, wait for it to become ready.
+        if (!info.waitForReady()) {
+          // If we get here, the block write failed.
+          logWarning("Block " + blockId + " was marked as failed.")
+          return None
+        }
+
         val level = info.level
         logDebug("Level for block " + blockId + " is " + level)
@@ -362,7 +384,14 @@ class BlockManager(
     val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
-        info.waitForReady() // In case the block is still being put() by another thread
+
+        // If another thread is writing the block, wait for it to become ready.
+        if (!info.waitForReady()) {
+          // If we get here, the block write failed.
+          logWarning("Block " + blockId + " was marked as failed.")
+          return None
+        }
+
         val level = info.level
         logDebug("Level for block " + blockId + " is " + level)
@@ -423,12 +452,11 @@ class BlockManager(
         val data = BlockManagerWorker.syncGetBlock(
             GetBlock(blockId), ConnectionManagerId(loc.ip, loc.port))
         if (data != null) {
-          logDebug("Data is not null: " + data)
           return Some(dataDeserialize(blockId, data))
         }
-        logDebug("Data is null")
+        logDebug("The value of block " + blockId + " is null")
       }
-      logDebug("Data not found")
+      logDebug("Block " + blockId + " not found")
       return None
     }
@@ -474,9 +502,8 @@ class BlockManager(
     }
 
     val oldBlock = blockInfo.get(blockId).orNull
-    if (oldBlock != null) {
+    if (oldBlock != null && oldBlock.waitForReady()) {
       logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
-      oldBlock.waitForReady()
       return oldBlock.size
     }
@@ -504,31 +531,45 @@ class BlockManager(
       logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
         + " to get into synchronized block")
 
-      if (level.useMemory) {
-        // Save it just to memory first, even if it also has useDisk set to true; we will later
-        // drop it to disk if the memory store can't hold it.
-        val res = memoryStore.putValues(blockId, values, level, true)
-        size = res.size
-        res.data match {
-          case Right(newBytes) => bytesAfterPut = newBytes
-          case Left(newIterator) => valuesAfterPut = newIterator
-        }
-      } else {
-        // Save directly to disk.
-        val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them
-        val res = diskStore.putValues(blockId, values, level, askForBytes)
-        size = res.size
-        res.data match {
-          case Right(newBytes) => bytesAfterPut = newBytes
-          case _ =>
+      try {
+        if (level.useMemory) {
+          // Save it just to memory first, even if it also has useDisk set to true; we will later
+          // drop it to disk if the memory store can't hold it.
+          val res = memoryStore.putValues(blockId, values, level, true)
+          size = res.size
+          res.data match {
+            case Right(newBytes) => bytesAfterPut = newBytes
+            case Left(newIterator) => valuesAfterPut = newIterator
+          }
+        } else {
+          // Save directly to disk.
+          // Don't get back the bytes unless we replicate them.
+          val askForBytes = level.replication > 1
+          val res = diskStore.putValues(blockId, values, level, askForBytes)
+          size = res.size
+          res.data match {
+            case Right(newBytes) => bytesAfterPut = newBytes
+            case _ =>
+          }
         }
-      }
 
-      // Now that the block is in either the memory or disk store, let other threads read it,
-      // and tell the master about it.
-      myInfo.markReady(size)
-      if (tellMaster) {
-        reportBlockStatus(blockId, myInfo)
+        // Now that the block is in either the memory or disk store, let other threads read it,
+        // and tell the master about it.
+        myInfo.markReady(size)
+        if (tellMaster) {
+          reportBlockStatus(blockId, myInfo)
+        }
+      } catch {
+        // If we failed at putting the block to memory/disk, notify other possible readers
+        // that it has failed, and then remove it from the block info map.
+        case e: Exception => {
+          // Note that the remove must happen before markFailure; otherwise another thread
+          // could've inserted a new BlockInfo before we remove it.
+          blockInfo.remove(blockId)
+          myInfo.markFailure()
+          logWarning("Putting block " + blockId + " failed", e)
+          throw e
+        }
       }
     }
     logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -598,28 +639,38 @@ class BlockManager(
       logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
         + " to get into synchronized block")
 
-      if (level.useMemory) {
-        // Store it only in memory at first, even if useDisk is also set to true
-        bytes.rewind()
-        memoryStore.putBytes(blockId, bytes, level)
-      } else {
-        bytes.rewind()
-        diskStore.putBytes(blockId, bytes, level)
-      }
+      try {
+        if (level.useMemory) {
+          // Store it only in memory at first, even if useDisk is also set to true
+          bytes.rewind()
+          memoryStore.putBytes(blockId, bytes, level)
+        } else {
+          bytes.rewind()
+          diskStore.putBytes(blockId, bytes, level)
+        }
 
-      // Now that the block is in either the memory or disk store, let other threads read it,
-      // and tell the master about it.
-      myInfo.markReady(bytes.limit)
-      if (tellMaster) {
-        reportBlockStatus(blockId, myInfo)
+        // Now that the block is in either the memory or disk store, let other threads read it,
+        // and tell the master about it.
+        myInfo.markReady(bytes.limit)
+        if (tellMaster) {
+          reportBlockStatus(blockId, myInfo)
+        }
+      } catch {
+        // If we failed at putting the block to memory/disk, notify other possible readers
+        // that it has failed, and then remove it from the block info map.
+        case e: Exception => {
+          // Note that the remove must happen before markFailure; otherwise another thread
+          // could've inserted a new BlockInfo before we remove it.
+          blockInfo.remove(blockId)
+          myInfo.markFailure()
+          logWarning("Putting block " + blockId + " failed", e)
+          throw e
+        }
       }
     }
 
     // If replication had started, then wait for it to finish
     if (level.replication > 1) {
-      if (replicationFuture == null) {
-        throw new Exception("Unexpected")
-      }
       Await.ready(replicationFuture, Duration.Inf)
     }
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 2d177bbf6745891fb8995be18c865de7ca5bc85e..61e793b31f4d4426e188c10af78ec5c5386316bb 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -12,6 +12,7 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.matchers.ShouldMatchers._
 import org.scalatest.time.SpanSugar._
 
+import spark.JavaSerializer
 import spark.KryoSerializer
 import spark.SizeEstimator
 import spark.util.ByteBufferInputStream
@@ -262,7 +263,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     t1.join()
     t2.join()
     t3.join()
-    
+
     store.dropFromMemory("a1", null)
     store.dropFromMemory("a2", null)
     store.waitForAsyncReregister()
@@ -582,4 +583,21 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
       System.clearProperty("spark.rdd.compress")
     }
   }
+
+  test("block store put failure") {
+    // Use the Java serializer so we can create an unserializable value.
+    store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200)
+
+    // The put should fail since a1 is not serializable.
+    class UnserializableClass
+    val a1 = new UnserializableClass
+    intercept[java.io.NotSerializableException] {
+      store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+    }
+
+    // Make sure getting a1 doesn't hang and returns None.
+    failAfter(1 second) {
+      assert(store.getSingle("a1") == None, "a1 should not be in store")
+    }
+  }
 }
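
The heart of this change is a ready/failed handshake on BlockInfo: readers block until a writer publishes either success (markReady) or failure (markFailure), instead of hanging forever when a put throws. Below is a minimal, self-contained sketch of that handshake outside of Spark; WriteGate and the demo object are illustrative names, not Spark APIs, and the lambda-to-Runnable conversion assumes Scala 2.12+.

    // Sketch of the ready/failed handshake the patch adds to BlockInfo.
    class WriteGate {
      private var pending = true   // true until the writer finishes, successfully or not
      private var failed = false   // set when the writer aborts

      // Block until the writer is done; return true only if the write succeeded.
      def waitForReady(): Boolean = synchronized {
        while (pending) wait()
        !failed
      }

      // Writer side: publish success and wake all waiting readers.
      def markReady(): Unit = synchronized {
        pending = false
        failed = false
        notifyAll()
      }

      // Writer side: publish failure and wake all waiting readers.
      def markFailure(): Unit = synchronized {
        failed = true
        pending = false
        notifyAll()
      }
    }

    object WriteGateDemo extends App {
      val gate = new WriteGate

      // Before this patch, a reader like this could wait forever on a failed write.
      val reader = new Thread(() => {
        val ok = gate.waitForReady()
        println(s"block available: $ok")   // prints "block available: false"
      })
      reader.start()

      // Simulate a failed put: wake the readers, then propagate the error.
      try {
        throw new RuntimeException("simulated put failure")
      } catch {
        case e: Exception =>
          gate.markFailure()
          println(s"writer failed: ${e.getMessage}")
      }
      reader.join()
    }

The patched BlockInfo adds two refinements over this sketch: waitForReady checks pending outside the lock as a fast path, and on failure the entry is removed from the blockInfo map before markFailure wakes any waiting threads, so a thread that retries the put cannot insert a fresh BlockInfo that the subsequent remove would then delete.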