Skip to content
Snippets Groups Projects
Commit 2aaed0a4 authored by liuxian's avatar liuxian Committed by Sean Owen
Browse files

[SPARK-21006][TESTS][FOLLOW-UP] Some Worker's RpcEnv is leaked in WorkerSuite

## What changes were proposed in this pull request?

Create rpcEnv and run later needs shutdown. as #18226

## How was this patch tested?
unit test

Author: liuxian <liu.xian3@zte.com.cn>

Closes #18259 from 10110346/wip-lx-0610.
parent 7b7c85ed
No related branches found
No related tags found
No related merge requests found
......@@ -17,7 +17,7 @@
package org.apache.spark.deploy.worker
import org.scalatest.Matchers
import org.scalatest.{BeforeAndAfter, Matchers}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, ExecutorState}
......@@ -25,7 +25,7 @@ import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorState
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
class WorkerSuite extends SparkFunSuite with Matchers {
class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
import org.apache.spark.deploy.DeployTestUtils._
......@@ -34,6 +34,25 @@ class WorkerSuite extends SparkFunSuite with Matchers {
}
def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)
private var _worker: Worker = _
private def makeWorker(conf: SparkConf): Worker = {
assert(_worker === null, "Some Worker's RpcEnv is leaked in tests")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
_worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, securityMgr)
_worker
}
after {
if (_worker != null) {
_worker.rpcEnv.shutdown()
_worker.rpcEnv.awaitTermination()
_worker = null
}
}
test("test isUseLocalNodeSSLConfig") {
Worker.isUseLocalNodeSSLConfig(cmd("-Dasdf=dfgh")) shouldBe false
Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=true")) shouldBe true
......@@ -65,9 +84,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
test("test clearing of finishedExecutors (small number of executors)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedExecutors", 2.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, new SecurityManager(conf))
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 5) {
worker.executors += s"app1/$i" -> createExecutorRunner(i)
......@@ -91,9 +108,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
test("test clearing of finishedExecutors (more executors)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedExecutors", 30.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, new SecurityManager(conf))
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 50) {
worker.executors += s"app1/$i" -> createExecutorRunner(i)
......@@ -126,9 +141,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
test("test clearing of finishedDrivers (small number of drivers)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedDrivers", 2.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, new SecurityManager(conf))
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 5) {
val driverId = s"driverId-$i"
......@@ -152,9 +165,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
test("test clearing of finishedDrivers (more drivers)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedDrivers", 30.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, new SecurityManager(conf))
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 50) {
val driverId = s"driverId-$i"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment