diff --git a/core/pom.xml b/core/pom.xml index e4c32eff0cd770f08077542c1a9a9f2dc2ffeffd..66f9fc4961b03d062ba8eb1ac6cdd3e7adbf6923 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -200,6 +200,53 @@ <artifactId>derby</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.tachyonproject</groupId> + <artifactId>tachyon</artifactId> + <version>0.4.1-thrift</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-jsp</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-webapp</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + </exclusion> + <exclusion> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </exclusion> + <exclusion> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + </exclusion> + <exclusion> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> diff --git a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java index 9f13b39909481b50c010803f6c24c48ae90c83b0..840a1bd93bfbb609b5291e346db50399310587b1 100644 --- a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java +++ b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java @@ -23,17 +23,18 @@ import org.apache.spark.storage.StorageLevel; * Expose some commonly useful storage level constants. */ public class StorageLevels { - public static final StorageLevel NONE = create(false, false, false, 1); - public static final StorageLevel DISK_ONLY = create(true, false, false, 1); - public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2); - public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1); - public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2); - public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1); - public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2); - public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1); - public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2); - public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1); - public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2); + public static final StorageLevel NONE = create(false, false, false, false, 1); + public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1); + public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2); + public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1); + public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2); + public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1); + public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2); + public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1); + public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2); + public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1); + public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2); + public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1); /** * Create a new StorageLevel object. @@ -42,7 +43,26 @@ public class StorageLevels { * @param deserialized saved as deserialized objects, if true * @param replication replication factor */ - public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) { - return StorageLevel.apply(useDisk, useMemory, deserialized, replication); + @Deprecated + public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, + int replication) { + return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication); + } + + /** + * Create a new StorageLevel object. + * @param useDisk saved to disk, if true + * @param useMemory saved to memory, if true + * @param useOffHeap saved to Tachyon, if true + * @param deserialized saved as deserialized objects, if true + * @param replication replication factor + */ + public static StorageLevel create( + boolean useDisk, + boolean useMemory, + boolean useOffHeap, + boolean deserialized, + int replication) { + return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication); } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 835cffe37a938fdf01ee9bf0fa96852e2efce901..fcf16ce1b278eab5d095b8b96f17b0cc7296b37f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -19,14 +19,13 @@ package org.apache.spark import java.io._ import java.net.URI -import java.util.{Properties, UUID} import java.util.concurrent.atomic.AtomicInteger - +import java.util.{Properties, UUID} +import java.util.UUID.randomUUID import scala.collection.{Map, Set} import scala.collection.generic.Growable import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.reflect.{ClassTag, classTag} - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} @@ -130,6 +129,11 @@ class SparkContext( val master = conf.get("spark.master") val appName = conf.get("spark.app.name") + // Generate the random name for a temp folder in Tachyon + // Add a timestamp as the suffix here to make it more safe + val tachyonFolderName = "spark-" + randomUUID.toString() + conf.set("spark.tachyonStore.folderName", tachyonFolderName) + val isLocal = (master == "local" || master.startsWith("local[")) if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 3486092a140fbe7e78d59aab692eb63cbdeb3b8d..16887d8892b315968c065fc551b6272af7846b62 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -53,7 +53,8 @@ private[spark] class CoarseGrainedExecutorBackend( case RegisteredExecutor(sparkProperties) => logInfo("Successfully registered with driver") // Make this host instead of hostPort ? - executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties) + executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties, + false) case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) @@ -105,7 +106,8 @@ private[spark] object CoarseGrainedExecutorBackend { // set it val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( - Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores), + Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, + sparkHostPort, cores), name = "Executor") workerUrl.foreach{ url => actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala index 210f3dbeebaca6a56bd71d42d4652985a6f4b1e9..ceff3a067d72abffdaf53b16b36264818b5073af 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala @@ -41,6 +41,12 @@ object ExecutorExitCode { /** DiskStore failed to create a local temporary directory after many attempts. */ val DISK_STORE_FAILED_TO_CREATE_DIR = 53 + /** TachyonStore failed to initialize after many attempts. */ + val TACHYON_STORE_FAILED_TO_INITIALIZE = 54 + + /** TachyonStore failed to create a local temporary directory after many attempts. */ + val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55 + def explainExitCode(exitCode: Int): String = { exitCode match { case UNCAUGHT_EXCEPTION => "Uncaught exception" @@ -48,6 +54,9 @@ object ExecutorExitCode { case OOM => "OutOfMemoryError" case DISK_STORE_FAILED_TO_CREATE_DIR => "Failed to create local directory (bad spark.local.dir?)" + case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize." + case TACHYON_STORE_FAILED_TO_CREATE_DIR => + "TachyonStore failed to create a local temporary directory." case _ => "Unknown executor exit code (" + exitCode + ")" + ( if (exitCode > 128) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 71584b6eb102a4813a11e1f202025688bad62193..19138d9dde6978431b98336d0f98688dabd71cfa 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -19,22 +19,20 @@ package org.apache.spark.storage import java.io.{File, InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} - import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.util.Random - import akka.actor.{ActorSystem, Cancellable, Props} import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream} import sun.nio.ch.DirectBuffer - import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException} import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.serializer.Serializer import org.apache.spark.util._ + sealed trait Values case class ByteBufferValues(buffer: ByteBuffer) extends Values @@ -59,6 +57,17 @@ private[spark] class BlockManager( private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) private[storage] val diskStore = new DiskStore(this, diskBlockManager) + var tachyonInitialized = false + private[storage] lazy val tachyonStore: TachyonStore = { + val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon") + val appFolderName = conf.get("spark.tachyonStore.folderName") + val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}" + val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998") + val tachyonBlockManager = new TachyonBlockManager( + shuffleBlockManager, tachyonStorePath, tachyonMaster) + tachyonInitialized = true + new TachyonStore(this, tachyonBlockManager) + } // If we use Netty for shuffle, start a new Netty-based shuffle sender service. private val nettyPort: Int = { @@ -248,8 +257,10 @@ private[spark] class BlockManager( if (info.tellMaster) { val storageLevel = status.storageLevel val inMemSize = Math.max(status.memSize, droppedMemorySize) + val inTachyonSize = status.tachyonSize val onDiskSize = status.diskSize - master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) + master.updateBlockInfo( + blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize) } else true } @@ -259,22 +270,24 @@ private[spark] class BlockManager( * and the updated in-memory and on-disk sizes. */ private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = { - val (newLevel, inMemSize, onDiskSize) = info.synchronized { + val (newLevel, inMemSize, onDiskSize, inTachyonSize) = info.synchronized { info.level match { case null => - (StorageLevel.NONE, 0L, 0L) + (StorageLevel.NONE, 0L, 0L, 0L) case level => val inMem = level.useMemory && memoryStore.contains(blockId) + val inTachyon = level.useOffHeap && tachyonStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) val deserialized = if (inMem) level.deserialized else false - val replication = if (inMem || onDisk) level.replication else 1 - val storageLevel = StorageLevel(onDisk, inMem, deserialized, replication) + val replication = if (inMem || inTachyon || onDisk) level.replication else 1 + val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication) val memSize = if (inMem) memoryStore.getSize(blockId) else 0L + val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L - (storageLevel, memSize, diskSize) + (storageLevel, memSize, diskSize, tachyonSize) } } - BlockStatus(newLevel, inMemSize, onDiskSize) + BlockStatus(newLevel, inMemSize, onDiskSize, inTachyonSize) } /** @@ -354,6 +367,24 @@ private[spark] class BlockManager( logDebug("Block " + blockId + " not found in memory") } } + + // Look for the block in Tachyon + if (level.useOffHeap) { + logDebug("Getting block " + blockId + " from tachyon") + if (tachyonStore.contains(blockId)) { + tachyonStore.getBytes(blockId) match { + case Some(bytes) => { + if (!asValues) { + return Some(bytes) + } else { + return Some(dataDeserialize(blockId, bytes)) + } + } + case None => + logDebug("Block " + blockId + " not found in tachyon") + } + } + } // Look for block on disk, potentially storing it back into memory if required: if (level.useDisk) { @@ -620,6 +651,23 @@ private[spark] class BlockManager( } // Keep track of which blocks are dropped from memory res.droppedBlocks.foreach { block => updatedBlocks += block } + } else if (level.useOffHeap) { + // Save to Tachyon. + val res = data match { + case IteratorValues(iterator) => + tachyonStore.putValues(blockId, iterator, level, false) + case ArrayBufferValues(array) => + tachyonStore.putValues(blockId, array, level, false) + case ByteBufferValues(bytes) => { + bytes.rewind(); + tachyonStore.putBytes(blockId, bytes, level) + } + } + size = res.size + res.data match { + case Right(newBytes) => bytesAfterPut = newBytes + case _ => + } } else { // Save directly to disk. // Don't get back the bytes unless we replicate them. @@ -644,8 +692,8 @@ private[spark] class BlockManager( val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) if (putBlockStatus.storageLevel != StorageLevel.NONE) { - // Now that the block is in either the memory or disk store, let other threads read it, - // and tell the master about it. + // Now that the block is in either the memory, tachyon, or disk store, + // let other threads read it, and tell the master about it. marked = true putBlockInfo.markReady(size) if (tellMaster) { @@ -707,7 +755,8 @@ private[spark] class BlockManager( */ var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) { - val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1) + val tLevel = StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) if (cachedPeers == null) { cachedPeers = master.getPeers(blockManagerId, level.replication - 1) } @@ -832,9 +881,10 @@ private[spark] class BlockManager( // Removals are idempotent in disk store and memory store. At worst, we get a warning. val removedFromMemory = memoryStore.remove(blockId) val removedFromDisk = diskStore.remove(blockId) - if (!removedFromMemory && !removedFromDisk) { + val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false + if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) { logWarning("Block " + blockId + " could not be removed as it was not found in either " + - "the disk or memory store") + "the disk, memory, or tachyon store") } blockInfo.remove(blockId) if (tellMaster && info.tellMaster) { @@ -871,6 +921,9 @@ private[spark] class BlockManager( if (level.useDisk) { diskStore.remove(id) } + if (level.useOffHeap) { + tachyonStore.remove(id) + } iterator.remove() logInfo("Dropped block " + id) } @@ -946,6 +999,9 @@ private[spark] class BlockManager( blockInfo.clear() memoryStore.clear() diskStore.clear() + if (tachyonInitialized) { + tachyonStore.clear() + } metadataCleaner.cancel() broadcastCleaner.cancel() logInfo("BlockManager stopped") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index ed6937851b836da6b08c2c653827527c83d221ee..4bc1b407ad10698c831bb610bff553ea3b036539 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -63,9 +63,10 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long): Boolean = { + diskSize: Long, + tachyonSize: Long): Boolean = { val res = askDriverWithReply[Boolean]( - UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)) + UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize)) logInfo("Updated info of block " + blockId) res } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index ff2652b640272e5fe2c0ecccff91c0bff7bf4fc6..378f4cadc17d7a3478a08a5b6924502022e5f9d4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -73,10 +73,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus register(blockManagerId, maxMemSize, slaveActor) sender ! true - case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => + case UpdateBlockInfo( + blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) => // TODO: Ideally we want to handle all the message replies in receive instead of in the // individual private methods. - updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) + updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) case GetLocations(blockId) => sender ! getLocations(blockId) @@ -246,7 +247,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long) { + diskSize: Long, + tachyonSize: Long) { if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.executorId == "<driver>" && !isLocal) { @@ -265,7 +267,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus return } - blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) + blockManagerInfo(blockManagerId).updateBlockInfo( + blockId, storageLevel, memSize, diskSize, tachyonSize) var locations: mutable.HashSet[BlockManagerId] = null if (blockLocations.containsKey(blockId)) { @@ -309,8 +312,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } } - -private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) +private[spark] case class BlockStatus( + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long, + tachyonSize: Long) private[spark] class BlockManagerInfo( val blockManagerId: BlockManagerId, @@ -336,7 +342,8 @@ private[spark] class BlockManagerInfo( blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long) { + diskSize: Long, + tachyonSize: Long) { updateLastSeenMs() @@ -350,23 +357,29 @@ private[spark] class BlockManagerInfo( } if (storageLevel.isValid) { - /* isValid means it is either stored in-memory or on-disk. + /* isValid means it is either stored in-memory, on-disk or on-Tachyon. * But the memSize here indicates the data size in or dropped from memory, + * tachyonSize here indicates the data size in or dropped from Tachyon, * and the diskSize here indicates the data size in or dropped to disk. * They can be both larger than 0, when a block is dropped from memory to disk. * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */ if (storageLevel.useMemory) { - _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0)) + _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0, 0)) _remainingMem -= memSize logInfo("Added %s in memory on %s (size: %s, free: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), Utils.bytesToString(_remainingMem))) } if (storageLevel.useDisk) { - _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize)) + _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize, 0)) logInfo("Added %s on disk on %s (size: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) } + if (storageLevel.useOffHeap) { + _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize)) + logInfo("Added %s on tachyon on %s (size: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize))) + } } else if (_blocks.containsKey(blockId)) { // If isValid is not true, drop the block. val blockStatus: BlockStatus = _blocks.get(blockId) @@ -381,6 +394,10 @@ private[spark] class BlockManagerInfo( logInfo("Removed %s on %s on disk (size: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize))) } + if (blockStatus.storageLevel.useOffHeap) { + logInfo("Removed %s on %s on tachyon (size: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize))) + } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index bbb9529b5a0cab9a7fb7100a292e87050297b799..8a36b5cc42dfd55c8a73dd10d0b760524088d934 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -53,11 +53,12 @@ private[storage] object BlockManagerMessages { var blockId: BlockId, var storageLevel: StorageLevel, var memSize: Long, - var diskSize: Long) + var diskSize: Long, + var tachyonSize: Long) extends ToBlockManagerMaster with Externalizable { - def this() = this(null, null, null, 0, 0) // For deserialization only + def this() = this(null, null, null, 0, 0, 0) // For deserialization only override def writeExternal(out: ObjectOutput) { blockManagerId.writeExternal(out) @@ -65,6 +66,7 @@ private[storage] object BlockManagerMessages { storageLevel.writeExternal(out) out.writeLong(memSize) out.writeLong(diskSize) + out.writeLong(tachyonSize) } override def readExternal(in: ObjectInput) { @@ -73,6 +75,7 @@ private[storage] object BlockManagerMessages { storageLevel = StorageLevel(in) memSize = in.readLong() diskSize = in.readLong() + tachyonSize = in.readLong() } } @@ -81,13 +84,15 @@ private[storage] object BlockManagerMessages { blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long): UpdateBlockInfo = { - new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize) + diskSize: Long, + tachyonSize: Long): UpdateBlockInfo = { + new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize) } // For pattern-matching - def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, BlockId, StorageLevel, Long, Long)] = { - Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) + def unapply(h: UpdateBlockInfo) + : Option[(BlockManagerId, BlockId, StorageLevel, Long, Long, Long)] = { + Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize, h.tachyonSize)) } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 4212a539dab4b06907c37ef7e4d2456433231bac..95e71de2d3f1de985035158d50d99ea0dc05ebda 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -21,8 +21,9 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} /** * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, - * whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory - * in a serialized format, and whether to replicate the RDD partitions on multiple nodes. + * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon , whether to + * keep the data in memory in a serialized format, and whether to replicate the RDD partitions on + * multiple nodes. * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants * for commonly useful storage levels. To create your own storage level object, use the * factory method of the singleton object (`StorageLevel(...)`). @@ -30,45 +31,58 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} class StorageLevel private( private var useDisk_ : Boolean, private var useMemory_ : Boolean, + private var useOffHeap_ : Boolean, private var deserialized_ : Boolean, private var replication_ : Int = 1) extends Externalizable { // TODO: Also add fields for caching priority, dataset ID, and flushing. private def this(flags: Int, replication: Int) { - this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication) + this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication) } - def this() = this(false, true, false) // For deserialization + def this() = this(false, true, false, false) // For deserialization def useDisk = useDisk_ def useMemory = useMemory_ + def useOffHeap = useOffHeap_ def deserialized = deserialized_ def replication = replication_ assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes") + if (useOffHeap) { + require(useDisk == false, "Off-heap storage level does not support using disk") + require(useMemory == false, "Off-heap storage level does not support using heap memory") + require(deserialized == false, "Off-heap storage level does not support deserialized storage") + require(replication == 1, "Off-heap storage level does not support multiple replication") + } + override def clone(): StorageLevel = new StorageLevel( - this.useDisk, this.useMemory, this.deserialized, this.replication) + this.useDisk, this.useMemory, this.useOffHeap, this.deserialized, this.replication) override def equals(other: Any): Boolean = other match { case s: StorageLevel => s.useDisk == useDisk && s.useMemory == useMemory && + s.useOffHeap == useOffHeap && s.deserialized == deserialized && s.replication == replication case _ => false } - def isValid = ((useMemory || useDisk) && (replication > 0)) + def isValid = ((useMemory || useDisk || useOffHeap) && (replication > 0)) def toInt: Int = { var ret = 0 if (useDisk_) { - ret |= 4 + ret |= 8 } if (useMemory_) { + ret |= 4 + } + if (useOffHeap_) { ret |= 2 } if (deserialized_) { @@ -84,8 +98,9 @@ class StorageLevel private( override def readExternal(in: ObjectInput) { val flags = in.readByte() - useDisk_ = (flags & 4) != 0 - useMemory_ = (flags & 2) != 0 + useDisk_ = (flags & 8) != 0 + useMemory_ = (flags & 4) != 0 + useOffHeap_ = (flags & 2) != 0 deserialized_ = (flags & 1) != 0 replication_ = in.readByte() } @@ -93,14 +108,15 @@ class StorageLevel private( @throws(classOf[IOException]) private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this) - override def toString: String = - "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication) + override def toString: String = "StorageLevel(%b, %b, %b, %b, %d)".format( + useDisk, useMemory, useOffHeap, deserialized, replication) override def hashCode(): Int = toInt * 41 + replication def description : String = { var result = "" result += (if (useDisk) "Disk " else "") result += (if (useMemory) "Memory " else "") + result += (if (useOffHeap) "Tachyon " else "") result += (if (deserialized) "Deserialized " else "Serialized ") result += "%sx Replicated".format(replication) result @@ -113,22 +129,28 @@ class StorageLevel private( * new storage levels. */ object StorageLevel { - val NONE = new StorageLevel(false, false, false) - val DISK_ONLY = new StorageLevel(true, false, false) - val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) - val MEMORY_ONLY = new StorageLevel(false, true, true) - val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) - val MEMORY_ONLY_SER = new StorageLevel(false, true, false) - val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) - val MEMORY_AND_DISK = new StorageLevel(true, true, true) - val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) - val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) - val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2) + val NONE = new StorageLevel(false, false, false, false) + val DISK_ONLY = new StorageLevel(true, false, false, false) + val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) + val MEMORY_ONLY = new StorageLevel(false, true, false, true) + val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) + val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) + val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) + val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) + val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) + val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) + val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) + val OFF_HEAP = new StorageLevel(false, false, true, false) + + /** Create a new StorageLevel object without setting useOffHeap */ + def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, + deserialized: Boolean, replication: Int) = getCachedStorageLevel( + new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)) /** Create a new StorageLevel object */ - def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, - replication: Int = 1): StorageLevel = - getCachedStorageLevel(new StorageLevel(useDisk, useMemory, deserialized, replication)) + def apply(useDisk: Boolean, useMemory: Boolean, + deserialized: Boolean, replication: Int = 1) = getCachedStorageLevel( + new StorageLevel(useDisk, useMemory, false, deserialized, replication)) /** Create a new StorageLevel object from its integer representation */ def apply(flags: Int, replication: Int): StorageLevel = diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index 26565f56ad8580358126fb57ced2e6c4d219bd18..7a174959037be043fbdd10375ac116a7250d6559 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -44,7 +44,7 @@ private[spark] class StorageStatusListener extends SparkListener { storageStatusList.foreach { storageStatus => val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId) unpersistedBlocksIds.foreach { blockId => - storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L) + storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 6153dfe0b7e1350438da529f2e2fb3a4835ca287..ff6e84cf9819a1d1a2174627215c42e7a272e6e4 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -48,17 +48,23 @@ class StorageStatus( } private[spark] -class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storageLevel: StorageLevel) - extends Ordered[RDDInfo] { +class RDDInfo( + val id: Int, + val name: String, + val numPartitions: Int, + val storageLevel: StorageLevel) extends Ordered[RDDInfo] { var numCachedPartitions = 0 var memSize = 0L var diskSize = 0L + var tachyonSize= 0L override def toString = { - ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " + - "DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions, - numPartitions, Utils.bytesToString(memSize), Utils.bytesToString(diskSize)) + import Utils.bytesToString + ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s;" + + "TachyonSize: %s; DiskSize: %s").format( + name, id, storageLevel.toString, numCachedPartitions, numPartitions, + bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize)) } override def compare(that: RDDInfo) = { @@ -105,14 +111,17 @@ object StorageUtils { val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) => - // Add up memory and disk sizes - val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize > 0 } + // Add up memory, disk and Tachyon sizes + val persistedBlocks = + blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 } val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L) val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L) + val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L) rddInfoMap.get(rddId).map { rddInfo => rddInfo.numCachedPartitions = persistedBlocks.length rddInfo.memSize = memSize rddInfo.diskSize = diskSize + rddInfo.tachyonSize = tachyonSize rddInfo } }.toArray diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala new file mode 100644 index 0000000000000000000000000000000000000000..b0b967485656860d31749b027eca10786b291a20 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.text.SimpleDateFormat +import java.util.{Date, Random} + +import tachyon.client.TachyonFS +import tachyon.client.TachyonFile + +import org.apache.spark.Logging +import org.apache.spark.executor.ExecutorExitCode +import org.apache.spark.network.netty.ShuffleSender +import org.apache.spark.util.Utils + + +/** + * Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By + * default, one block is mapped to one file with a name given by its BlockId. + * + * @param rootDirs The directories to use for storing block files. Data will be hashed among these. + */ +private[spark] class TachyonBlockManager( + shuffleManager: ShuffleBlockManager, + rootDirs: String, + val master: String) + extends Logging { + + val client = if (master != null && master != "") TachyonFS.get(master) else null + + if (client == null) { + logError("Failed to connect to the Tachyon as the master address is not configured") + System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_INITIALIZE) + } + + private val MAX_DIR_CREATION_ATTEMPTS = 10 + private val subDirsPerTachyonDir = + shuffleManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt + + // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName; + // then, inside this directory, create multiple subdirectories that we will hash files into, + // in order to avoid having really large inodes at the top level in Tachyon. + private val tachyonDirs: Array[TachyonFile] = createTachyonDirs() + private val subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir)) + + addShutdownHook() + + def removeFile(file: TachyonFile): Boolean = { + client.delete(file.getPath(), false) + } + + def fileExists(file: TachyonFile): Boolean = { + client.exist(file.getPath()) + } + + def getFile(filename: String): TachyonFile = { + // Figure out which tachyon directory it hashes to, and which subdirectory in that + val hash = Utils.nonNegativeHash(filename) + val dirId = hash % tachyonDirs.length + val subDirId = (hash / tachyonDirs.length) % subDirsPerTachyonDir + + // Create the subdirectory if it doesn't already exist + var subDir = subDirs(dirId)(subDirId) + if (subDir == null) { + subDir = subDirs(dirId).synchronized { + val old = subDirs(dirId)(subDirId) + if (old != null) { + old + } else { + val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId) + client.mkdir(path) + val newDir = client.getFile(path) + subDirs(dirId)(subDirId) = newDir + newDir + } + } + } + val filePath = subDir + "/" + filename + if(!client.exist(filePath)) { + client.createFile(filePath) + } + val file = client.getFile(filePath) + file + } + + def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name) + + // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore. + private def createTachyonDirs(): Array[TachyonFile] = { + logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'") + val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") + rootDirs.split(",").map { rootDir => + var foundLocalDir = false + var tachyonDir: TachyonFile = null + var tachyonDirId: String = null + var tries = 0 + val rand = new Random() + while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { + tries += 1 + try { + tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) + val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId + if (!client.exist(path)) { + foundLocalDir = client.mkdir(path) + tachyonDir = client.getFile(path) + } + } catch { + case e: Exception => + logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e) + } + } + if (!foundLocalDir) { + logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " + + rootDir) + System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR) + } + logInfo("Created tachyon directory at " + tachyonDir) + tachyonDir + } + } + + private def addShutdownHook() { + tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir)) + Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") { + override def run() { + logDebug("Shutdown hook called") + tachyonDirs.foreach { tachyonDir => + try { + if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) { + Utils.deleteRecursively(tachyonDir, client) + } + } catch { + case t: Throwable => + logError("Exception while deleting tachyon spark dir: " + tachyonDir, t) + } + } + } + }) + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala new file mode 100644 index 0000000000000000000000000000000000000000..b86abbda1d3e74c63574e55afde80cbe4d93ee6b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import tachyon.client.TachyonFile + +/** + * References a particular segment of a file (potentially the entire file), based off an offset and + * a length. + */ +private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long) { + override def toString = "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length) +} diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala new file mode 100644 index 0000000000000000000000000000000000000000..c37e76f89360580bba5d37b3843a52474d37f68b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.io.IOException +import java.nio.ByteBuffer + +import scala.collection.mutable.ArrayBuffer + +import tachyon.client.{WriteType, ReadType} + +import org.apache.spark.Logging +import org.apache.spark.util.Utils +import org.apache.spark.serializer.Serializer + + +private class Entry(val size: Long) + + +/** + * Stores BlockManager blocks on Tachyon. + */ +private class TachyonStore( + blockManager: BlockManager, + tachyonManager: TachyonBlockManager) + extends BlockStore(blockManager: BlockManager) with Logging { + + logInfo("TachyonStore started") + + override def getSize(blockId: BlockId): Long = { + tachyonManager.getFile(blockId.name).length + } + + override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = { + putToTachyonStore(blockId, bytes, true) + } + + override def putValues( + blockId: BlockId, + values: ArrayBuffer[Any], + level: StorageLevel, + returnValues: Boolean): PutResult = { + return putValues(blockId, values.toIterator, level, returnValues) + } + + override def putValues( + blockId: BlockId, + values: Iterator[Any], + level: StorageLevel, + returnValues: Boolean): PutResult = { + logDebug("Attempting to write values for block " + blockId) + val _bytes = blockManager.dataSerialize(blockId, values) + putToTachyonStore(blockId, _bytes, returnValues) + } + + private def putToTachyonStore( + blockId: BlockId, + bytes: ByteBuffer, + returnValues: Boolean): PutResult = { + // So that we do not modify the input offsets ! + // duplicate does not copy buffer, so inexpensive + val byteBuffer = bytes.duplicate() + byteBuffer.rewind() + logDebug("Attempting to put block " + blockId + " into Tachyon") + val startTime = System.currentTimeMillis + val file = tachyonManager.getFile(blockId) + val os = file.getOutStream(WriteType.TRY_CACHE) + os.write(byteBuffer.array()) + os.close() + val finishTime = System.currentTimeMillis + logDebug("Block %s stored as %s file in Tachyon in %d ms".format( + blockId, Utils.bytesToString(byteBuffer.limit), (finishTime - startTime))) + + if (returnValues) { + PutResult(bytes.limit(), Right(bytes.duplicate())) + } else { + PutResult(bytes.limit(), null) + } + } + + override def remove(blockId: BlockId): Boolean = { + val file = tachyonManager.getFile(blockId) + if (tachyonManager.fileExists(file)) { + tachyonManager.removeFile(file) + } else { + false + } + } + + override def getValues(blockId: BlockId): Option[Iterator[Any]] = { + getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) + } + + + override def getBytes(blockId: BlockId): Option[ByteBuffer] = { + val file = tachyonManager.getFile(blockId) + if (file == null || file.getLocationHosts().size == 0) { + return None + } + val is = file.getInStream(ReadType.CACHE) + var buffer: ByteBuffer = null + try { + if (is != null) { + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + val fetchSize = is.read(bs, 0, size.asInstanceOf[Int]) + buffer = ByteBuffer.wrap(bs) + if (fetchSize != size) { + logWarning("Failed to fetch the block " + blockId + " from Tachyon : Size " + size + + " is not equal to fetched size " + fetchSize) + return None + } + } + } catch { + case ioe: IOException => { + logWarning("Failed to fetch the block " + blockId + " from Tachyon", ioe) + return None + } + } + Some(buffer) + } + + override def contains(blockId: BlockId): Boolean = { + val file = tachyonManager.getFile(blockId) + tachyonManager.fileExists(file) + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala index b2732de51058a414bbaaa9b64fc87b7b38d9d1ec..0fa461e5e9d27d63134aadf53639f942526b473c 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala @@ -33,6 +33,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) { private lazy val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { + val rdds = listener.rddInfoList val content = UIUtils.listingTable(rddHeader, rddRow, rdds) UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage) @@ -45,6 +46,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) { "Cached Partitions", "Fraction Cached", "Size in Memory", + "Size in Tachyon", "Size on Disk") /** Render an HTML row representing an RDD */ @@ -60,6 +62,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) { <td>{rdd.numCachedPartitions}</td> <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td> <td>{Utils.bytesToString(rdd.memSize)}</td> + <td>{Utils.bytesToString(rdd.tachyonSize)}</td> <td>{Utils.bytesToString(rdd.diskSize)}</td> </tr> } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index d9a6af61872d1e3bbb36f13b524cbdeabb48c03c..2155a8888c85ce9ce689197d75137d387f2cf3da 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -274,12 +274,14 @@ private[spark] object JsonProtocol { ("Number of Partitions" -> rddInfo.numPartitions) ~ ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~ ("Memory Size" -> rddInfo.memSize) ~ + ("Tachyon Size" -> rddInfo.tachyonSize) ~ ("Disk Size" -> rddInfo.diskSize) } def storageLevelToJson(storageLevel: StorageLevel): JValue = { ("Use Disk" -> storageLevel.useDisk) ~ ("Use Memory" -> storageLevel.useMemory) ~ + ("Use Tachyon" -> storageLevel.useOffHeap) ~ ("Deserialized" -> storageLevel.deserialized) ~ ("Replication" -> storageLevel.replication) } @@ -288,6 +290,7 @@ private[spark] object JsonProtocol { val storageLevel = storageLevelToJson(blockStatus.storageLevel) ("Storage Level" -> storageLevel) ~ ("Memory Size" -> blockStatus.memSize) ~ + ("Tachyon Size" -> blockStatus.tachyonSize) ~ ("Disk Size" -> blockStatus.diskSize) } @@ -570,11 +573,13 @@ private[spark] object JsonProtocol { val numPartitions = (json \ "Number of Partitions").extract[Int] val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int] val memSize = (json \ "Memory Size").extract[Long] + val tachyonSize = (json \ "Tachyon Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel) rddInfo.numCachedPartitions = numCachedPartitions rddInfo.memSize = memSize + rddInfo.tachyonSize = tachyonSize rddInfo.diskSize = diskSize rddInfo } @@ -582,16 +587,18 @@ private[spark] object JsonProtocol { def storageLevelFromJson(json: JValue): StorageLevel = { val useDisk = (json \ "Use Disk").extract[Boolean] val useMemory = (json \ "Use Memory").extract[Boolean] + val useTachyon = (json \ "Use Tachyon").extract[Boolean] val deserialized = (json \ "Deserialized").extract[Boolean] val replication = (json \ "Replication").extract[Int] - StorageLevel(useDisk, useMemory, deserialized, replication) + StorageLevel(useDisk, useMemory, useTachyon, deserialized, replication) } def blockStatusFromJson(json: JValue): BlockStatus = { val storageLevel = storageLevelFromJson(json \ "Storage Level") val memorySize = (json \ "Memory Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] - BlockStatus(storageLevel, memorySize, diskSize) + val tachyonSize = (json \ "Tachyon Size").extract[Long] + BlockStatus(storageLevel, memorySize, diskSize, tachyonSize) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 737b765e2aed6a363a5f66d94aad058a3cdb57d8..d3c39dee330b281bcae378a3a89dad6b46e21036 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -34,11 +34,13 @@ import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.json4s._ +import tachyon.client.{TachyonFile,TachyonFS} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} + /** * Various utility methods used by Spark. */ @@ -153,6 +155,7 @@ private[spark] object Utils extends Logging { } private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]() + private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]() // Register the path to be deleted via shutdown hook def registerShutdownDeleteDir(file: File) { @@ -162,6 +165,14 @@ private[spark] object Utils extends Logging { } } + // Register the tachyon path to be deleted via shutdown hook + def registerShutdownDeleteDir(tachyonfile: TachyonFile) { + val absolutePath = tachyonfile.getPath() + shutdownDeleteTachyonPaths.synchronized { + shutdownDeleteTachyonPaths += absolutePath + } + } + // Is the path already registered to be deleted via a shutdown hook ? def hasShutdownDeleteDir(file: File): Boolean = { val absolutePath = file.getAbsolutePath() @@ -170,6 +181,14 @@ private[spark] object Utils extends Logging { } } + // Is the path already registered to be deleted via a shutdown hook ? + def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = { + val absolutePath = file.getPath() + shutdownDeletePaths.synchronized { + shutdownDeletePaths.contains(absolutePath) + } + } + // Note: if file is child of some registered path, while not equal to it, then return true; // else false. This is to ensure that two shutdown hooks do not try to delete each others // paths - resulting in IOException and incomplete cleanup. @@ -186,6 +205,22 @@ private[spark] object Utils extends Logging { retval } + // Note: if file is child of some registered path, while not equal to it, then return true; + // else false. This is to ensure that two shutdown hooks do not try to delete each others + // paths - resulting in Exception and incomplete cleanup. + def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = { + val absolutePath = file.getPath() + val retval = shutdownDeletePaths.synchronized { + shutdownDeletePaths.find { path => + !absolutePath.equals(path) && absolutePath.startsWith(path) + }.isDefined + } + if (retval) { + logInfo("path = " + file + ", already present as root for deletion.") + } + retval + } + /** Create a temporary directory inside the given parent directory */ def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = { var attempts = 0 @@ -541,7 +576,16 @@ private[spark] object Utils extends Logging { } /** - * Check to see if file is a symbolic link. + * Delete a file or directory and its contents recursively. + */ + def deleteRecursively(dir: TachyonFile, client: TachyonFS) { + if (!client.delete(dir.getPath(), true)) { + throw new IOException("Failed to delete the tachyon dir: " + dir) + } + } + + /** + * Check to see if file is a symbolic link. */ def isSymlink(file: File): Boolean = { if (file == null) throw new NullPointerException("File must not be null") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index e83cd55e73691a44bbaa1b02a37876081cabe775..b6dd0526105a0e85f92251453f25ab2a8b0feb06 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -96,9 +96,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("StorageLevel object caching") { - val level1 = StorageLevel(false, false, false, 3) - val level2 = StorageLevel(false, false, false, 3) // this should return the same object as level1 - val level3 = StorageLevel(false, false, false, 2) // this should return a different object + val level1 = StorageLevel(false, false, false, false, 3) + val level2 = StorageLevel(false, false, false, false, 3) // this should return the same object as level1 + val level3 = StorageLevel(false, false, false, false, 2) // this should return a different object assert(level2 === level1, "level2 is not same as level1") assert(level2.eq(level1), "level2 is not the same object as level1") assert(level3 != level1, "level3 is same as level1") @@ -410,6 +410,25 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store") } + test("tachyon storage") { + // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar. + val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false) + if (tachyonUnitTestEnabled) { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.OFF_HEAP) + store.putSingle("a2", a2, StorageLevel.OFF_HEAP) + store.putSingle("a3", a3, StorageLevel.OFF_HEAP) + assert(store.getSingle("a3").isDefined, "a3 was in store") + assert(store.getSingle("a2").isDefined, "a2 was in store") + assert(store.getSingle("a1").isDefined, "a1 was in store") + } else { + info("tachyon storage test disabled.") + } + } + test("on-disk storage") { store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 40c29014c4b59e042d1a6e12ffa770dfbc57061b..054eb01a64c11dcaa7a397ad3443ff40e97c9ab5 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -456,7 +456,7 @@ class JsonProtocolSuite extends FunSuite { t.shuffleWriteMetrics = Some(sw) // Make at most 6 blocks t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i => - (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i)) + (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i, c%i)) }.toSeq) t } @@ -470,19 +470,19 @@ class JsonProtocolSuite extends FunSuite { """ {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name": "greetings","Number of Tasks":200,"RDD Info":{"RDD ID":100,"Name":"mayor","Storage - Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1}, - "Number of Partitions":200,"Number of Cached Partitions":300,"Memory Size":400, - "Disk Size":500},"Emitted Task Size Warning":false},"Properties":{"France":"Paris", - "Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} + Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true, + "Replication":1},"Number of Partitions":200,"Number of Cached Partitions":300, + "Memory Size":400,"Disk Size":500,"Tachyon Size":0},"Emitted Task Size Warning":false}, + "Properties":{"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} """ private val stageCompletedJsonString = """ {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Stage Name": "greetings","Number of Tasks":201,"RDD Info":{"RDD ID":101,"Name":"mayor","Storage - Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1}, - "Number of Partitions":201,"Number of Cached Partitions":301,"Memory Size":401, - "Disk Size":501},"Emitted Task Size Warning":false}} + Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true, + "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301, + "Memory Size":401,"Disk Size":501,"Tachyon Size":0},"Emitted Task Size Warning":false}} """ private val taskStartJsonString = @@ -515,8 +515,8 @@ class JsonProtocolSuite extends FunSuite { 700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics": {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks": [{"Block ID":{"Type":"RDDBlockId","RDD ID":0,"Split Index":0},"Status": - {"Storage Level":{"Use Disk":true,"Use Memory":true,"Deserialized":false, - "Replication":2},"Memory Size":0,"Disk Size":0}}]}} + {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, + "Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}} """ private val jobStartJsonString = diff --git a/docs/configuration.md b/docs/configuration.md index 1ff01505672558c50d37607caad4d57696b44785..b6005acac8b93fcee0f6593ef526f07ee01e3dff 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -122,6 +122,21 @@ Apart from these, the following properties are also available, and may be useful <code>spark.storage.memoryFraction</code>. </td> </tr> +<tr> + <td>spark.tachyonStore.baseDir</td> + <td>System.getProperty("java.io.tmpdir")</td> + <td> + Directories of the Tachyon File System that store RDDs. The Tachyon file system's URL is set by <code>spark.tachyonStore.url</code>. + It can also be a comma-separated list of multiple directories on Tachyon file system. + </td> +</tr> +<tr> + <td>spark.tachyonStore.url</td> + <td>tachyon://localhost:19998</td> + <td> + The URL of the underlying Tachyon file system in the TachyonStore. + </td> +</tr> <tr> <td>spark.mesos.coarse</td> <td>false</td> @@ -161,13 +176,13 @@ Apart from these, the following properties are also available, and may be useful <td>spark.ui.acls.enable</td> <td>false</td> <td> - Whether spark web ui acls should are enabled. If enabled, this checks to see if the user has + Whether spark web ui acls should are enabled. If enabled, this checks to see if the user has access permissions to view the web ui. See <code>spark.ui.view.acls</code> for more details. Also note this requires the user to be known, if the user comes across as null no checks are done. Filters can be used to authenticate and set the user. </td> </tr> -<tr> +<tr> <td>spark.ui.view.acls</td> <td>Empty</td> <td> @@ -276,10 +291,10 @@ Apart from these, the following properties are also available, and may be useful <td>spark.serializer.objectStreamReset</td> <td>10000</td> <td> - When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches - objects to prevent writing redundant data, however that stops garbage collection of those - objects. By calling 'reset' you flush that info from the serializer, and allow old - objects to be collected. To turn off this periodic reset set it to a value of <= 0. + When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches + objects to prevent writing redundant data, however that stops garbage collection of those + objects. By calling 'reset' you flush that info from the serializer, and allow old + objects to be collected. To turn off this periodic reset set it to a value of <= 0. By default it will reset the serializer every 10,000 objects. </td> </tr> @@ -375,7 +390,7 @@ Apart from these, the following properties are also available, and may be useful <td>spark.akka.heartbeat.interval</td> <td>1000</td> <td> - This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. + This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. </td> </tr> <tr> @@ -430,7 +445,7 @@ Apart from these, the following properties are also available, and may be useful <td>spark.broadcast.blockSize</td> <td>4096</td> <td> - Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>. + Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>. Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit. </td> </tr> @@ -555,7 +570,7 @@ Apart from these, the following properties are also available, and may be useful the driver. </td> </tr> -<tr> +<tr> <td>spark.authenticate</td> <td>false</td> <td> @@ -563,7 +578,7 @@ Apart from these, the following properties are also available, and may be useful running on Yarn. </td> </tr> -<tr> +<tr> <td>spark.authenticate.secret</td> <td>None</td> <td> @@ -571,12 +586,12 @@ Apart from these, the following properties are also available, and may be useful not running on Yarn and authentication is enabled. </td> </tr> -<tr> +<tr> <td>spark.core.connection.auth.wait.timeout</td> <td>30</td> <td> Number of seconds for the connection to wait for authentication to occur before timing - out and giving up. + out and giving up. </td> </tr> <tr> diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md index 99412733d426890825dfb9ccca788955135f1c03..77373890eead79a4199ddf37ec73f41d56a5232f 100644 --- a/docs/scala-programming-guide.md +++ b/docs/scala-programming-guide.md @@ -23,7 +23,7 @@ To write a Spark application, you need to add a dependency on Spark. If you use groupId = org.apache.spark artifactId = spark-core_{{site.SCALA_BINARY_VERSION}} - version = {{site.SPARK_VERSION}} + version = {{site.SPARK_VERSION}} In addition, if you wish to access an HDFS cluster, you need to add a dependency on `hadoop-client` for your version of HDFS: @@ -73,14 +73,14 @@ The master URL passed to Spark can be in one of the following formats: <table class="table"> <tr><th>Master URL</th><th>Meaning</th></tr> <tr><td> local </td><td> Run Spark locally with one worker thread (i.e. no parallelism at all). </td></tr> -<tr><td> local[K] </td><td> Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). +<tr><td> local[K] </td><td> Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). </td></tr> -<tr><td> spark://HOST:PORT </td><td> Connect to the given <a href="spark-standalone.html">Spark standalone - cluster</a> master. The port must be whichever one your master is configured to use, which is 7077 by default. +<tr><td> spark://HOST:PORT </td><td> Connect to the given <a href="spark-standalone.html">Spark standalone + cluster</a> master. The port must be whichever one your master is configured to use, which is 7077 by default. </td></tr> -<tr><td> mesos://HOST:PORT </td><td> Connect to the given <a href="running-on-mesos.html">Mesos</a> cluster. - The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use, - which is 5050 by default. +<tr><td> mesos://HOST:PORT </td><td> Connect to the given <a href="running-on-mesos.html">Mesos</a> cluster. + The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use, + which is 5050 by default. </td></tr> </table> @@ -265,11 +265,25 @@ A complete list of actions is available in the [RDD API doc](api/core/index.html ## RDD Persistence -One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory across operations. When you persist an RDD, each node stores any slices of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for building iterative algorithms with Spark and for interactive use from the interpreter. - -You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant -- if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it. - -In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space), or even replicate it across nodes. These levels are chosen by passing a [`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel) object to `persist()`. The `cache()` method is a shorthand for using the default storage level, which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of available storage levels is: +One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory +across operations. When you persist an RDD, each node stores any slices of it that it computes in +memory and reuses them in other actions on that dataset (or datasets derived from it). This allows +future actions to be much faster (often by more than 10x). Caching is a key tool for building +iterative algorithms with Spark and for interactive use from the interpreter. + +You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time +it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant -- +if any partition of an RDD is lost, it will automatically be recomputed using the transformations +that originally created it. + +In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to +persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space), +or replicate it across nodes, or store the data in off-heap memory in [Tachyon](http://tachyon-project.org/). +These levels are chosen by passing a +[`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel) +object to `persist()`. The `cache()` method is a shorthand for using the default storage level, +which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of +available storage levels is: <table class="table"> <tr><th style="width:23%">Storage Level</th><th>Meaning</th></tr> @@ -292,8 +306,16 @@ In addition, each RDD can be stored using a different *storage level*, allowing </tr> <tr> <td> MEMORY_AND_DISK_SER </td> - <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them - on the fly each time they're needed. </td> + <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of + recomputing them on the fly each time they're needed. </td> +</tr> +<tr> + <td> OFF_HEAP </td> + <td> Store RDD in a <i>serialized</i> format in Tachyon. + This is generally more space-efficient than deserialized objects, especially when using a + <a href="tuning.html">fast serializer</a>, but more CPU-intensive to read. + This also significantly reduces the overheads of GC. + </td> </tr> <tr> <td> DISK_ONLY </td> @@ -307,30 +329,59 @@ In addition, each RDD can be stored using a different *storage level*, allowing ### Which Storage Level to Choose? -Spark's storage levels are meant to provide different tradeoffs between memory usage and CPU efficiency. -We recommend going through the following process to select one: - -* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way. This is the most - CPU-efficient option, allowing operations on the RDDs to run as fast as possible. -* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to make the objects - much more space-efficient, but still reasonably fast to access. -* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter a large - amount of the data. Otherwise, recomputing a partition is about as fast as reading it from disk. -* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web - application). *All* the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones - let you continue running tasks on the RDD without waiting to recompute a lost partition. - -If you want to define your own storage level (say, with replication factor of 3 instead of 2), then use the function factor method `apply()` of the [`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) singleton object. +Spark's storage levels are meant to provide different trade-offs between memory usage and CPU +efficiency. It allows uses to choose memory, disk, or Tachyon for storing data. We recommend going +through the following process to select one: + +* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way. + This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible. + +* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to +make the objects much more space-efficient, but still reasonably fast to access. You can also use +`OFF_HEAP` mode to store the data off the heap in [Tachyon](http://tachyon-project.org/). This will +significantly reduce JVM GC overhead. + +* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter +a large amount of the data. Otherwise, recomputing a partition is about as fast as reading it from +disk. + +* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve +requests from a web application). *All* the storage levels provide full fault tolerance by +recomputing lost data, but the replicated ones let you continue running tasks on the RDD without +waiting to recompute a lost partition. + +If you want to define your own storage level (say, with replication factor of 3 instead of 2), then +use the function factor method `apply()` of the +[`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) singleton object. + +Spark has a block manager inside the Executors that let you chose memory, disk, or off-heap. The +latter is for storing RDDs off-heap outside the Executor JVM on top of the memory management system +[Tachyon](http://tachyon-project.org/). This mode has the following advantages: + +* Cached data will not be lost if individual executors crash. +* Executors can have a smaller memory footprint, allowing you to run more executors on the same +machine as the bulk of the memory will be inside Tachyon. +* Reduced GC overhead since data is stored in Tachyon. # Shared Variables -Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of *shared variables* for two common usage patterns: broadcast variables and accumulators. +Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a +remote cluster node, it works on separate copies of all the variables used in the function. These +variables are copied to each machine, and no updates to the variables on the remote machine are +propagated back to the driver program. Supporting general, read-write shared variables across tasks +would be inefficient. However, Spark does provide two limited types of *shared variables* for two +common usage patterns: broadcast variables and accumulators. ## Broadcast Variables -Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost. +Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather +than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a +large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables +using efficient broadcast algorithms to reduce communication cost. -Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value` method. The interpreter session below shows this: +Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The +broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value` +method. The interpreter session below shows this: {% highlight scala %} scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) @@ -340,13 +391,21 @@ scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) {% endhighlight %} -After the broadcast variable is created, it should be used instead of the value `v` in any functions run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object `v` should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later). +After the broadcast variable is created, it should be used instead of the value `v` in any functions +run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object +`v` should not be modified after it is broadcast in order to ensure that all nodes get the same +value of the broadcast variable (e.g. if the variable is shipped to a new node later). ## Accumulators -Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can add support for new types. +Accumulators are variables that are only "added" to through an associative operation and can +therefore be efficiently supported in parallel. They can be used to implement counters (as in +MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable +collections, and programmers can add support for new types. -An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks running on the cluster can then add to it using the `+=` operator. However, they cannot read its value. Only the driver program can read the accumulator's value, using its `value` method. +An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks +running on the cluster can then add to it using the `+=` operator. However, they cannot read its +value. Only the driver program can read the accumulator's value, using its `value` method. The interpreter session below shows an accumulator being used to add up the elements of an array: diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala index e5a09ecec006f2727b1611505547fd5665f26c30..d3babc3ed12c80ab87ce9d671104fd8ae8ea4e2c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala @@ -18,8 +18,8 @@ package org.apache.spark.examples import scala.math.random + import org.apache.spark._ -import SparkContext._ /** Computes an approximation to pi */ object SparkPi { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala new file mode 100644 index 0000000000000000000000000000000000000000..53b303d658386531a87e2872e432ab21d07cc5cd --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import java.util.Random +import scala.math.exp +import org.apache.spark.util.Vector +import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.scheduler.InputFormatInfo +import org.apache.spark.storage.StorageLevel + +/** + * Logistic regression based classification. + * This example uses Tachyon to persist rdds during computation. + */ +object SparkTachyonHdfsLR { + val D = 10 // Numer of dimensions + val rand = new Random(42) + + case class DataPoint(x: Vector, y: Double) + + def parsePoint(line: String): DataPoint = { + val tok = new java.util.StringTokenizer(line, " ") + var y = tok.nextToken.toDouble + var x = new Array[Double](D) + var i = 0 + while (i < D) { + x(i) = tok.nextToken.toDouble; i += 1 + } + DataPoint(new Vector(x), y) + } + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: SparkTachyonHdfsLR <master> <file> <iters>") + System.exit(1) + } + val inputPath = args(1) + val conf = SparkHadoopUtil.get.newConfiguration() + val sc = new SparkContext(args(0), "SparkTachyonHdfsLR", + System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(), + InputFormatInfo.computePreferredLocations( + Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) + )) + val lines = sc.textFile(inputPath) + val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP) + val ITERATIONS = args(2).toInt + + // Initialize w to a random value + var w = Vector(D, _ => 2 * rand.nextDouble - 1) + println("Initial w: " + w) + + for (i <- 1 to ITERATIONS) { + println("On iteration " + i) + val gradient = points.map { p => + (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x + }.reduce(_ + _) + w -= gradient + } + + println("Final w: " + w) + System.exit(0) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala new file mode 100644 index 0000000000000000000000000000000000000000..ce78f0876ed7c56cd71cd17cb3f343d2cbf100fb --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import scala.math.random + +import org.apache.spark._ +import org.apache.spark.storage.StorageLevel + +/** + * Computes an approximation to pi + * This example uses Tachyon to persist rdds during computation. + */ +object SparkTachyonPi { + def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: SparkTachyonPi <master> [<slices>]") + System.exit(1) + } + val spark = new SparkContext(args(0), "SparkTachyonPi", + System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) + + val slices = if (args.length > 1) args(1).toInt else 2 + val n = 100000 * slices + + val rdd = spark.parallelize(1 to n, slices) + rdd.persist(StorageLevel.OFF_HEAP) + val count = rdd.map { i => + val x = random * 2 - 1 + val y = random * 2 - 1 + if (x * x + y * y < 1) 1 else 0 + }.reduce(_ + _) + println("Pi is roughly " + 4.0 * count / n) + + spark.stop() + } +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index c5c697e8e24270a426931406d4c211b538fd1e44..843a874fbfdb066e1769254bb8f6baeecc1bc612 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -30,7 +30,7 @@ import scala.collection.JavaConversions._ // import com.jsuereth.pgp.sbtplugin.PgpKeys._ object SparkBuild extends Build { - val SPARK_VERSION = "1.0.0-SNAPSHOT" + val SPARK_VERSION = "1.0.0-SNAPSHOT" // Hadoop version to build against. For example, "1.0.4" for Apache releases, or // "2.0.0-mr1-cdh4.2.0" for Cloudera Hadoop. Note that these variables can be set @@ -185,15 +185,14 @@ object SparkBuild extends Build { concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), resolvers ++= Seq( - // HTTPS is unavailable for Maven Central "Maven Repository" at "http://repo.maven.apache.org/maven2", "Apache Repository" at "https://repository.apache.org/content/repositories/releases", "JBoss Repository" at "https://repository.jboss.org/nexus/content/repositories/releases/", "MQTT Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/", - "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/", + "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/", // For Sonatype publishing - //"sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", - //"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/", + // "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", + // "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/", // also check the local Maven repository ~/.m2 Resolver.mavenLocal ), @@ -280,13 +279,18 @@ object SparkBuild extends Build { val slf4jVersion = "1.7.5" val excludeNetty = ExclusionRule(organization = "org.jboss.netty") + val excludeEclipseJetty = ExclusionRule(organization = "org.eclipse.jetty") val excludeAsm = ExclusionRule(organization = "org.ow2.asm") val excludeOldAsm = ExclusionRule(organization = "asm") val excludeCommonsLogging = ExclusionRule(organization = "commons-logging") val excludeSLF4J = ExclusionRule(organization = "org.slf4j") val excludeScalap = ExclusionRule(organization = "org.scala-lang", artifact = "scalap") + val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop") + val excludeCurator = ExclusionRule(organization = "org.apache.curator") + val excludePowermock = ExclusionRule(organization = "org.powermock") - def sparkPreviousArtifact(id: String, organization: String = "org.apache.spark", + + def sparkPreviousArtifact(id: String, organization: String = "org.apache.spark", version: String = "0.9.0-incubating", crossVersion: String = "2.10"): Option[sbt.ModuleID] = { val fullId = if (crossVersion.isEmpty) id else id + "_" + crossVersion Some(organization % fullId % version) // the artifact to compare binary compatibility with @@ -323,6 +327,7 @@ object SparkBuild extends Build { "com.codahale.metrics" % "metrics-graphite" % "3.0.0", "com.twitter" %% "chill" % "0.3.1" excludeAll(excludeAsm), "com.twitter" % "chill-java" % "0.3.1" excludeAll(excludeAsm), + "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock), "com.clearspring.analytics" % "stream" % "2.5.1" ), libraryDependencies ++= maybeAvro diff --git a/python/pyspark/context.py b/python/pyspark/context.py index ff1023bbfa539d69f98c04aadb7fe8d63a091948..d8667e84fedff3dff95ea20bd8399d8fecc0ad66 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -423,8 +423,11 @@ class SparkContext(object): raise Exception("storageLevel must be of type pyspark.StorageLevel") newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel - return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory, - storageLevel.deserialized, storageLevel.replication) + return newStorageLevel(storageLevel.useDisk, + storageLevel.useMemory, + storageLevel.useOffHeap, + storageLevel.deserialized, + storageLevel.replication) def setJobGroup(self, groupId, description): """ diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 9943296b927dc66393d953dbd2334a92148d3764..fb27863e07f553fe20f74741fdd0d571cce898ee 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1302,11 +1302,12 @@ class RDD(object): Get the RDD's current storage level. >>> rdd1 = sc.parallelize([1,2]) >>> rdd1.getStorageLevel() - StorageLevel(False, False, False, 1) + StorageLevel(False, False, False, False, 1) """ java_storage_level = self._jrdd.getStorageLevel() storage_level = StorageLevel(java_storage_level.useDisk(), java_storage_level.useMemory(), + java_storage_level.useOffHeap(), java_storage_level.deserialized(), java_storage_level.replication()) return storage_level diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py index c3e3a44e8e7abfc3692b6815ff12b874b832382d..7b6660eab231bb00ff9b7b9715aa7c51c1a67ccf 100644 --- a/python/pyspark/storagelevel.py +++ b/python/pyspark/storagelevel.py @@ -25,23 +25,25 @@ class StorageLevel: Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY. """ - def __init__(self, useDisk, useMemory, deserialized, replication = 1): + def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication = 1): self.useDisk = useDisk self.useMemory = useMemory + self.useOffHeap = useOffHeap self.deserialized = deserialized self.replication = replication def __repr__(self): - return "StorageLevel(%s, %s, %s, %s)" % ( - self.useDisk, self.useMemory, self.deserialized, self.replication) + return "StorageLevel(%s, %s, %s, %s, %s)" % ( + self.useDisk, self.useMemory, self.useOffHeap, self.deserialized, self.replication) -StorageLevel.DISK_ONLY = StorageLevel(True, False, False) -StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2) -StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True) -StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, True, 2) -StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False) -StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, 2) -StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, True) -StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, True, 2) -StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False) -StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, 2) +StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False) +StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2) +StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, True) +StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, True, 2) +StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False, False) +StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2) +StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, True) +StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2) +StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False) +StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2) +StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1) \ No newline at end of file