diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3d36761cdabbbceb7980ee49321353627f730aac..048168c52bde0a35d68106a32f327f2a614871ca 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -34,12 +34,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
       : Iterator[T] = {
     val key = "rdd_%d_%d".format(rdd.id, split.index)
-    logInfo("Cache key is " + key)
+    logInfo("Looking for partition " + key)
     blockManager.get(key) match {
-      case Some(cachedValues) =>
-        // Partition is in cache, so just return its values
-        logInfo("Found partition in cache!")
-        return cachedValues.asInstanceOf[Iterator[T]]
+      case Some(values) =>
+        // Partition is already materialized, so just return its values
+        return values.asInstanceOf[Iterator[T]]
 
       case None =>
         // Mark the split as loading (unless someone else marks it first)
@@ -68,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         }
         try {
           // If we got here, we have to load the split
-          logInfo("Computing partition " + split)
+          logInfo("Partition %s not found, computing it".format(key))
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
           // Persist the result, so long as the task is not running locally
           if (context.runningLocally) { return computedValues }
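
The two CacheManager hunks above only change logging; the control flow is the untouched get-or-compute pattern: probe the block store, return on a hit, otherwise compute the partition and persist it. Below is a minimal, hypothetical sketch of that pattern over a plain synchronized map. It is not Spark's BlockManager, and it omits the loading-mark handshake that CacheManager uses to stop two tasks from computing the same partition concurrently.

import scala.collection.mutable

// Hypothetical stand-in for the cache-then-compute flow: a plain map under a
// lock, not Spark's BlockManager.
class GetOrComputeCache[K, V] {
  private val store = mutable.Map.empty[K, V]

  /** Return the cached value for `key`, computing and storing it on a miss. */
  def getOrCompute(key: K)(compute: => V): V = store.synchronized {
    store.get(key) match {
      case Some(values) =>
        values                  // hit: already materialized, return as-is
      case None =>
        val computed = compute  // miss: run the (possibly expensive) computation
        store(key) = computed   // persist the result for later lookups
        computed
    }
  }
}

// The second lookup is served from the cache; the error branch never runs.
object GetOrComputeCacheDemo extends App {
  val cache = new GetOrComputeCache[String, Seq[Int]]
  cache.getOrCompute("rdd_0_0") { Seq(1, 2, 3) }
  println(cache.getOrCompute("rdd_0_0") { sys.error("should not recompute") })
}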
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 495a72db698e4baab406a3be358a7bc8f31cc0d3..37d0ddb17b1911216329648e40b9e35599ee67ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -523,7 +523,17 @@ private[spark] class BlockManager(
    * Get a block from the block manager (either local or remote).
    */
   def get(blockId: String): Option[Iterator[Any]] = {
-    getLocal(blockId).orElse(getRemote(blockId))
+    val local = getLocal(blockId)
+    if (local.isDefined) {
+      logInfo("Found block %s locally".format(blockId))
+      return local
+    }
+    val remote = getRemote(blockId)
+    if (remote.isDefined) {
+      logInfo("Found block %s remotely".format(blockId))
+      return remote
+    }
+    None
   }
 
   /**
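
The BlockManager.get rewrite is behavior-preserving: Option.orElse takes its argument by name, so the old one-liner already skipped getRemote on a local hit. The expansion exists solely to log which tier served the block, which the single orElse chain could not express without side effects buried inside it. A self-contained, hypothetical sketch of the same tiered lookup follows; fetchLocal, fetchRemote, and println are stand-ins for illustration, not Spark APIs.

object TieredLookupSketch {
  // Assumed tier behavior for the demo: local store misses, remote store hits.
  def fetchLocal(blockId: String): Option[Iterator[Any]] = None
  def fetchRemote(blockId: String): Option[Iterator[Any]] = Some(Iterator("payload"))

  def get(blockId: String): Option[Iterator[Any]] = {
    val local = fetchLocal(blockId)
    if (local.isDefined) {
      println("Found block %s locally".format(blockId))   // hit in the fast tier
      return local
    }
    val remote = fetchRemote(blockId)
    if (remote.isDefined) {
      println("Found block %s remotely".format(blockId))  // fell back to the slow tier
      return remote
    }
    None                                                  // missing everywhere
  }

  def main(args: Array[String]): Unit = {
    get("rdd_0_0")  // prints: Found block rdd_0_0 remotely
  }
}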