From 2aaed0a4db84e99186b52a2c49d532702b575406 Mon Sep 17 00:00:00 2001
From: liuxian <liu.xian3@zte.com.cn>
Date: Tue, 13 Jun 2017 12:29:50 +0100
Subject: [PATCH] [SPARK-21006][TESTS][FOLLOW-UP] Some Worker's RpcEnv is
 leaked in WorkerSuite

## What changes were proposed in this pull request?

Create rpcEnv and run later needs shutdown. as #18226

## How was this patch tested?
unit test

Author: liuxian <liu.xian3@zte.com.cn>

Closes #18259 from 10110346/wip-lx-0610.
---
 .../spark/deploy/worker/WorkerSuite.scala     | 39 ++++++++++++-------
 1 file changed, 25 insertions(+), 14 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 101a44edd8..ce212a7513 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.worker
 
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfter, Matchers}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.{Command, ExecutorState}
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorState
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 
-class WorkerSuite extends SparkFunSuite with Matchers {
+class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
 
   import org.apache.spark.deploy.DeployTestUtils._
 
@@ -34,6 +34,25 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   }
   def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)
 
+  private var _worker: Worker = _
+
+  private def makeWorker(conf: SparkConf): Worker = {
+    assert(_worker === null, "Some Worker's RpcEnv is leaked in tests")
+    val securityMgr = new SecurityManager(conf)
+    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
+    _worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
+      "Worker", "/tmp", conf, securityMgr)
+    _worker
+  }
+
+  after {
+    if (_worker != null) {
+      _worker.rpcEnv.shutdown()
+      _worker.rpcEnv.awaitTermination()
+      _worker = null
+    }
+  }
+
   test("test isUseLocalNodeSSLConfig") {
     Worker.isUseLocalNodeSSLConfig(cmd("-Dasdf=dfgh")) shouldBe false
     Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=true")) shouldBe true
@@ -65,9 +84,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedExecutors (small number of executors)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedExecutors", 2.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 5) {
       worker.executors += s"app1/$i" -> createExecutorRunner(i)
@@ -91,9 +108,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedExecutors (more executors)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedExecutors", 30.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 50) {
       worker.executors += s"app1/$i" -> createExecutorRunner(i)
@@ -126,9 +141,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedDrivers (small number of drivers)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedDrivers", 2.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 5) {
       val driverId = s"driverId-$i"
@@ -152,9 +165,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedDrivers (more drivers)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedDrivers", 30.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 50) {
       val driverId = s"driverId-$i"
-- 
GitLab