Skip to content
Snippets Groups Projects
Commit dc2c69e6 authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

SplitStream implementation in progress.

parent bb0178d1
No related branches found
No related tags found
No related merge requests found
......@@ -10,6 +10,7 @@ JARS += third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
JARS += third_party/hadoop-0.20.0/lib/commons-logging-1.0.4.jar
JARS += third_party/scalatest-1.0/scalatest-1.0.jar
JARS += third_party/ScalaCheck-1.5.jar
JARS += third_party/FreePastry-2.1.jar
CLASSPATH = $(subst $(SPACE),:,$(JARS))
SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
......
......@@ -18,8 +18,9 @@ CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar
CLASSPATH+=:$FWDIR/third_party/colt.jar
CLASSPATH+=:$FWDIR/third_party/google-collect-1.0-rc5/google-collect-1.0-rc5.jar
CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
CLASSPATH+=:third_party/scalatest-1.0/scalatest-1.0.jar
CLASSPATH+=:third_party/ScalaCheck-1.5.jar
CLASSPATH+=:$FWDIR/third_party/scalatest-1.0/scalatest-1.0.jar
CLASSPATH+=:$FWDIR/third_party/ScalaCheck-1.5.jar
CLASSPATH+=:$FWDIR/third_party/FreePastry-2.1.jar
for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
CLASSPATH+=:$jar
done
......
......@@ -15,6 +15,17 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
import rice.environment.Environment
import rice.p2p.commonapi._
import rice.p2p.commonapi.rawserialization.RawMessage
import rice.pastry._
import rice.pastry.commonapi.PastryIdFactory
import rice.pastry.direct._
import rice.pastry.socket.SocketPastryNodeFactory
import rice.pastry.standard.RandomNodeIdFactory
import rice.p2p.scribe._
import rice.p2p.splitstream._
@serializable
trait BroadcastRecipe {
val uuid = UUID.randomUUID
......@@ -628,6 +639,33 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
}
@serializable
class SplitStreamBroadcast[T](@transient var value_ : T, local: Boolean)
extends BroadcastRecipe {
def value = value_
BroadcastSS.synchronized { BroadcastSS.values.put (uuid, value_) }
if (!local) { sendBroadcast }
def sendBroadcast () {
}
private def readObject (in: ObjectInputStream) {
in.defaultReadObject
BroadcastSS.synchronized {
val cachedVal = BroadcastSS.values.get(uuid)
if (cachedVal != null) {
value_ = cachedVal.asInstanceOf[T]
} else {
}
}
}
}
@serializable
class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
extends BroadcastRecipe {
......@@ -725,6 +763,8 @@ private object Broadcast {
BroadcastCH.initialize
// Initialization for ChainedStreamingBroadcast
BroadcastCS.initialize (isMaster)
// Initialization for SplitStreamBroadcast
BroadcastSS.initialize (isMaster)
initialized = true
}
......@@ -889,6 +929,107 @@ private object BroadcastCS {
}
}
private object BroadcastSS {
val values = new MapMaker ().softValues ().makeMap[UUID, Any]
private var initialized = false
private var isMaster_ = false
private var masterBootHost_ = "127.0.0.1"
private var masterBootPort_ : Int = 11111
private var blockSize_ : Int = 512 * 1024
private var maxRetryCount_ : Int = 2
private var masterBootAddress_ : InetSocketAddress = null
private var localBindPort_ : Int = -1
private var pEnvironment_ : Environment = null
private var pastryNode_ : PastryNode = null
def initialize (isMaster__ : Boolean) {
synchronized {
if (!initialized) {
masterBootHost_ =
System.getProperty ("spark.broadcast.masterHostAddress", "127.0.0.1")
masterBootPort_ =
System.getProperty ("spark.broadcast.masterTrackerPort", "11111").toInt
masterBootAddress_ = new InetSocketAddress(masterBootHost_,
masterBootPort_)
blockSize_ =
System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024
maxRetryCount_ =
System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
isMaster_ = isMaster__
// Initialize the SplitStream tree
initializeSplitStream
initialized = true
}
}
}
def masterBootAddress = masterBootAddress_
def blockSize = blockSize_
def maxRetryCount = maxRetryCount_
def pEnvironment = pEnvironment_
def pastryNode = pastryNode_
def localBindPort = {
if (localBindPort_ == -1) {
if (isMaster) { localBindPort_ = masterBootPort_ }
else {
// TODO: What's the best way of finding a free port?
val sSocket = new ServerSocket (0)
val sPort = sSocket.getLocalPort
sSocket.close
localBindPort_ = sPort
}
}
localBindPort_
}
def isMaster = isMaster_
private def initializeSplitStream = {
pEnvironment_ = new Environment
// Generate the NodeIds Randomly
val nidFactory = new RandomNodeIdFactory (pEnvironment)
// Construct the PastryNodeFactory
val pastryNodeFactory = new SocketPastryNodeFactory (nidFactory,
localBindPort, pEnvironment)
// Construct a Pastry node
pastryNode_ = pastryNodeFactory.newNode
// Boot the node. If its Master, start a new ring.
if (isMaster) { pastryNode.boot (null) }
else { pastryNode.boot (masterBootAddress)}
// The node may require sending several messages to fully boot into the ring
pastryNode.synchronized {
while(!pastryNode.isReady && !pastryNode.joinFailed) {
// Delay so we don't busy-wait
pastryNode.wait(500);
// Abort if can't join
if (pastryNode.joinFailed()) {
// TODO: throw new IOException("Join failed " + node.joinFailedReason)
}
}
}
// construct a new splitstream application
// val app = new MySplitStreamClient(pastryNode)
// app.subscribe
// if (isMaster) { app.startPublishTask }
}
}
private object BroadcastCH {
val values = new MapMaker ().softValues ().makeMap[UUID, Any]
......
File added
File added
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment