diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index cae983ed4c652cc936f6df4f704a04cd9b1cd2b6..be53ca2968cfbafbf1e74d51da16b8193f374245 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -46,6 +46,7 @@ class TaskContext(
   }
 
   def executeOnCompleteCallbacks() {
-    onCompleteCallbacks.foreach{_()}
+    // Process completion callbacks in reverse order of registration
+    onCompleteCallbacks.reverse.foreach { _() }
   }
 }
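
Why reverse order matters: callbacks registered later may depend on resources
set up by callbacks registered earlier, so running them LIFO tears state down
safely. A minimal standalone sketch of the idea (the two callbacks are
illustrative assumptions, not from this patch), mirroring TaskContext's
ArrayBuffer-backed callback list:

    import scala.collection.mutable.ArrayBuffer

    val onCompleteCallbacks = new ArrayBuffer[() => Unit]
    def addOnCompleteCallback(f: () => Unit): Unit = onCompleteCallbacks += f

    // Hypothetical callbacks: the second depends on the first still being live.
    addOnCompleteCallback(() => println("close connection"))
    addOnCompleteCallback(() => println("flush stream over connection"))

    // Reversing gives LIFO execution: the stream is flushed while the
    // connection it writes through is still open.
    onCompleteCallbacks.reverse.foreach { _() }
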
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e4d0285710e84a427eb956483907858aa067c20f..b67286a4e3b7572ea1cc657168e660874450754e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -100,6 +100,14 @@ private[spark] class PythonRDD[T: ClassTag](
       }
     }.start()
 
+    /*
+     * Partial fix for SPARK-1019: attempts to stop reading the input stream, since
+     * other completion callbacks might invalidate it. Because interruption is not
+     * synchronous, this still leaves a potential race where the interruption is
+     * processed only after the stream has already become invalid.
+     */
+    context.addOnCompleteCallback(() => context.interrupted = true)
+
     // Return an iterator that reads lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
     val stdoutIterator = new Iterator[Array[Byte]] {
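
The comment above warns that interruption is asynchronous: the callback only
flips a flag, and the thread doing the reading observes it on its own schedule.
A standalone sketch of that flag-polling pattern (the object, thread, and data
are illustrative assumptions, not code from this patch):

    object InterruptSketch {
      @volatile var interrupted = false

      def main(args: Array[String]): Unit = {
        val reader = new Thread {
          override def run(): Unit = {
            val lines = Iterator("a", "b", "c")
            // The flag is checked only at the top of each iteration, so a read
            // already in flight can still touch a stream that has become invalid.
            while (!interrupted && lines.hasNext) {
              println(lines.next())
            }
          }
        }
        reader.start()

        // What the completion callback does, in effect: flip the flag and rely
        // on the reader noticing before the stream is torn down.
        interrupted = true
        reader.join()
      }
    }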