diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 8139a2a40c66fc9a2f0f24adc09df30ed2df8c5f..78097502bca48d207a65563718079c638490d987 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,7 +15,7 @@ import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils
 
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
 
 
 /**
@@ -42,7 +42,7 @@ class HadoopRDD[K, V](
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
-  extends RDD[(K, V)](sc, Nil) {
+  extends RDD[(K, V)](sc, Nil) with Logging {
 
   // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -71,7 +71,7 @@ class HadoopRDD[K, V](
     reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(() => reader.close())
+    context.addOnCompleteCallback(() => close())
 
     val key: K = reader.createKey()
     val value: V = reader.createValue()
@@ -88,9 +88,6 @@ class HadoopRDD[K, V](
         }
         gotNext = true
       }
-      if (finished) {
-        reader.close()
-      }
       !finished
     }
 
@@ -104,6 +101,15 @@
       gotNext = false
       (key, value)
     }
+
+    // Close the RecordReader, logging any failure instead of rethrowing it so that cleanup cannot fail the task.
+    private def close() {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception => logWarning("Exception in RecordReader.close()", e)
+      }
+    }
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index ebd4c3f0e2d863ddfdf629b5666add02f182ee55..df2361025c75327837c4e13184e762261e4b7509 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -7,7 +7,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
 
 
 private[spark]
@@ -26,7 +26,8 @@ class NewHadoopRDD[K, V](
     valueClass: Class[V],
     @transient conf: Configuration)
   extends RDD[(K, V)](sc, Nil)
-  with HadoopMapReduceUtil {
+  with HadoopMapReduceUtil
+  with Logging {
 
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -61,7 +62,7 @@ class NewHadoopRDD[K, V](
     reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(() => reader.close())
+    context.addOnCompleteCallback(() => close())
 
     var havePair = false
     var finished = false
@@ -81,6 +82,15 @@
       havePair = false
       return (reader.getCurrentKey, reader.getCurrentValue)
     }
+
+    // Close the RecordReader, logging any failure instead of rethrowing it so that cleanup cannot fail the task.
+    private def close() {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception => logWarning("Exception in RecordReader.close()", e)
+      }
+    }
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
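For context, both files adopt the same pattern: register the cleanup once through the task's on-completion callback, and have close() log (rather than rethrow) its own failures, so the RecordReader is closed exactly when the task finishes and a failing close() cannot fail the task. Below is a minimal standalone sketch of that flow; FakeTaskContext and FakeRecordReader are invented stand-ins for illustration, not the Spark and Hadoop classes used in the diff.

import scala.collection.mutable.ArrayBuffer

// Stand-in for TaskContext: collects callbacks and runs them on completion.
class FakeTaskContext {
  private val callbacks = new ArrayBuffer[() => Unit]
  def addOnCompleteCallback(f: () => Unit) { callbacks += f }
  def markTaskCompleted() { callbacks.foreach(f => f()) }
}

// Stand-in for a Hadoop RecordReader that only needs closing.
class FakeRecordReader {
  def close() { println("record reader closed") }
}

object ClosePatternSketch {
  def main(args: Array[String]) {
    val context = new FakeTaskContext
    val reader = new FakeRecordReader

    // As in the diff: wrap close() so a failure is logged, not rethrown.
    def close() {
      try {
        reader.close()
      } catch {
        case e: Exception => println("Exception in RecordReader.close(): " + e)
      }
    }

    // Register the cleanup once; the iterator itself never calls close().
    context.addOnCompleteCallback(() => close())

    // ... iterate over (key, value) records here ...

    context.markTaskCompleted() // prints "record reader closed"
  }
}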