From 1a175d13b967b0080cfa4b5d8d1c278e0e61565a Mon Sep 17 00:00:00 2001
From: Stephen Haberman <stephen@exigencecorp.com>
Date: Wed, 13 Mar 2013 10:17:39 -0500
Subject: [PATCH] Add NextIterator.closeIfNeeded.

---
 core/src/main/scala/spark/rdd/HadoopRDD.scala    |  2 +-
 .../src/main/scala/spark/util/NextIterator.scala | 16 +++++++++++++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 43c6749ddc..a6322dc58d 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -72,7 +72,7 @@ class HadoopRDD[K, V](
     reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback{ () => close() }
+    context.addOnCompleteCallback{ () => closeIfNeeded() }
 
     val key: K = reader.createKey()
     val value: V = reader.createValue()
diff --git a/core/src/main/scala/spark/util/NextIterator.scala b/core/src/main/scala/spark/util/NextIterator.scala
index da76b5f6d0..48b5018ddd 100644
--- a/core/src/main/scala/spark/util/NextIterator.scala
+++ b/core/src/main/scala/spark/util/NextIterator.scala
@@ -5,6 +5,7 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
   
   private var gotNext = false
   private var nextValue: U = _
+  private var closed = false
   protected var finished = false
 
   /**
@@ -34,12 +35,25 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
    */
   protected def close()
 
+  /**
+   * Calls the subclass-defined close method, but only once.
+   *
+   * Usually calling `close` multiple times should be fine, but historically
+   * there have been issues with some InputFormats throwing exceptions.
+   */
+  def closeIfNeeded() {
+    if (!closed) {
+      close()
+      closed = true
+    }
+  }
+
   override def hasNext: Boolean = {
     if (!finished) {
       if (!gotNext) {
         nextValue = getNext()
         if (finished) {
-          close()
+          closeIfNeeded()
         }
         gotNext = true
       }
-- 
GitLab