Commit 98ab4112 authored by Sandy Ryza, committed by Reynold Xin

SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions

This also removes an unnecessary tuple creation in cogroup.
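
For illustration, a minimal, self-contained sketch (not part of the patch) of the two styles the change trades between. The premise of SPARK-2519 is that deconstructing each record with a Tuple2 pattern match in a hot per-record loop is measurably slower than reading the fields through the _1/_2 accessors:

object Tuple2MatchSketch {
  def main(args: Array[String]): Unit = {
    val pairs = Array.tabulate(1000000)(i => (i, i + 1))

    // Before: deconstruct each element with a Tuple2 pattern match.
    var sumMatch = 0L
    pairs.foreach { case (k, v) => sumMatch += k + v }

    // After: read the fields directly through _1/_2.
    var sumAccess = 0L
    pairs.foreach { pair => sumAccess += pair._1 + pair._2 }

    assert(sumMatch == sumAccess)
  }
}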

Author: Sandy Ryza <sandy@cloudera.com>

Closes #1447 from sryza/sandy-spark-2519-2 and squashes the following commits:

b6d9699 [Sandy Ryza] Remove missed Tuple2 match in CoGroupedRDD
a109828 [Sandy Ryza] Remove another pattern matching in MappedValuesRDD and revert some changes in PairRDDFunctions
be10f8a [Sandy Ryza] SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions
parent 4da01e38
@@ -170,12 +170,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
       val newCombiner = Array.fill(numRdds)(new CoGroup)
-      value match { case (v, depNum) => newCombiner(depNum) += v }
+      newCombiner(value._2) += value._1
       newCombiner
     }
     val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
       (combiner, value) => {
-        value match { case (v, depNum) => combiner(depNum) += v }
+        combiner(value._2) += value._1
         combiner
       }
     val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
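
For context, a standalone sketch of how these combiner closures behave. The type aliases mirror CoGroupedRDD's private CoGroup/CoGroupValue/CoGroupCombiner aliases, and the fold driver at the bottom is illustrative rather than the actual shuffle-side code:

import scala.collection.mutable.ArrayBuffer

object CoGroupCombinerSketch {
  // Stand-ins for CoGroupedRDD's private aliases: a CoGroupValue is
  // (value, index of the parent RDD it came from), and a combiner keeps
  // one buffer of values per parent RDD.
  type CoGroup = ArrayBuffer[Any]
  type CoGroupValue = (Any, Int)
  type CoGroupCombiner = Array[CoGroup]

  def main(args: Array[String]): Unit = {
    val numRdds = 2
    val createCombiner: CoGroupValue => CoGroupCombiner = value => {
      val newCombiner = Array.fill(numRdds)(new CoGroup)
      newCombiner(value._2) += value._1 // accessor form from the patch
      newCombiner
    }
    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
      (combiner, value) => {
        combiner(value._2) += value._1
        combiner
      }
    // Illustrative driver: fold tagged values for one key into per-RDD buckets.
    val tagged = Seq[CoGroupValue](("a", 0), ("b", 1), ("c", 0))
    val buckets = tagged.tail.foldLeft(createCombiner(tagged.head))(mergeValue)
    println(buckets.map(_.mkString(",")).mkString(" | ")) // prints: a,c | b
  }
}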
@@ -28,6 +28,6 @@ class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
   override val partitioner = firstParent[Product2[K, U]].partitioner

   override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
-    firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
+    firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) }
   }
 }
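
Usage sketch for the operation this RDD backs (assumes an existing SparkContext `sc`): mapValues rewrites values only, so the parent's partitioner carries over, which is what the overridden `partitioner` above preserves:

val fruit = sc.parallelize(Seq(("a", "apple"), ("b", "banana")))
val lengths = fruit.mapValues(_.length) // RDD[(String, Int)]
// Keys are untouched, so lengths.partitioner == fruit.partitioner.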
@@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val reducePartition = (iter: Iterator[(K, V)]) => {
       val map = new JHashMap[K, V]
-      iter.foreach { case (k, v) =>
-        val old = map.get(k)
-        map.put(k, if (old == null) v else func(old, v))
+      iter.foreach { pair =>
+        val old = map.get(pair._1)
+        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       Iterator(map)
     } : Iterator[JHashMap[K, V]]

     val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
-      m2.foreach { case (k, v) =>
-        val old = m1.get(k)
-        m1.put(k, if (old == null) v else func(old, v))
+      m2.foreach { pair =>
+        val old = m1.get(pair._1)
+        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       m1
     } : JHashMap[K, V]
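
Usage sketch for the method edited here (assumes an existing SparkContext `sc`): reduceByKeyLocally applies `reducePartition` to each partition and then merges the resulting hash maps on the driver with `mergeMaps`:

val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
  .reduceByKeyLocally(_ + _)
// counts: scala.collection.Map[String, Int] = Map(a -> 2, b -> 1)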
@@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      for (v <- vs; w <- ws) yield (v, w)
-    }
+    this.cogroup(other, partitioner).flatMapValues( pair =>
+      for (v <- pair._1; w <- pair._2) yield (v, w)
+    )
   }

   /**
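
Usage sketch for join (assumes an existing SparkContext `sc`; element order in the output may vary):

val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
val right = sc.parallelize(Seq((2, "x"), (3, "y")))
left.join(right).collect() // Array((2, ("b", "x"))) -- only keys in both RDDs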
@@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * partition the output RDD.
    */
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (ws.isEmpty) {
-        vs.map(v => (v, None))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._2.isEmpty) {
+        pair._1.map(v => (v, None))
       } else {
-        for (v <- vs; w <- ws) yield (v, Some(w))
+        for (v <- pair._1; w <- pair._2) yield (v, Some(w))
       }
     }
   }
@@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Option[V], W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (vs.isEmpty) {
-        ws.map(w => (None, w))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._1.isEmpty) {
+        pair._2.map(w => (None, w))
       } else {
-        for (v <- vs; w <- ws) yield (Some(v), w)
+        for (v <- pair._1; w <- pair._2) yield (Some(v), w)
      }
    }
  }
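
Usage sketch covering both outer joins above (assumes an existing SparkContext `sc`; element order may vary). The empty-side checks in the patched code are what produce the None entries:

val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
val right = sc.parallelize(Seq((2, "x"), (3, "y")))
left.leftOuterJoin(right).collect()
// Array((1, ("a", None)), (2, ("b", Some("x"))))
left.rightOuterJoin(right).collect()
// Array((2, (Some("b"), "x")), (3, (None, "y")))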
@@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val data = self.collect()
     val map = new mutable.HashMap[K, V]
     map.sizeHint(data.length)
-    data.foreach { case (k, v) => map.put(k, v) }
+    data.foreach { pair => map.put(pair._1, pair._2) }
     map
   }
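
Usage sketch (assumes an existing SparkContext `sc`): collectAsMap materializes the whole RDD on the driver and fills the hash map above, so for duplicate keys the last pair written wins:

val m = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).collectAsMap()
// m("b") == 2; for the duplicate key "a", whichever pair is written last
// into the HashMap wins, so don't rely on m("a").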
@@ -572,10 +572,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
-       (vs.asInstanceOf[Seq[V]],
-         w1s.asInstanceOf[Seq[W1]],
-         w2s.asInstanceOf[Seq[W2]],
-         w3s.asInstanceOf[Seq[W3]])
+      (vs.asInstanceOf[Seq[V]],
+        w1s.asInstanceOf[Seq[W1]],
+        w2s.asInstanceOf[Seq[W2]],
+        w3s.asInstanceOf[Seq[W3]])
     }
   }
@@ -589,8 +589,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    cg.mapValues { case Seq(vs, ws) =>
-      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+    cg.mapValues { case Seq(vs, w1s) =>
+      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
     }
   }
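
Usage sketch (assumes an existing SparkContext `sc`): cogroup groups each RDD's values separately per key, which is what the `Seq(vs, w1s)` destructuring above unpacks:

val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
val right = sc.parallelize(Seq((2, "x"), (3, "y")))
left.cogroup(right).mapValues { case (vs, ws) => (vs.size, ws.size) }.collect()
// Array((1, (1, 0)), (2, (1, 1)), (3, (0, 1))) -- order may vary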
@@ -606,8 +606,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s) =>
       (vs.asInstanceOf[Seq[V]],
-         w1s.asInstanceOf[Seq[W1]],
-         w2s.asInstanceOf[Seq[W2]])
+        w1s.asInstanceOf[Seq[W1]],
+        w2s.asInstanceOf[Seq[W2]])
     }
   }
@@ -712,8 +712,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val index = p.getPartition(key)
       val process = (it: Iterator[(K, V)]) => {
         val buf = new ArrayBuffer[V]
-        for ((k, v) <- it if k == key) {
-          buf += v
+        for (pair <- it if pair._1 == key) {
+          buf += pair._2
         }
         buf
       } : Seq[V]
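
Usage sketch (assumes an existing SparkContext `sc`): when the RDD has a known partitioner, lookup runs the `process` filter above on only the partition that can contain the key; otherwise it falls back to filtering the whole RDD:

val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
pairs.lookup(1) // Seq("a", "b")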
@@ -858,8 +858,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       try {
         while (iter.hasNext) {
-          val (k, v) = iter.next()
-          writer.write(k, v)
+          val pair = iter.next()
+          writer.write(pair._1, pair._2)
         }
       } finally {
         writer.close(hadoopContext)