diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c14dce8273bc122fa20bc33759807311ba09f87a..dcb6b6824b0a61a69eb7f09ed188a88c402bd7bb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -381,16 +381,27 @@ class SparkContext(config: SparkConf) extends Logging {
    * // In a separate thread:
    * sc.cancelJobGroup("some_job_to_cancel")
    * }}}
+   *
+   * If interruptOnCancel is set to true for the job group, then job cancellation will result
+   * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+   * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+   * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
    */
-  def setJobGroup(groupId: String, description: String) {
+  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
+    // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
+    // APIs to also take advantage of this property (e.g., internal job failures or canceling from
+    // JobProgressTab UI) on a per-job basis.
+    setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
   }
 
   /** Clear the current thread's job group ID and its description. */
   def clearJobGroup() {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
+    setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
   }
 
   // Post init
@@ -1244,6 +1255,8 @@ object SparkContext extends Logging {
 
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
 
+  private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
+
   private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
 
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8a843fbb0e66fb7a9128461b1d4fdec795b23a51..0d71fdbb03ec6b872ccfd4fe8542c62d866dbacd 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -169,7 +169,6 @@ private[spark] class PythonRDD[T: ClassTag](
                 val update = new Array[Byte](updateLen)
                 stream.readFully(update)
                 accumulator += Collections.singletonList(update)
-
               }
               Array.empty[Byte]
           }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6327ac01663f66867066e6227da39b841a37c223..9ac7365f47f9fe55a1dbb58287b5ce6877ba65a7 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -69,12 +69,12 @@ private[spark] class CoarseGrainedExecutorBackend(
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
 
-    case KillTask(taskId, _) =>
+    case KillTask(taskId, _, interruptThread) =>
       if (executor == null) {
         logError("Received KillTask command but executor was null")
         System.exit(1)
       } else {
-        executor.killTask(taskId)
+        executor.killTask(taskId, interruptThread)
       }
 
     case x: DisassociatedEvent =>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2bfb9c387e1c95021180989bbf2e686b00c153c2..914bc205cebe277a915fd010013997e743192ebb 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -136,10 +136,10 @@ private[spark] class Executor(
     threadPool.execute(tr)
   }
 
-  def killTask(taskId: Long) {
+  def killTask(taskId: Long, interruptThread: Boolean) {
     val tr = runningTasks.get(taskId)
     if (tr != null) {
-      tr.kill()
+      tr.kill(interruptThread)
     }
   }
 
@@ -166,11 +166,11 @@ private[spark] class Executor(
     @volatile private var killed = false
     @volatile private var task: Task[Any] = _
 
-    def kill() {
+    def kill(interruptThread: Boolean) {
       logInfo("Executor is trying to kill task " + taskId)
       killed = true
       if (task != null) {
-        task.kill()
+        task.kill(interruptThread)
       }
     }
 
@@ -257,7 +257,7 @@ private[spark] class Executor(
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
         }
 
-        case TaskKilledException => {
+        case TaskKilledException | _: InterruptedException if task.killed => {
           logInfo("Executor killed task " + taskId)
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
         }
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 6fc702fdb1512cc36a721bef16cb18417c9960bf..64e24506e80386cdcfd90d11533855bdded7469f 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -76,7 +76,8 @@ private[spark] class MesosExecutorBackend
     if (executor == null) {
       logError("Received KillTask but executor was null")
     } else {
-      executor.killTask(t.getValue.toLong)
+      // TODO: Determine the 'interruptOnCancel' property set for the given job.
+      executor.killTask(t.getValue.toLong, interruptThread = false)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c6cbf14e20069e2ea73a029fd8d2d19e8fd58653..dbde9b591dccc32a77146fab47048eefd552824c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1055,6 +1055,10 @@ class DAGScheduler(
     val error = new SparkException(failureReason)
     job.listener.jobFailed(error)
 
+    val shouldInterruptThread =
+      if (job.properties == null) false
+      else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
+
     // Cancel all independent, running stages.
     val stages = jobIdToStageIds(job.jobId)
     if (stages.isEmpty) {
@@ -1073,7 +1077,7 @@ class DAGScheduler(
           // This is the only job that uses this stage, so fail the stage if it is running.
           val stage = stageIdToStage(stageId)
           if (runningStages.contains(stage)) {
-            taskScheduler.cancelTasks(stageId)
+            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
             val stageInfo = stageToInfos(stage)
             stageInfo.stageFailed(failureReason)
             listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index f1924a4573b217e47888a47ed7d6c9e6af64b3f4..6a6d8e609bc39a0844807ec9688e91e0c1f54940 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -28,5 +28,6 @@ private[spark] trait SchedulerBackend {
   def reviveOffers(): Unit
   def defaultParallelism(): Int
 
-  def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException
+  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
+    throw new UnsupportedOperationException
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index a8bcb7dfe2f3c589af30260b7bb1349926f49065..2ca3479c80efc147a5f6c279b09eb4bdb4180b1b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -44,8 +44,9 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
 
   final def run(attemptId: Long): T = {
     context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+    taskThread = Thread.currentThread()
     if (_killed) {
-      kill()
+      kill(interruptThread = false)
     }
     runTask(context)
   }
@@ -62,6 +63,9 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   // Task context, to be initialized in run().
   @transient protected var context: TaskContext = _
 
+  // The actual Thread on which the task is running, if any. Initialized in run().
+  @volatile @transient private var taskThread: Thread = _
+
   // A flag to indicate whether the task is killed. This is used in case context is not yet
   // initialized when kill() is invoked.
   @volatile @transient private var _killed = false
@@ -75,12 +79,16 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
    * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
    * code and user code to properly handle the flag. This function should be idempotent so it can
    * be called multiple times.
+   * If interruptThread is true, we will also call Thread.interrupt() on the Task's executor thread.
    */
-  def kill() {
+  def kill(interruptThread: Boolean) {
     _killed = true
     if (context != null) {
       context.interrupted = true
     }
+    if (interruptThread && taskThread != null) {
+      taskThread.interrupt()
+    }
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 92616c997e20c313d3bba1802fd2394d9e761417..819c35257b5a705f03ae0355c05fd6e4044186d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -47,7 +47,7 @@ private[spark] trait TaskScheduler {
   def submitTasks(taskSet: TaskSet): Unit
 
   // Cancel a stage.
-  def cancelTasks(stageId: Int)
+  def cancelTasks(stageId: Int, interruptThread: Boolean)
 
   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
   def setDAGScheduler(dagScheduler: DAGScheduler): Unit
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index fe72ab3e43146af80ba3b657d312aa621a7956a7..be19d9b8854c85daa6f3034278912d72f5aa80c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -170,7 +170,7 @@ private[spark] class TaskSchedulerImpl(
     backend.reviveOffers()
   }
 
-  override def cancelTasks(stageId: Int): Unit = synchronized {
+  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
     activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
       // There are two possible cases here:
@@ -181,7 +181,7 @@ private[spark] class TaskSchedulerImpl(
       //    simply abort the stage.
       tsm.runningTasksSet.foreach { tid =>
         val execId = taskIdToExecutorId(tid)
-        backend.killTask(tid, execId)
+        backend.killTask(tid, execId, interruptThread)
       }
       tsm.abort("Stage %s cancelled".format(stageId))
       logInfo("Stage %d was cancelled".format(stageId))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 03bf76083761f063ad0a015d9dd6173195e8f323..613fa7850bb2567699958299f34a7c984872340e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -31,8 +31,8 @@ private[spark] class TaskSet(
     val properties: Properties) {
     val id: String = stageId + "." + attempt
 
-  def kill() {
-    tasks.foreach(_.kill())
+  def kill(interruptThread: Boolean) {
+    tasks.foreach(_.kill(interruptThread))
   }
 
   override def toString: String = "TaskSet " + id
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 4a9a1659d825473377d3ff67272ec3e2cc16e2b6..ddbc74e82ac494362cd86d5a902efcd6b2c463b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -30,7 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {
   // Driver to executors
   case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
 
-  case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage
+  case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
+    extends CoarseGrainedClusterMessage
 
   case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
     extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7bfc30b4208a31cc2b8063b289f6eac65004fc9a..a6d6b3d26a3c6d036e1198bea82a7d3e153a5e03 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -101,8 +101,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       case ReviveOffers =>
         makeOffers()
 
-      case KillTask(taskId, executorId) =>
-        executorActor(executorId) ! KillTask(taskId, executorId)
+      case KillTask(taskId, executorId, interruptThread) =>
+        executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)
 
       case StopDriver =>
         sender ! true
@@ -207,8 +207,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     driverActor ! ReviveOffers
   }
 
-  override def killTask(taskId: Long, executorId: String) {
-    driverActor ! KillTask(taskId, executorId)
+  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
+    driverActor ! KillTask(taskId, executorId, interruptThread)
   }
 
   override def defaultParallelism(): Int = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 16e2f5cf3076dbdd5e337e0001717c5ae2d37c60..43f0e18a0cbe03866f91eab05113a2ae4798ff25 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -30,7 +30,7 @@ private case class ReviveOffers()
 
 private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
 
-private case class KillTask(taskId: Long)
+private case class KillTask(taskId: Long, interruptThread: Boolean)
 
 /**
  * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
@@ -61,8 +61,8 @@ private[spark] class LocalActor(
         reviveOffers()
       }
 
-    case KillTask(taskId) =>
-      executor.killTask(taskId)
+    case KillTask(taskId, interruptThread) =>
+      executor.killTask(taskId, interruptThread)
   }
 
   def reviveOffers() {
@@ -99,8 +99,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
 
   override def defaultParallelism() = totalCores
 
-  override def killTask(taskId: Long, executorId: String) {
-    localActor ! KillTask(taskId)
+  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
+    localActor ! KillTask(taskId, interruptThread)
   }
 
   override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 35a7ac9d049c24101d5783f9016566b061b45d22..ff69eb7e53f8e53033a11e1d81ec55a587ba64da 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -58,7 +58,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
     }
-    override def cancelTasks(stageId: Int) {
+    override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       cancelledStages += stageId
     }
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}