Skip to content
Snippets Groups Projects
Commit ea34c521 authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge pull request #42 from pwendell/shuffle-read-perf

Fix inconsistent and incorrect log messages in shuffle read path

The user-facing messages generated by the CacheManager are currently wrong and somewhat misleading. This patch makes the messages more accurate. It also uses a consistent representation of the partition being fetched (`rdd_xx_yy`) so that it's easier for users to trace what is going on when reading logs.
parents 02f37ee8 8b377718
No related branches found
No related tags found
No related merge requests found
...@@ -34,22 +34,21 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { ...@@ -34,22 +34,21 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel) def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = { : Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index) val key = "rdd_%d_%d".format(rdd.id, split.index)
logInfo("Cache key is " + key) logDebug("Looking for partition " + key)
blockManager.get(key) match { blockManager.get(key) match {
case Some(cachedValues) => case Some(values) =>
// Partition is in cache, so just return its values // Partition is already materialized, so just return its values
logInfo("Found partition in cache!") return values.asInstanceOf[Iterator[T]]
return cachedValues.asInstanceOf[Iterator[T]]
case None => case None =>
// Mark the split as loading (unless someone else marks it first) // Mark the split as loading (unless someone else marks it first)
loading.synchronized { loading.synchronized {
if (loading.contains(key)) { if (loading.contains(key)) {
logInfo("Loading contains " + key + ", waiting...") logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
while (loading.contains(key)) { while (loading.contains(key)) {
try {loading.wait()} catch {case _ : Throwable =>} try {loading.wait()} catch {case _ : Throwable =>}
} }
logInfo("Loading no longer contains " + key + ", so returning cached result") logInfo("Finished waiting for %s".format(key))
// See whether someone else has successfully loaded it. The main way this would fail // See whether someone else has successfully loaded it. The main way this would fail
// is for the RDD-level cache eviction policy if someone else has loaded the same RDD // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
// partition but we didn't want to make space for it. However, that case is unlikely // partition but we didn't want to make space for it. However, that case is unlikely
...@@ -59,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { ...@@ -59,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(values) => case Some(values) =>
return values.asInstanceOf[Iterator[T]] return values.asInstanceOf[Iterator[T]]
case None => case None =>
logInfo("Whoever was loading " + key + " failed; we'll try it ourselves") logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
loading.add(key) loading.add(key)
} }
} else { } else {
...@@ -68,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { ...@@ -68,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
} }
try { try {
// If we got here, we have to load the split // If we got here, we have to load the split
logInfo("Computing partition " + split) logInfo("Partition %s not found, computing it".format(key))
val computedValues = rdd.computeOrReadCheckpoint(split, context) val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally // Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues } if (context.runningLocally) { return computedValues }
......
...@@ -523,7 +523,17 @@ private[spark] class BlockManager( ...@@ -523,7 +523,17 @@ private[spark] class BlockManager(
* Get a block from the block manager (either local or remote). * Get a block from the block manager (either local or remote).
*/ */
def get(blockId: String): Option[Iterator[Any]] = { def get(blockId: String): Option[Iterator[Any]] = {
getLocal(blockId).orElse(getRemote(blockId)) val local = getLocal(blockId)
if (local.isDefined) {
logInfo("Found block %s locally".format(blockId))
return local
}
val remote = getRemote(blockId)
if (remote.isDefined) {
logInfo("Found block %s remotely".format(blockId))
return remote
}
None
} }
/** /**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment