From d942d3907241d50b693a316785af56023ec218b4 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Sat, 23 Feb 2013 11:19:07 -0800
Subject: [PATCH] Handle exceptions in RecordReader.close() better (suggested
 by Jim Donahue)

---
 core/src/main/scala/spark/rdd/HadoopRDD.scala   | 17 +++++++++++------
 .../src/main/scala/spark/rdd/NewHadoopRDD.scala | 15 ++++++++++++---
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 8139a2a40c..78097502bc 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,7 +15,7 @@ import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils
 
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
 
 
 /**
@@ -42,7 +42,7 @@ class HadoopRDD[K, V](
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
-  extends RDD[(K, V)](sc, Nil) {
+  extends RDD[(K, V)](sc, Nil) with Logging {
 
   // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -71,7 +71,7 @@ class HadoopRDD[K, V](
     reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(() => reader.close())
+    context.addOnCompleteCallback(() => close())
 
     val key: K = reader.createKey()
     val value: V = reader.createValue()
@@ -88,9 +88,6 @@ class HadoopRDD[K, V](
         }
         gotNext = true
       }
-      if (finished) {
-        reader.close()
-      }
       !finished
     }
 
@@ -104,6 +101,14 @@ class HadoopRDD[K, V](
       gotNext = false
       (key, value)
     }
+
+    private def close() {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception => logWarning("Exception in RecordReader.close()", e)
+      }
+    }
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index ebd4c3f0e2..df2361025c 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -7,7 +7,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
 
 
 private[spark]
@@ -26,7 +26,8 @@ class NewHadoopRDD[K, V](
     valueClass: Class[V],
     @transient conf: Configuration)
   extends RDD[(K, V)](sc, Nil)
-  with HadoopMapReduceUtil {
+  with HadoopMapReduceUtil
+  with Logging {
 
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -61,7 +62,7 @@ class NewHadoopRDD[K, V](
     reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(() => reader.close())
+    context.addOnCompleteCallback(() => close())
 
     var havePair = false
     var finished = false
@@ -81,6 +82,14 @@ class NewHadoopRDD[K, V](
       havePair = false
       return (reader.getCurrentKey, reader.getCurrentValue)
     }
+
+    private def close() {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception => logWarning("Exception in RecordReader.close()", e)
+      }
+    }
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-- 
GitLab
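
Note (not part of the patch): the pattern introduced above is a private close() helper that catches and logs any exception thrown by RecordReader.close(), registered once as an on-task-completion callback instead of being called inline from the iterator, so a failing close() cannot mask the task's real outcome. The sketch below reproduces that idea outside Spark under stated assumptions: SimpleReader, StubRecordReader, closeQuietly and the println-based logging are illustrative stand-ins, not the patched HadoopRDD/NewHadoopRDD classes or the spark.Logging trait.

// Minimal sketch of the pattern in this patch: close a reader exactly once
// at "task" completion and log, rather than propagate, any close() failure.
// All names here are hypothetical stand-ins for the Spark classes.
object CloseQuietlyExample {

  trait SimpleReader {
    def close(): Unit
  }

  // Simulates a RecordReader whose close() throws.
  class StubRecordReader extends SimpleReader {
    override def close(): Unit =
      throw new java.io.IOException("simulated failure in close()")
  }

  // Analogue of the private close() added to HadoopRDD/NewHadoopRDD:
  // never let a close() failure escape, just record it.
  def closeQuietly(reader: SimpleReader): Unit = {
    try {
      reader.close()
    } catch {
      case e: Exception =>
        println(s"WARN Exception in RecordReader.close(): $e")
    }
  }

  // Analogue of context.addOnCompleteCallback: callbacks registered while
  // the "task" runs are invoked once when it finishes, success or failure.
  def main(args: Array[String]): Unit = {
    val onComplete = scala.collection.mutable.ArrayBuffer.empty[() => Unit]
    val reader = new StubRecordReader
    onComplete += (() => closeQuietly(reader))

    try {
      // ... iterate over records here ...
    } finally {
      onComplete.foreach(_())  // the faulty close() is logged, not rethrown
    }
  }
}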