Commit a967b005 authored by Aaron Davidson, committed by Patrick Wendell

SPARK-1572 Don't kill Executor if PythonRDD fails while computing parent

Previously, if the parent RDD threw any exception other than IOException or FileNotFoundException (which is quite possible for Hadoop input sources), the entire Executor would crash, because the thread's default uncaught exception handler calls System.exit().

This patch addresses two related issues:

  1. Always catch exceptions in this reader thread.
  2. Don't mask readerException when Python throws an EOFError
     after worker.shutdownOutput() is called.
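
To illustrate the failure mode, here is a minimal, self-contained Scala sketch (not Spark's actual Executor code; the handler and thread names are only illustrative) of how a default uncaught exception handler that calls System.exit() turns one exception in a helper thread into the loss of the whole JVM:

    object UncaughtExitSketch {
      def main(args: Array[String]): Unit = {
        // An executor-style default handler: any uncaught exception on any
        // thread terminates the entire process, not just the failing thread.
        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          def uncaughtException(t: Thread, e: Throwable): Unit = {
            System.err.println(s"Uncaught exception in thread ${t.getName}: $e")
            System.exit(1)
          }
        })

        // A writer thread like the one feeding the Python worker: if the
        // parent RDD's iterator throws here and nothing catches it, the
        // handler above runs and the whole JVM (the Executor) exits.
        new Thread("stdin writer to Python") {
          override def run(): Unit = {
            throw new RuntimeException("parent RDD failed while computing")
          }
        }.start()
      }
    }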

Author: Aaron Davidson <aaron@databricks.com>

Closes #486 from aarondav/pyspark and squashes the following commits:

fbb11e9 [Aaron Davidson] Make sure FileNotFoundExceptions are handled same as before
b9acb3e [Aaron Davidson] SPARK-1572 Don't kill Executor if PythonRDD fails while computing parent
parent a6646066
@@ -24,6 +24,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
+import scala.util.Try
 
 import net.razorvine.pickle.{Pickler, Unpickler}
@@ -89,16 +90,22 @@ private[spark] class PythonRDD[T: ClassTag](
           dataOut.flush()
           worker.shutdownOutput()
         } catch {
+
           case e: java.io.FileNotFoundException =>
             readerException = e
-            // Kill the Python worker process:
-            worker.shutdownOutput()
+            Try(worker.shutdownOutput()) // kill Python worker process
+
           case e: IOException =>
             // This can happen for legitimate reasons if the Python code stops returning data
-            // before we are done passing elements through, e.g., for take(). Just log a message
-            // to say it happened.
-            logInfo("stdin writer to Python finished early")
-            logDebug("stdin writer to Python finished early", e)
+            // before we are done passing elements through, e.g., for take(). Just log a message to
+            // say it happened (as it could also be hiding a real IOException from a data source).
+            logInfo("stdin writer to Python finished early (may not be an error)", e)
+
+          case e: Exception =>
+            // We must avoid throwing exceptions here, because the thread uncaught exception handler
+            // will kill the whole executor (see Executor).
+            readerException = e
+            Try(worker.shutdownOutput()) // kill Python worker process
         }
       }
     }.start()
@@ -152,7 +159,7 @@ private[spark] class PythonRDD[T: ClassTag](
             val exLength = stream.readInt()
             val obj = new Array[Byte](exLength)
             stream.readFully(obj)
-            throw new PythonException(new String(obj))
+            throw new PythonException(new String(obj), readerException)
           case SpecialLengths.END_OF_DATA_SECTION =>
             // We've finished the data section of the output, but we can still
             // read some accumulator updates:
@@ -167,10 +174,13 @@ private[spark] class PythonRDD[T: ClassTag](
             Array.empty[Byte]
         }
       } catch {
-        case eof: EOFException => {
+        case e: Exception if readerException != null =>
+          logError("Python worker exited unexpectedly (crashed)", e)
+          logError("Python crash may have been caused by prior exception:", readerException)
+          throw readerException
+
+        case eof: EOFException =>
           throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
-        }
-        case e: Throwable => throw e
       }
     }
@@ -185,7 +195,7 @@ private[spark] class PythonRDD[T: ClassTag](
 }
 
 /** Thrown for exceptions in user Python code. */
-private class PythonException(msg: String) extends Exception(msg)
+private class PythonException(msg: String, cause: Exception) extends RuntimeException(msg, cause)
 
 /**
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
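
Taken together, the two fixes form a record-and-rethrow handoff between the writer thread and the reading side. A minimal standalone sketch of that pattern, with hypothetical names (this is not the actual PythonRDD code):

    import java.io.EOFException
    import scala.util.Try

    class WriterReaderHandoff(writeAll: () => Unit, shutdown: () => Unit) {
      @volatile private var readerException: Exception = null

      // Fix 1: the writer thread catches everything and records the failure
      // instead of letting it reach the uncaught exception handler.
      def startWriter(): Unit = new Thread("stdin writer") {
        override def run(): Unit = {
          try writeAll()
          catch {
            case e: Exception =>
              readerException = e
              Try(shutdown()) // best-effort worker shutdown; never rethrows
          }
        }
      }.start()

      // Fix 2: if the writer already failed, surface that root cause rather
      // than the EOF its shutdown provoked on the reading side.
      def readNext(read: () => Array[Byte]): Array[Byte] =
        try read()
        catch {
          case e: Exception if readerException != null => throw readerException
          case eof: EOFException =>
            throw new RuntimeException("Python worker exited unexpectedly", eof)
        }
    }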