From b40907310266be1be5db5f773bc9bcbf2813c090 Mon Sep 17 00:00:00 2001
From: Mark Hamstra <markhamstra@gmail.com>
Date: Fri, 1 Mar 2013 15:05:07 -0800
Subject: [PATCH] Instead of failing to bind to a fixed, already-in-use port,
 let the OS choose an available port for TestServer.

---
 .../spark/streaming/InputStreamsSuite.scala    | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index ebcb6d0092..4d33857b25 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -29,7 +29,7 @@ import java.nio.charset.Charset
 import com.google.common.io.Files
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
-    
+
   System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
 
   val testPort = 9999
@@ -44,12 +44,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
   test("socket input stream") {
     // Start the server
-    val testServer = new TestServer(testPort)
+    val testServer = new TestServer()
     testServer.start()
 
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(master, framework, batchDuration)
-    val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+    val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String  ]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
     def output = outputBuffer.flatMap(x => x)
@@ -193,8 +193,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
   test("actor input stream") {
     // Start the server
-    val port = testPort
-    val testServer = new TestServer(port)
+    val testServer = new TestServer()
+    val port = testServer.port
     testServer.start()
 
     // Set up the streaming context and input streams
@@ -244,11 +244,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
 
 /** This is server to test the network input stream */
-class TestServer(port: Int) extends Logging {
+class TestServer() extends Logging {
 
   val queue = new ArrayBlockingQueue[String](100)
 
-  val serverSocket = new ServerSocket(port)
+  val serverSocket = new ServerSocket(0)
 
   val servingThread = new Thread() {
     override def run() {
@@ -290,11 +290,13 @@ class TestServer(port: Int) extends Logging {
   def send(msg: String) { queue.add(msg) }
 
   def stop() { servingThread.interrupt() }
+
+  def port = serverSocket.getLocalPort
 }
 
 object TestServer {
   def main(args: Array[String]) {
-    val s = new TestServer(9999)
+    val s = new TestServer()
     s.start()
     while(true) {
       Thread.sleep(1000)
-- 
GitLab