Skip to content
Snippets Groups Projects
Commit 8b377718 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Responses to review

parent 391133f6
No related branches found
No related tags found
No related merge requests found
...@@ -34,7 +34,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { ...@@ -34,7 +34,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel) def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = { : Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index) val key = "rdd_%d_%d".format(rdd.id, split.index)
logInfo("Looking for partition " + key) logDebug("Looking for partition " + key)
blockManager.get(key) match { blockManager.get(key) match {
case Some(values) => case Some(values) =>
// Partition is already materialized, so just return its values // Partition is already materialized, so just return its values
...@@ -44,11 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { ...@@ -44,11 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// Mark the split as loading (unless someone else marks it first) // Mark the split as loading (unless someone else marks it first)
loading.synchronized { loading.synchronized {
if (loading.contains(key)) { if (loading.contains(key)) {
logInfo("Loading contains " + key + ", waiting...") logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
while (loading.contains(key)) { while (loading.contains(key)) {
try {loading.wait()} catch {case _ : Throwable =>} try {loading.wait()} catch {case _ : Throwable =>}
} }
logInfo("Loading no longer contains " + key + ", so returning cached result") logInfo("Finished waiting for %s".format(key))
// See whether someone else has successfully loaded it. The main way this would fail // See whether someone else has successfully loaded it. The main way this would fail
// is for the RDD-level cache eviction policy if someone else has loaded the same RDD // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
// partition but we didn't want to make space for it. However, that case is unlikely // partition but we didn't want to make space for it. However, that case is unlikely
...@@ -58,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { ...@@ -58,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(values) => case Some(values) =>
return values.asInstanceOf[Iterator[T]] return values.asInstanceOf[Iterator[T]]
case None => case None =>
logInfo("Whoever was loading " + key + " failed; we'll try it ourselves") logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
loading.add(key) loading.add(key)
} }
} else { } else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment