diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java index c5fd40816d62f5e78e1b90561d5a96d517f83a5b..d099ee9aa9dae795c6e27be756ba36d6df9ec96e 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java @@ -242,7 +242,14 @@ class LauncherServer implements Closeable { synchronized (clients) { clients.add(clientConnection); } - timeoutTimer.schedule(timeout, getConnectionTimeout()); + long timeoutMs = getConnectionTimeout(); + // 0 is used for testing to avoid issues with clock resolution / thread scheduling, + // and force an immediate timeout. + if (timeoutMs > 0) { + timeoutTimer.schedule(timeout, getConnectionTimeout()); + } else { + timeout.run(); + } } } } catch (IOException ioe) { diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java index 27cd1061a15b3a20b2bc4cadc896043c36523be2..dc8fbb58d880b98c95375d7672c19c282caeea5f 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java @@ -121,12 +121,12 @@ public class LauncherServerSuite extends BaseSuite { @Test public void testTimeout() throws Exception { - final long TEST_TIMEOUT = 10L; - ChildProcAppHandle handle = null; TestClient client = null; try { - SparkLauncher.setConfig(SparkLauncher.CHILD_CONNECTION_TIMEOUT, String.valueOf(TEST_TIMEOUT)); + // LauncherServer will immediately close the server-side socket when the timeout is set + // to 0. + SparkLauncher.setConfig(SparkLauncher.CHILD_CONNECTION_TIMEOUT, "0"); handle = LauncherServer.newAppHandle(); @@ -134,12 +134,29 @@ public class LauncherServerSuite extends BaseSuite { LauncherServer.getServerInstance().getPort()); client = new TestClient(s); - Thread.sleep(TEST_TIMEOUT * 10); - try { - client.send(new Hello(handle.getSecret(), "1.4.0")); - fail("Expected exception caused by connection timeout."); - } catch (IllegalStateException e) { - // Expected. + // Try a few times since the client-side socket may not reflect the server-side close + // immediately. + boolean helloSent = false; + int maxTries = 10; + for (int i = 0; i < maxTries; i++) { + try { + if (!helloSent) { + client.send(new Hello(handle.getSecret(), "1.4.0")); + helloSent = true; + } else { + client.send(new SetAppId("appId")); + } + fail("Expected exception caused by connection timeout."); + } catch (IllegalStateException | IOException e) { + // Expected. + break; + } catch (AssertionError e) { + if (i < maxTries - 1) { + Thread.sleep(100); + } else { + throw new AssertionError("Test failed after " + maxTries + " attempts.", e); + } + } } } finally { SparkLauncher.launcherConfig.remove(SparkLauncher.CHILD_CONNECTION_TIMEOUT);