diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index 43c6749ddc193a7cf5c0cd3598a64cf6a3152969..a6322dc58dc78b1ad28201fdbce27bd8bfb2b0a3 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -72,7 +72,7 @@ class HadoopRDD[K, V]( reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) // Register an on-task-completion callback to close the input stream. - context.addOnCompleteCallback{ () => close() } + context.addOnCompleteCallback{ () => closeIfNeeded() } val key: K = reader.createKey() val value: V = reader.createValue() diff --git a/core/src/main/scala/spark/util/NextIterator.scala b/core/src/main/scala/spark/util/NextIterator.scala index da76b5f6d0d107405791b0aaf51b674e16510b41..48b5018dddbdd1326a7e7ff7bda771bc44c76ce0 100644 --- a/core/src/main/scala/spark/util/NextIterator.scala +++ b/core/src/main/scala/spark/util/NextIterator.scala @@ -5,6 +5,7 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] { private var gotNext = false private var nextValue: U = _ + private var closed = false protected var finished = false /** @@ -34,12 +35,25 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] { */ protected def close() + /** + * Calls the subclass-defined close method, but only once. + * + * Usually calling `close` multiple times should be fine, but historically + * there have been issues with some InputFormats throwing exceptions. + */ + def closeIfNeeded() { + if (!closed) { + close() + closed = true + } + } + override def hasNext: Boolean = { if (!finished) { if (!gotNext) { nextValue = getNext() if (finished) { - close() + closeIfNeeded() } gotNext = true }