Skip to content
Snippets Groups Projects
Commit 94b3db13 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #508 from markhamstra/TestServerInUse

Avoid bind failure in InputStreamsSuite
parents 25c71d3e b4090731
No related branches found
No related tags found
No related merge requests found
......@@ -29,7 +29,7 @@ import java.nio.charset.Charset
import com.google.common.io.Files
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
val testPort = 9999
......@@ -44,12 +44,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("socket input stream") {
// Start the server
val testServer = new TestServer(testPort)
val testServer = new TestServer()
testServer.start()
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework, batchDuration)
val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
......@@ -193,8 +193,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("actor input stream") {
// Start the server
val port = testPort
val testServer = new TestServer(port)
val testServer = new TestServer()
val port = testServer.port
testServer.start()
// Set up the streaming context and input streams
......@@ -244,11 +244,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
/** This is server to test the network input stream */
class TestServer(port: Int) extends Logging {
class TestServer() extends Logging {
val queue = new ArrayBlockingQueue[String](100)
val serverSocket = new ServerSocket(port)
val serverSocket = new ServerSocket(0)
val servingThread = new Thread() {
override def run() {
......@@ -290,11 +290,13 @@ class TestServer(port: Int) extends Logging {
def send(msg: String) { queue.add(msg) }
def stop() { servingThread.interrupt() }
def port = serverSocket.getLocalPort
}
object TestServer {
def main(args: Array[String]) {
val s = new TestServer(9999)
val s = new TestServer()
s.start()
while(true) {
Thread.sleep(1000)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment