diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f9d86fed34d0f558a75dd2686189b276eda2bab5..8a843fbb0e66fb7a9128461b1d4fdec795b23a51 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -24,6 +24,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
+import scala.util.Try
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
@@ -89,16 +90,22 @@ private[spark] class PythonRDD[T: ClassTag](
           dataOut.flush()
           worker.shutdownOutput()
         } catch {
+
           case e: java.io.FileNotFoundException =>
             readerException = e
-            // Kill the Python worker process:
-            worker.shutdownOutput()
+            Try(worker.shutdownOutput()) // kill Python worker process
+
           case e: IOException =>
             // This can happen for legitimate reasons if the Python code stops returning data
-            // before we are done passing elements through, e.g., for take(). Just log a message
-            // to say it happened.
-            logInfo("stdin writer to Python finished early")
-            logDebug("stdin writer to Python finished early", e)
+            // before we are done passing elements through, e.g., for take(). Just log a message to
+            // say it happened (as it could also be hiding a real IOException from a data source).
+            logInfo("stdin writer to Python finished early (may not be an error)", e)
+
+          case e: Exception =>
+            // We must avoid throwing exceptions here, because the thread uncaught exception handler
+            // will kill the whole executor (see Executor).
+            readerException = e
+            Try(worker.shutdownOutput()) // kill Python worker process
         }
       }
     }.start()
@@ -152,7 +159,7 @@ private[spark] class PythonRDD[T: ClassTag](
           val exLength = stream.readInt()
           val obj = new Array[Byte](exLength)
           stream.readFully(obj)
-          throw new PythonException(new String(obj))
+          throw new PythonException(new String(obj), readerException)
         case SpecialLengths.END_OF_DATA_SECTION =>
           // We've finished the data section of the output, but we can still
          // read some accumulator updates:
@@ -167,10 +174,13 @@
           Array.empty[Byte]
       }
     } catch {
-      case eof: EOFException => {
+      case e: Exception if readerException != null =>
+        logError("Python worker exited unexpectedly (crashed)", e)
+        logError("Python crash may have been caused by prior exception:", readerException)
+        throw readerException
+
+      case eof: EOFException =>
         throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
-      }
-      case e: Throwable => throw e
     }
   }
 
@@ -185,7 +195,7 @@ private[spark] class PythonRDD[T: ClassTag](
 }
 
 /** Thrown for exceptions in user Python code. */
-private class PythonException(msg: String) extends Exception(msg)
+private class PythonException(msg: String, cause: Exception) extends RuntimeException(msg, cause)
 
 /**
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
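
Note: the core pattern this patch relies on can be shown in isolation. A helper thread must not let an exception escape (the thread's uncaught-exception handler would kill the whole executor), so it records the failure into a shared @volatile field, performs best-effort cleanup wrapped in Try, and the reading side re-throws the recorded exception as the root cause instead of a generic "worker crashed" error. Below is a minimal, self-contained Scala sketch of that hand-off; the object name ReaderExceptionHandoff and the helpers read/cleanup are illustrative stand-ins, not code from the patch.

import java.io.IOException
import scala.util.Try

object ReaderExceptionHandoff {
  // Analogue of PythonRDD's readerException: written by the feeder thread,
  // read by the consumer, hence @volatile for cross-thread visibility.
  @volatile private var readerException: Exception = null

  private def cleanup(): Unit = {
    // Stand-in for worker.shutdownOutput(); may itself throw, which is why
    // the caller wraps it in Try(...) and ignores the result.
    throw new IOException("stream already closed")
  }

  private def read(): String = {
    // Surface the writer thread's failure as the root cause rather than a
    // generic "worker exited unexpectedly" error.
    if (readerException != null) {
      throw readerException
    }
    "data"
  }

  def main(args: Array[String]): Unit = {
    val writer = new Thread("stdin writer (demo)") {
      override def run(): Unit = {
        try {
          // Simulate the data source failing mid-write, e.g. an input file
          // deleted while the job is running.
          throw new java.io.FileNotFoundException("input file vanished")
        } catch {
          case e: Exception =>
            readerException = e  // record the failure; never re-throw here
            Try(cleanup())       // best-effort shutdown; swallow secondary errors
        }
      }
    }
    writer.start()
    writer.join()

    try {
      read()
    } catch {
      case e: Exception => println(s"surfaced root cause: $e")
    }
  }
}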