diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 101a44edd8ee28fc546c4cf7970b8ac3dc144402..ce212a7513310d3f640fd09decd506663e90fbe9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.worker
 
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfter, Matchers}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.{Command, ExecutorState}
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorState
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 
-class WorkerSuite extends SparkFunSuite with Matchers {
+class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
 
   import org.apache.spark.deploy.DeployTestUtils._
 
@@ -34,6 +34,29 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   }
   def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)
 
+  private var _worker: Worker = _
+
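+  // Builds the Worker under test on a fresh RpcEnv and records it in _worker so
+  // the after() hook can shut the RpcEnv down; each test creates at most one Worker.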
+  private def makeWorker(conf: SparkConf): Worker = {
+    assert(_worker === null, "A Worker's RpcEnv from a previous test was leaked")
+    val securityMgr = new SecurityManager(conf)
+    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
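+    // Arbitrary test values: web UI port 50000, 20 cores, 1234 * 5 MB memory, one fake master.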
+    _worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
+      "Worker", "/tmp", conf, securityMgr)
+    _worker
+  }
+
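+  // Shut down the Worker's RpcEnv after each test so it does not leak.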
+  after {
+    if (_worker != null) {
+      _worker.rpcEnv.shutdown()
+      _worker.rpcEnv.awaitTermination()
+      _worker = null
+    }
+  }
+
   test("test isUseLocalNodeSSLConfig") {
     Worker.isUseLocalNodeSSLConfig(cmd("-Dasdf=dfgh")) shouldBe false
     Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=true")) shouldBe true
@@ -65,9 +88,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedExecutors (small number of executors)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedExecutors", 2.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 5) {
       worker.executors += s"app1/$i" -> createExecutorRunner(i)
@@ -91,9 +112,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedExecutors (more executors)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedExecutors", 30.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 50) {
       worker.executors += s"app1/$i" -> createExecutorRunner(i)
@@ -126,9 +145,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedDrivers (small number of drivers)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedDrivers", 2.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 5) {
       val driverId = s"driverId-$i"
@@ -152,9 +169,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedDrivers (more drivers)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedDrivers", 30.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 50) {
       val driverId = s"driverId-$i"