Skip to content
Snippets Groups Projects
Commit 38835325 authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

Bug fix. Fixed log messages. Updated BroadcastTest example to have iterations.

parent 8f2bd399
No related branches found
No related tags found
No related merge requests found
......@@ -722,7 +722,6 @@ extends Broadcast[T] with Logging with Serializable {
guidePortLock.synchronized { guidePortLock.notifyAll() }
try {
// Don't stop until there is a copy in HDFS
while (!stopBroadcast) {
var clientSocket: Socket = null
try {
......@@ -730,14 +729,13 @@ extends Broadcast[T] with Logging with Serializable {
clientSocket = serverSocket.accept()
} catch {
case e: Exception => {
logError("GuideMultipleRequests Timeout.")
// Stop broadcast if at least one worker has connected and
// everyone connected so far are done. Comparing with
// listOfSources.size - 1, because it includes the Guide itself
if (listOfSources.size > 1 &&
setOfCompletedSources.size == listOfSources.size - 1) {
stopBroadcast = true
logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.")
}
}
}
......@@ -918,9 +916,7 @@ extends Broadcast[T] with Logging with Serializable {
serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout)
clientSocket = serverSocket.accept()
} catch {
case e: Exception => {
logError("ServeMultipleRequests Timeout.")
}
case e: Exception => { }
}
if (clientSocket != null) {
logDebug("Serve: Accepted new client connection:" + clientSocket)
......
......@@ -229,7 +229,7 @@ extends Logging {
var oosTracker: ObjectOutputStream = null
var oisTracker: ObjectInputStream = null
var gInfo: SourceInfo = SourceInfo("", SourceInfo.TxOverGoToDefault)
var gInfo: SourceInfo = SourceInfo("", SourceInfo.TxNotStartedRetry)
var retriesLeft = MultiTracker.MaxRetryCount
do {
......
......@@ -27,9 +27,10 @@ extends Comparable[SourceInfo] with Logging {
* Helper Object of SourceInfo for its constants
*/
object SourceInfo {
// Constants for special values of listenPort
// Broadcast has not started yet! Should never happen.
val TxNotStartedRetry = -1
val TxOverGoToDefault = 0
// Broadcast has already finished. Try default mechanism.
val TxOverGoToDefault = -3
// Other constants
val StopBroadcast = -2
val UnusedParam = 0
......
......@@ -17,9 +17,13 @@ object BroadcastTest {
for (i <- 0 until arr1.length)
arr1(i) = i
val barr1 = spark.broadcast(arr1)
spark.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size)
for (i <- 0 until 2) {
println("Iteration " + i)
println("===========")
val barr1 = spark.broadcast(arr1)
spark.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size)
}
}
System.exit(0)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment