From 391133f66a41cf78cc200c20c0228eb99eebc6fd Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Mon, 7 Oct 2013 17:08:06 -0700
Subject: [PATCH] Fix inconsistent and incorrect log messages in shuffle read
 path

---
 .../main/scala/org/apache/spark/CacheManager.scala   | 11 +++++------
 .../org/apache/spark/storage/BlockManager.scala      | 12 +++++++++++-
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3d36761cda..048168c52b 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -34,12 +34,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
       : Iterator[T] = {
     val key = "rdd_%d_%d".format(rdd.id, split.index)
-    logInfo("Cache key is " + key)
+    logInfo("Looking for partition " + key)
     blockManager.get(key) match {
-      case Some(cachedValues) =>
-        // Partition is in cache, so just return its values
-        logInfo("Found partition in cache!")
-        return cachedValues.asInstanceOf[Iterator[T]]
+      case Some(values) =>
+        // Partition is already materialized, so just return its values
+        return values.asInstanceOf[Iterator[T]]
 
       case None =>
         // Mark the split as loading (unless someone else marks it first)
@@ -68,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         }
         try {
           // If we got here, we have to load the split
-          logInfo("Computing partition " + split)
+          logInfo("Partition %s not found, computing it".format(key))
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
           // Persist the result, so long as the task is not running locally
           if (context.runningLocally) { return computedValues }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 495a72db69..37d0ddb17b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -523,7 +523,17 @@ private[spark] class BlockManager(
    * Get a block from the block manager (either local or remote).
    */
   def get(blockId: String): Option[Iterator[Any]] = {
-    getLocal(blockId).orElse(getRemote(blockId))
+    val local = getLocal(blockId)
+    if (local.isDefined) {
+      logInfo("Found block %s locally".format(blockId))
+      return local
+    }
+    val remote = getRemote(blockId)
+    if (remote.isDefined) {
+      logInfo("Found block %s remotely".format(blockId))
+      return remote
+    }
+    None
   }
 
   /**
-- 
GitLab