Skip to content
Snippets Groups Projects
Commit 7ba34bc0 authored by Charles Reiss's avatar Charles Reiss
Browse files

Additional tests for MapOutputTracker.

parent 273fb5cc
No related branches found
No related tags found
No related merge requests found
package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import akka.actor._
import spark.scheduler.MapStatus
import spark.storage.BlockManagerId
import spark.util.AkkaUtils
class MapOutputTrackerSuite extends FunSuite {
class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
after {
System.clearProperty("spark.master.port")
}
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
assert(MapOutputTracker.compressSize(1L) === 1)
......@@ -71,6 +77,78 @@ class MapOutputTrackerSuite extends FunSuite {
// The remaining reduce task might try to grab the output dispite the shuffle failure;
// this should cause it to fail, and the scheduler will ignore the failure due to the
// stage already being aborted.
intercept[Exception] { tracker.getServerStatuses(10, 1) }
intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
}
test("remote fetch") {
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("test", "localhost", 0)
System.setProperty("spark.master.port", boundPort.toString)
val masterTracker = new MapOutputTracker(actorSystem, true)
val slaveTracker = new MapOutputTracker(actorSystem, false)
masterTracker.registerShuffle(10, 1)
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
masterTracker.registerMapOutput(10, 0, new MapStatus(
new BlockManagerId("hostA", 1000), Array(compressedSize1000)))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
Seq((new BlockManagerId("hostA", 1000), size1000)))
masterTracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
}
test("simulatenous fetch fails") {
val dummyActorSystem = ActorSystem("testDummy")
val dummyTracker = new MapOutputTracker(dummyActorSystem, true)
dummyTracker.registerShuffle(10, 1)
// val compressedSize1000 = MapOutputTracker.compressSize(1000L)
// val size100 = MapOutputTracker.decompressSize(compressedSize1000)
// dummyTracker.registerMapOutput(10, 0, new MapStatus(
// new BlockManagerId("hostA", 1000), Array(compressedSize1000)))
val serializedMessage = dummyTracker.getSerializedLocations(10)
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("test", "localhost", 0)
System.setProperty("spark.master.port", boundPort.toString)
val delayResponseLock = new java.lang.Object
val delayResponseActor = actorSystem.actorOf(Props(new Actor {
override def receive = {
case GetMapOutputStatuses(shuffleId: Int, requester: String) =>
delayResponseLock.synchronized {
sender ! serializedMessage
}
}
}), name = "MapOutputTracker")
val slaveTracker = new MapOutputTracker(actorSystem, false)
var firstFailed = false
var secondFailed = false
val firstFetch = new Thread {
override def run() {
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
firstFailed = true
}
}
val secondFetch = new Thread {
override def run() {
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
secondFailed = true
}
}
delayResponseLock.synchronized {
firstFetch.start
secondFetch.start
}
firstFetch.join
secondFetch.join
assert(firstFailed && secondFailed)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment