diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
new file mode 100644
index 0000000000000000000000000000000000000000..dbe0bda61589ca6a2d8e138113e48e8acfdaa392
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ConcurrentHashMap
+
+private[storage] trait BlockInfo {
+  def level: StorageLevel
+  def tellMaster: Boolean
+  // To save space, 'pending' and 'failed' are encoded as special sizes:
+  @volatile var size: Long = BlockInfo.BLOCK_PENDING
+  private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
+  private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
+  private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)
+
+  setInitThread()
+
+  private def setInitThread() {
+    // Set current thread as init thread - waitForReady will not block this thread
+    // (in case there is non trivial initialization which ends up calling waitForReady as part of
+    // initialization itself)
+    BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
+  }
+
+  /**
+   * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
+   * Return true if the block is available, false otherwise.
+   */
+  def waitForReady(): Boolean = {
+    if (pending && initThread != Thread.currentThread()) {
+      synchronized {
+        while (pending) this.wait()
+      }
+    }
+    !failed
+  }
+
+  /** Mark this BlockInfo as ready (i.e. block is finished writing) */
+  def markReady(sizeInBytes: Long) {
+    require(sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
+    assert(pending)
+    size = sizeInBytes
+    BlockInfo.blockInfoInitThreads.remove(this)
+    synchronized {
+      this.notifyAll()
+    }
+  }
+
+  /** Mark this BlockInfo as ready but failed */
+  def markFailure() {
+    assert(pending)
+    size = BlockInfo.BLOCK_FAILED
+    BlockInfo.blockInfoInitThreads.remove(this)
+    synchronized {
+      this.notifyAll()
+    }
+  }
+}
+
+private object BlockInfo {
+  // initThread is logically a BlockInfo field, but we store it here because
+  // it's only needed while this block is in the 'pending' state and we want
+  // to minimize BlockInfo's memory footprint.
+  private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
+
+  private val BLOCK_PENDING: Long = -1L
+  private val BLOCK_FAILED: Long = -2L
+}
+
+// All shuffle blocks have the same `level` and `tellMaster` properties,
+// so we can save space by not storing them in each instance:
+private[storage] class ShuffleBlockInfo extends BlockInfo {
+  // These need to be defined using 'def' instead of 'val' in order for
+  // the compiler to eliminate the fields:
+  def level: StorageLevel = StorageLevel.DISK_ONLY
+  def tellMaster: Boolean = false
+}
+
+private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
+  extends BlockInfo {
+  // Intentionally left blank
+}
\ No newline at end of file
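The trait above collapses what used to be four volatile fields (`pending`, `size`, `initThread`, `failed`) into a single volatile `size`, encoding the pending and failed states as the sentinel sizes -1L and -2L and parking the init-thread reference in a shared map that only holds entries for in-flight blocks. A minimal, self-contained sketch of the same sentinel-plus-side-table pattern (`SentinelDemo` and `Entry` are illustrative names, not part of the patch):

    import java.util.concurrent.ConcurrentHashMap

    object SentinelDemo {
      private val PENDING = -1L
      private val FAILED = -2L
      // Side table: holds the creating thread only while an entry is pending.
      private val initThreads = new ConcurrentHashMap[Entry, Thread]

      class Entry {
        @volatile var size: Long = PENDING
        initThreads.put(this, Thread.currentThread())

        def pending: Boolean = size == PENDING
        def failed: Boolean = size == FAILED

        def markReady(sizeInBytes: Long): Unit = {
          require(sizeInBytes >= 0, s"sizeInBytes was negative: $sizeInBytes")
          size = sizeInBytes
          initThreads.remove(this)  // a ready entry costs nothing in the side table
        }
      }

      def main(args: Array[String]): Unit = {
        val e = new Entry
        println(e.pending)          // true: size still holds the PENDING sentinel
        e.markReady(1024L)
        println(e.pending)          // false: size is now the real byte count
        println(initThreads.size()) // 0: the side table is empty again
      }
    }

Once an entry is ready it carries only one Long, so long-lived block metadata pays nothing for state it no longer needs.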
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index dbe573dc64301ccbabc77028b311f079e8da18d0..76d537f8e838a75ec9f6296e143e13011a46408d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -46,61 +46,6 @@ private[spark] class BlockManager(
     maxMemory: Long)
   extends Logging {
 
-  private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
-    @volatile var pending: Boolean = true
-    @volatile var size: Long = -1L
-    @volatile var initThread: Thread = null
-    @volatile var failed = false
-
-    setInitThread()
-
-    private def setInitThread() {
-      // Set current thread as init thread - waitForReady will not block this thread
-      // (in case there is non trivial initialization which ends up calling waitForReady as part of
-      // initialization itself)
-      this.initThread = Thread.currentThread()
-    }
-
-    /**
-     * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
-     * Return true if the block is available, false otherwise.
-     */
-    def waitForReady(): Boolean = {
-      if (initThread != Thread.currentThread() && pending) {
-        synchronized {
-          while (pending) this.wait()
-        }
-      }
-      !failed
-    }
-
-    /** Mark this BlockInfo as ready (i.e. block is finished writing) */
-    def markReady(sizeInBytes: Long) {
-      assert (pending)
-      size = sizeInBytes
-      initThread = null
-      failed = false
-      initThread = null
-      pending = false
-      synchronized {
-        this.notifyAll()
-      }
-    }
-
-    /** Mark this BlockInfo as ready but failed */
-    def markFailure() {
-      assert (pending)
-      size = 0
-      initThread = null
-      failed = true
-      initThread = null
-      pending = false
-      synchronized {
-        this.notifyAll()
-      }
-    }
-  }
-
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(
     System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
@@ -526,7 +471,7 @@ private[spark] class BlockManager(
         if (shuffleBlockManager.consolidateShuffleFiles) {
           diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
         }
-        val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
+        val myInfo = new ShuffleBlockInfo()
         blockInfo.put(blockId, myInfo)
         myInfo.markReady(writer.fileSegment().length)
       })
@@ -560,7 +505,7 @@ private[spark] class BlockManager(
     // to be dropped right after it got put into memory. Note, however, that other threads will
     // not be able to get() this block until we call markReady on its BlockInfo.
     val myInfo = {
-      val tinfo = new BlockInfo(level, tellMaster)
+      val tinfo = new BlockInfoImpl(level, tellMaster)
       // Do atomically !
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
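In the last hunk above, `putIfAbsent` is what makes block registration atomic: exactly one thread installs its `BlockInfoImpl` and performs the write, while any thread that loses the race parks in `waitForReady()` until the winner calls `markReady` or `markFailure`. A hedged sketch of that handshake (`ClaimDemo`, `Info`, and `claim` are illustrative names; the init-thread shortcut and the failed state are omitted for brevity):

    import java.util.concurrent.ConcurrentHashMap

    object ClaimDemo {
      final class Info {
        @volatile var size: Long = -1L  // -1L encodes "pending", as in BlockInfo
        def waitForReady(): Long = synchronized {
          while (size == -1L) wait()
          size
        }
        def markReady(sizeInBytes: Long): Unit = synchronized {
          size = sizeInBytes
          notifyAll()  // wake every thread parked in waitForReady()
        }
      }

      private val infos = new ConcurrentHashMap[String, Info]

      // Returns Some(info) if the caller won the right to write the block,
      // or None after waiting for another thread's write to finish.
      def claim(id: String): Option[Info] = {
        val mine = new Info
        Option(infos.putIfAbsent(id, mine)) match {
          case Some(existing) => existing.waitForReady(); None
          case None           => Some(mine)
        }
      }

      def main(args: Array[String]): Unit = {
        claim("block-0").foreach(_.markReady(4096L))  // first caller wins and publishes
        println(claim("block-0"))                     // None: the block is already ready
      }
    }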
diff --git a/docker/spark-test/README.md b/docker/spark-test/README.md
index 7f4b5460b10269b0302c579d76ff5da549859bdb..ec0baf6e6d4196c511f044f4cbffe478338376e0 100644
--- a/docker/spark-test/README.md
+++ b/docker/spark-test/README.md
@@ -3,7 +3,7 @@ Spark Docker files usable for testing and development purposes.
 These images are intended to be run like so:
 
     docker run -v $SPARK_HOME:/opt/spark spark-test-master
-    docker run -v $SPARK_HOME:/opt/spark spark-test-worker <master_ip>
+    docker run -v $SPARK_HOME:/opt/spark spark-test-worker spark://<master_ip>:7077
 
 Using this configuration, the containers will have their Spark directories
 mounted to your actual `SPARK_HOME`, allowing you to modify and recompile
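The README correction reflects that a standalone worker expects a full `spark://<host>:<port>` master URL rather than a bare IP, and the same URL is what a driver passes when it connects. A minimal driver sketch against the `SparkContext` API of this era (`ConnectCheck` and the use of `args(0)` for the master URL are illustrative):

    import org.apache.spark.SparkContext

    object ConnectCheck {
      def main(args: Array[String]): Unit = {
        // args(0) should be the master URL, e.g. spark://<master_ip>:7077
        val sc = new SparkContext(args(0), "connect-check")
        println(sc.parallelize(1 to 100).reduce(_ + _))  // prints 5050 if the cluster is reachable
        sc.stop()
      }
    }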