From 44460ba594fbfe5a6ee66e5121ead914bf16f9f6 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Sat, 2 Aug 2014 01:11:03 -0700
Subject: [PATCH] HOTFIX: Fix concurrency issue in FlumePollingStreamSuite.

This has been failing on master. One possible cause is that the port
gets contended if multiple test runs happen concurrently and they
hit this test at the same time. Since this test takes a long time
(60 seconds) that's very plausible. This patch randomizes the port
used in this test to avoid contention.
---
 .../spark/streaming/flume/FlumePollingStreamSuite.scala    | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 47071d0cc4..27bf2ac962 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
 import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
+import java.util.Random
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -37,13 +38,16 @@ import org.apache.spark.streaming.flume.sink._
 
 class FlumePollingStreamSuite extends TestSuiteBase {
 
-  val testPort = 9999
+  val random = new Random()
+  /** Return a port in the ephemeral range. */
+  def getTestPort = random.nextInt(16382) + 49152
   val batchCount = 5
   val eventsPerBatch = 100
   val totalEventsPerChannel = batchCount * eventsPerBatch
   val channelCapacity = 5000
 
   test("flume polling test") {
+    val testPort = getTestPort
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
@@ -77,6 +81,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
   }
 
   test("flume polling test multiple hosts") {
+    val testPort = getTestPort
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
-- 
GitLab