diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index e3413fd6652d8f726eff58dc9571182b0d21f0b3..28e9420b288027ec5a2873f991463ca685e3cbbc 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -337,6 +337,10 @@ class LauncherServer implements Closeable {
       }
       super.close();
       if (handle != null) {
+    	if (!handle.getState().isFinal()) {
+    	  LOG.log(Level.WARNING, "Lost connection to spark application.");
+    	  handle.setState(SparkAppHandle.State.LOST);
+    	}
         handle.disconnect();
       }
     }
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
index 625d02632114a84ef9c55e43ce7703907c5bd5a6..0aa7bd197d16f897434b3a94a023ee233824612b 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -46,7 +46,9 @@ public interface SparkAppHandle {
     /** The application finished with a failed status. */
     FAILED(true),
     /** The application was killed. */
-    KILLED(true);
+    KILLED(true),
+    /** The Spark Submit JVM exited with a unknown status. */
+    LOST(true);
 
     private final boolean isFinal;
 
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index bfe1fcc87fe352b7d78cc6e24a07bd5cb880d6ee..12f1a0ce2d1b4b4127f0f2480d68c0c77aafcd61 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -152,6 +152,37 @@ public class LauncherServerSuite extends BaseSuite {
     }
   }
 
+  @Test
+  public void testSparkSubmitVmShutsDown() throws Exception {
+    ChildProcAppHandle handle = LauncherServer.newAppHandle();
+    TestClient client = null;
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      Socket s = new Socket(InetAddress.getLoopbackAddress(),
+        LauncherServer.getServerInstance().getPort());
+      handle.addListener(new SparkAppHandle.Listener() {
+        public void stateChanged(SparkAppHandle handle) {
+          semaphore.release();
+        }
+        public void infoChanged(SparkAppHandle handle) {
+          semaphore.release();
+        }
+      });
+      client = new TestClient(s);
+      client.send(new Hello(handle.getSecret(), "1.4.0"));
+      assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
+      // Make sure the server matched the client to the handle.
+      assertNotNull(handle.getConnection());
+      close(client);
+      assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
+      assertEquals(SparkAppHandle.State.LOST, handle.getState());
+    } finally {
+      kill(handle);
+      close(client);
+      client.clientThread.join();
+    }
+  }
+
   private void kill(SparkAppHandle handle) {
     if (handle != null) {
       handle.kill();