diff --git a/Makefile b/Makefile index cec6f755f48c4f7052c648ef38451e608f6325ef..d30f54382628c2f4bf6ceb9808897f02b5e6fb99 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,7 @@ JARS += third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar JARS += third_party/hadoop-0.20.0/lib/commons-logging-1.0.4.jar JARS += third_party/scalatest-1.0/scalatest-1.0.jar JARS += third_party/ScalaCheck-1.5.jar +JARS += third_party/FreePastry-2.1.jar CLASSPATH = $(subst $(SPACE),:,$(JARS)) SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala diff --git a/run b/run index 2e78025d7c26cfb6a9f7bb8d0eb98f9788d8a6d3..c11b2b8456d2ade4f5dc95394699c594025093e1 100755 --- a/run +++ b/run @@ -18,8 +18,9 @@ CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar CLASSPATH+=:$FWDIR/third_party/colt.jar CLASSPATH+=:$FWDIR/third_party/google-collect-1.0-rc5/google-collect-1.0-rc5.jar CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar -CLASSPATH+=:third_party/scalatest-1.0/scalatest-1.0.jar -CLASSPATH+=:third_party/ScalaCheck-1.5.jar +CLASSPATH+=:$FWDIR/third_party/scalatest-1.0/scalatest-1.0.jar +CLASSPATH+=:$FWDIR/third_party/ScalaCheck-1.5.jar +CLASSPATH+=:$FWDIR/third_party/FreePastry-2.1.jar for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do CLASSPATH+=:$jar done diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 40353decdea59e04189dd43cf5b040d80913a8c7..1a95860e32716541be583aee45150501d21e940e 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -15,6 +15,17 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem} import spark.compress.lzf.{LZFInputStream, LZFOutputStream} +import rice.environment.Environment +import rice.p2p.commonapi._ +import rice.p2p.commonapi.rawserialization.RawMessage +import rice.pastry._ +import rice.pastry.commonapi.PastryIdFactory +import rice.pastry.direct._ +import rice.pastry.socket.SocketPastryNodeFactory +import rice.pastry.standard.RandomNodeIdFactory +import rice.p2p.scribe._ +import rice.p2p.splitstream._ + @serializable trait BroadcastRecipe { val uuid = UUID.randomUUID @@ -628,6 +639,33 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } } +@serializable +class SplitStreamBroadcast[T](@transient var value_ : T, local: Boolean) + extends BroadcastRecipe { + + def value = value_ + + BroadcastSS.synchronized { BroadcastSS.values.put (uuid, value_) } + + if (!local) { sendBroadcast } + + def sendBroadcast () { + + } + + private def readObject (in: ObjectInputStream) { + in.defaultReadObject + BroadcastSS.synchronized { + val cachedVal = BroadcastSS.values.get(uuid) + if (cachedVal != null) { + value_ = cachedVal.asInstanceOf[T] + } else { + + } + } + } +} + @serializable class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean) extends BroadcastRecipe { @@ -725,6 +763,8 @@ private object Broadcast { BroadcastCH.initialize // Initialization for ChainedStreamingBroadcast BroadcastCS.initialize (isMaster) + // Initialization for SplitStreamBroadcast + BroadcastSS.initialize (isMaster) initialized = true } @@ -889,6 +929,107 @@ private object BroadcastCS { } } +private object BroadcastSS { + val values = new MapMaker ().softValues ().makeMap[UUID, Any] + + private var initialized = false + private var isMaster_ = false + + private var masterBootHost_ = "127.0.0.1" + private var masterBootPort_ : Int = 11111 + private var blockSize_ : Int = 512 * 1024 + private var maxRetryCount_ : Int = 2 + + private var masterBootAddress_ : InetSocketAddress = null + private var localBindPort_ : Int = -1 + + private var pEnvironment_ : Environment = null + private var pastryNode_ : PastryNode = null + + def initialize (isMaster__ : Boolean) { + synchronized { + if (!initialized) { + masterBootHost_ = + System.getProperty ("spark.broadcast.masterHostAddress", "127.0.0.1") + masterBootPort_ = + System.getProperty ("spark.broadcast.masterTrackerPort", "11111").toInt + + masterBootAddress_ = new InetSocketAddress(masterBootHost_, + masterBootPort_) + + blockSize_ = + System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024 + maxRetryCount_ = + System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt + + isMaster_ = isMaster__ + + // Initialize the SplitStream tree + initializeSplitStream + + initialized = true + } + } + } + + def masterBootAddress = masterBootAddress_ + def blockSize = blockSize_ + def maxRetryCount = maxRetryCount_ + def pEnvironment = pEnvironment_ + def pastryNode = pastryNode_ + def localBindPort = { + if (localBindPort_ == -1) { + if (isMaster) { localBindPort_ = masterBootPort_ } + else { + // TODO: What's the best way of finding a free port? + val sSocket = new ServerSocket (0) + val sPort = sSocket.getLocalPort + sSocket.close + localBindPort_ = sPort + } + } + localBindPort_ + } + + def isMaster = isMaster_ + + private def initializeSplitStream = { + pEnvironment_ = new Environment + + // Generate the NodeIds Randomly + val nidFactory = new RandomNodeIdFactory (pEnvironment) + + // Construct the PastryNodeFactory + val pastryNodeFactory = new SocketPastryNodeFactory (nidFactory, + localBindPort, pEnvironment) + + // Construct a Pastry node + pastryNode_ = pastryNodeFactory.newNode + + // Boot the node. If its Master, start a new ring. + if (isMaster) { pastryNode.boot (null) } + else { pastryNode.boot (masterBootAddress)} + + // The node may require sending several messages to fully boot into the ring + pastryNode.synchronized { + while(!pastryNode.isReady && !pastryNode.joinFailed) { + // Delay so we don't busy-wait + pastryNode.wait(500); + + // Abort if can't join + if (pastryNode.joinFailed()) { + // TODO: throw new IOException("Join failed " + node.joinFailedReason) + } + } + } + + // construct a new splitstream application + // val app = new MySplitStreamClient(pastryNode) + // app.subscribe + // if (isMaster) { app.startPublishTask } + } +} + private object BroadcastCH { val values = new MapMaker ().softValues ().makeMap[UUID, Any] diff --git a/third_party/FreePastry-2.1.jar b/third_party/FreePastry-2.1.jar new file mode 100644 index 0000000000000000000000000000000000000000..51146e154130f7dd77803d3b9319e544a13a4cc7 Binary files /dev/null and b/third_party/FreePastry-2.1.jar differ diff --git a/third_party/libnexus.so b/third_party/libnexus.so new file mode 100755 index 0000000000000000000000000000000000000000..80c151751f0ece8a46cfe73cddd5c4e69460b5cc Binary files /dev/null and b/third_party/libnexus.so differ