diff --git a/bin/pyspark b/bin/pyspark
index 59cfdfa7c5d1a8a89b8081e3c2a1153aa104c714..f553b314c5991fce6da1c407a2d1f0ef9952902d 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -102,6 +102,8 @@ if [[ "$1" =~ \.py$ ]]; then
   gatherSparkSubmitOpts "$@"
   exec $FWDIR/bin/spark-submit "${SUBMISSION_OPTS[@]}" $primary "${APPLICATION_OPTS[@]}"
 else
+  # PySpark shell requires special handling downstream
+  export PYSPARK_SHELL=1
   # Only use ipython if no command line arguments were provided [SPARK-1134]
   if [[ "$IPYTHON" = "1" ]]; then
     exec ${PYSPARK_PYTHON:-ipython} $IPYTHON_OPTS
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 7ca96ed57c2dbe5dc547783a07d653e48acd6817..38b5d8e1739d07af8c07dc6378b6529512a1289f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -132,25 +132,29 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val builder = new ProcessBuilder(filteredCommand)
     val process = builder.start()
 
-    // Redirect stdin, stdout, and stderr to/from the child JVM
+    // Redirect stdout and stderr from the child JVM
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
     stdoutThread.start()
     stderrThread.start()
 
-    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
-    // a thread that contends with the subprocess in reading from System.in.
-    if (Utils.isWindows) {
-      // For the PySpark shell, the termination of this process is handled in java_gateway.py
-      process.waitFor()
-    } else {
-      // Terminate on broken pipe, which signals that the parent process has exited. This is
-      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+    // Redirect stdin to child JVM only if we're not running Windows. This is because the
+    // subprocess there already reads directly from our stdin, so we should avoid spawning a
+    // thread that contends with the subprocess in reading from System.in.
+    val isWindows = Utils.isWindows
+    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      stdinThread.join()
-      process.destroy()
+      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
+      // should terminate on broken pipe, which signals that the parent process has exited. In
+      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
+      if (isPySparkShell) {
+        stdinThread.join()
+        process.destroy()
+      }
     }
+    process.waitFor()
   }
 
 }
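For context on the pattern in the second hunk: the broken-pipe termination works because a stream-copying thread exits once its source or sink closes, so joining on the stdin-copying thread doubles as a check that the parent python process is still alive. Below is a minimal, self-contained sketch of that idea. It is not Spark's actual RedirectThread utility; the class name StreamCopyThread, the buffer size, the daemon setting, and the `cat` stand-in child process are assumptions made for illustration.

```scala
import java.io.{IOException, InputStream, OutputStream}

// Sketch of a stream-copying thread in the spirit of RedirectThread (illustrative,
// not the actual Spark implementation).
class StreamCopyThread(in: InputStream, out: OutputStream, name: String) extends Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    val buf = new Array[Byte](1024)
    try {
      var len = in.read(buf)
      while (len != -1) {
        out.write(buf, 0, len)
        out.flush()
        len = in.read(buf)
      }
    } catch {
      // A broken pipe surfaces as an IOException: the other end of the stream has gone
      // away (e.g. the parent python process exited). The loop ends, the thread finishes,
      // and anyone blocked in join() wakes up.
      case _: IOException =>
    }
  }
}

// Usage sketch mirroring the bootstrapper logic above: forward our stdin to a child
// process, and when running as the PySpark shell, tear the child down as soon as the
// copying thread terminates (i.e. the pipe from the parent broke). The child command
// ("cat") is a stand-in for the driver JVM.
object StdinRedirectSketch {
  def main(args: Array[String]): Unit = {
    val process = new ProcessBuilder("cat").start()
    val stdinThread = new StreamCopyThread(System.in, process.getOutputStream, "redirect stdin")
    stdinThread.start()
    if (sys.env.contains("PYSPARK_SHELL")) {
      stdinThread.join()
      process.destroy()
    }
    process.waitFor()
  }
}
```

Note the design choice the diff makes: waitFor() now runs unconditionally at the end, so non-shell (and Windows) invocations simply block on the child, while the PySpark shell case additionally destroys the child once the stdin pipe breaks.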