Commit d48ee7e5 authored by Reynold Xin

Merge branch 'master' of github.com:mesos/spark into cogroup

parents 00a11304 945d1e72
Worker.scala
@@ -74,16 +74,10 @@ private[spark] class Worker(
 
   def connectToMaster() {
     logInfo("Connecting to master " + masterUrl)
-    try {
-      master = context.actorFor(Master.toAkkaUrl(masterUrl))
-      master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
-      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-      context.watch(master) // Doesn't work with remote actors, but useful for testing
-    } catch {
-      case e: Exception =>
-        logError("Failed to connect to master", e)
-        System.exit(1)
-    }
+    master = context.actorFor(Master.toAkkaUrl(masterUrl))
+    master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
+    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+    context.watch(master) // Doesn't work with remote actors, but useful for testing
   }
 
   def startWebUi() {
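Note on this hunk: nothing in the registration block actually throws when the master is unreachable. With Akka's remote actors, actorFor only builds a reference without opening a connection, so the try/catch could never observe a connection failure; problems surface later as RemoteClientLifeCycleEvent messages on the event stream the worker subscribes to. The sketch below is not part of this commit; it is a minimal illustration, using Akka 2.0-era class names, of how such lifecycle events can be handled in an actor's receive. The MasterWatcher name and the println bodies are made up for the example.

// Illustrative sketch only (not from this diff): reacting to remote lifecycle
// events after subscribing with
//   context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
import akka.actor.Actor
import akka.remote.{RemoteClientDisconnected, RemoteClientLifeCycleEvent, RemoteClientShutdown}

class MasterWatcher extends Actor {
  def receive = {
    case RemoteClientDisconnected(_, address) =>
      // The connection to the master dropped; a real worker would re-register
      // with the master or shut itself down at this point.
      println("Disconnected from master at " + address)
    case RemoteClientShutdown(_, address) =>
      println("Master at " + address + " shut down")
    case _: RemoteClientLifeCycleEvent =>
      // Ignore other lifecycle events (connect, error, write failures).
  }
}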
CoalescedRDD.scala
@@ -37,8 +37,8 @@ class CoalescedRDD[T: ClassManifest](
       prevSplits.map(_.index).map{idx => new CoalescedRDDPartition(idx, prev, Array(idx)) }
     } else {
       (0 until maxPartitions).map { i =>
-        val rangeStart = (i * prevSplits.length) / maxPartitions
-        val rangeEnd = ((i + 1) * prevSplits.length) / maxPartitions
+        val rangeStart = ((i.toLong * prevSplits.length) / maxPartitions).toInt
+        val rangeEnd = (((i.toLong + 1) * prevSplits.length) / maxPartitions).toInt
         new CoalescedRDDPartition(i, prev, (rangeStart until rangeEnd).toArray)
       }.toArray
     }
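Note on this hunk: the old expressions multiply two Ints, so with enough input partitions the product i * prevSplits.length can exceed Int.MaxValue, wrap negative, and yield garbage partition ranges. Promoting i to Long keeps the intermediate exact; dividing by maxPartitions brings the result back into Int range before the narrowing conversion. A standalone sketch of the difference (the counts below are hypothetical, chosen only to trigger 32-bit overflow):

// Why the intermediate must be a Long: illustrative numbers, not from the diff.
object CoalesceRangeDemo {
  def main(args: Array[String]): Unit = {
    val prevSplitCount = 70000   // e.g. an RDD with 70k input partitions
    val maxPartitions  = 40000
    val i              = 35000   // one of the coalesced partition indices

    // Old computation: 35000 * 70000 = 2,450,000,000 overflows Int and wraps.
    val oldStart = (i * prevSplitCount) / maxPartitions
    // New computation: promote to Long, divide, then narrow back to Int.
    val newStart = ((i.toLong * prevSplitCount) / maxPartitions).toInt

    println(s"Int arithmetic:  $oldStart")   // -46124 (wrapped, nonsensical)
    println(s"Long arithmetic: $newStart")   // 61250, the intended range start
  }
}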
HadoopRDD.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.util.ReflectionUtils
 
 import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
 import spark.util.NextIterator
+import org.apache.hadoop.conf.Configurable
 
 /**
@@ -50,6 +51,9 @@ class HadoopRDD[K, V](
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = createInputFormat(conf)
+    if (inputFormat.isInstanceOf[Configurable]) {
+      inputFormat.asInstanceOf[Configurable].setConf(conf)
+    }
     val inputSplits = inputFormat.getSplits(conf, minSplits)
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
@@ -69,6 +73,9 @@ class HadoopRDD[K, V](
     val conf = confBroadcast.value.value
     val fmt = createInputFormat(conf)
+    if (fmt.isInstanceOf[Configurable]) {
+      fmt.asInstanceOf[Configurable].setConf(conf)
+    }
     reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
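Note on these hunks: some InputFormat implementations also implement org.apache.hadoop.conf.Configurable (HBase's TableInputFormat is a common example) and expect setConf to be called after instantiation; without it, getConf() returns null inside getSplits or getRecordReader and the job fails. The guard added here pushes the job configuration into such formats at both instantiation sites: on the driver in getPartitions and on the executor in compute. The sketch below is not Spark code; HypotheticalFormat and its property name are made up purely to show why the guard matters when a format is created reflectively without a configuration.

// Minimal, self-contained illustration of the Configurable guard pattern.
import org.apache.hadoop.conf.{Configurable, Configuration}

class HypotheticalFormat extends Configurable {
  private var conf: Configuration = _
  override def setConf(c: Configuration): Unit = { conf = c }
  override def getConf: Configuration = conf

  // Stands in for getSplits()/getRecordReader(): needs the conf to be set.
  def tableName: String = getConf.get("hypothetical.table.name")
}

object ConfigurableGuardDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("hypothetical.table.name", "demo")

    // Reflective instantiation (as the RDDs do) never calls setConf itself.
    val format = classOf[HypotheticalFormat].newInstance()

    // The guard from the diff: only Configurable formats get the conf pushed in.
    format match {
      case c: Configurable => c.setConf(conf)
      case _ =>
    }

    println(format.tableName)  // "demo"; a NullPointerException without setConf
  }
}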
NewHadoopRDD.scala
@@ -3,7 +3,7 @@ package spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -42,6 +42,9 @@ class NewHadoopRDD[K, V](
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    if (inputFormat.isInstanceOf[Configurable]) {
+      inputFormat.asInstanceOf[Configurable].setConf(conf)
+    }
     val jobContext = newJobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
@@ -57,6 +60,9 @@ class NewHadoopRDD[K, V](
     val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
     val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
     val format = inputFormatClass.newInstance
+    if (format.isInstanceOf[Configurable]) {
+      format.asInstanceOf[Configurable].setConf(conf)
+    }
     val reader = format.createRecordReader(
       split.serializableHadoopSplit.value, hadoopAttemptContext)
     reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
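Note on these hunks: the same null-configuration issue exists for the new MapReduce API, so NewHadoopRDD gets an identical guard at its two instantiation sites: once on the driver when computing splits in getPartitions, and once per task when building the record reader in compute. Neither inputFormatClass.newInstance call passes the configuration, so each site pushes conf into Configurable formats before asking for splits or a record reader. The first hunk simply widens the existing Configuration import to also bring in Configurable.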