diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 160fc42c57d1841ec5c780a6175c37e64ada603b..7b77d447ce6df33665cc4638b3bc7acd4303605c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -42,7 +42,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.CallSite
+import org.apache.spark.util.{CallSite, Utils}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -201,6 +201,8 @@ class StreamingContext private[streaming] (
 
   private val startSite = new AtomicReference[CallSite](null)
 
+  private var shutdownHookRef: AnyRef = _
+
   /**
    * Return the associated Spark context
    */
@@ -584,6 +586,8 @@ class StreamingContext private[streaming] (
           state = StreamingContextState.ACTIVE
           StreamingContext.setActiveContext(this)
         }
+        shutdownHookRef = Utils.addShutdownHook(
+          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
         logInfo("StreamingContext started")
       case ACTIVE =>
         logWarning("StreamingContext has already been started")
@@ -660,6 +664,9 @@ class StreamingContext private[streaming] (
           uiTab.foreach(_.detach())
           StreamingContext.setActiveContext(null)
           waiter.notifyStop()
+          if (shutdownHookRef != null) {
+            Utils.removeShutdownHook(shutdownHookRef)
+          }
           logInfo("StreamingContext stopped successfully")
       }
       // Even if we have already stopped, we still need to attempt to stop the SparkContext because
@@ -670,6 +677,13 @@ class StreamingContext private[streaming] (
       state = STOPPED
     }
   }
+
+  private def stopOnShutdown(): Unit = {
+    val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
+    logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
+    // Do not stop SparkContext, let its own shutdown hook stop it
+    stop(stopSparkContext = false, stopGracefully = stopGracefully)
+  }
 }
 
 /**
@@ -685,6 +699,8 @@ object StreamingContext extends Logging {
    */
   private val ACTIVATION_LOCK = new Object()
 
+  private val SHUTDOWN_HOOK_PRIORITY = Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
+
   private val activeContext = new AtomicReference[StreamingContext](null)
 
   private def assertNoOtherContextIsActive(): Unit = {