Skip to content
Snippets Groups Projects
Commit f24bfd2d authored by Reynold Xin's avatar Reynold Xin
Browse files

For size compression, map non-zero sizes to non-zero compressed values, so that an empty map output (size 0) stays distinguishable after decompression.

parent cd16eab0
No related branches found
No related tags found
No related merge requests found
...@@ -2,6 +2,10 @@ package spark ...@@ -2,6 +2,10 @@ package spark
import java.io._ import java.io._
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import akka.actor._ import akka.actor._
import akka.dispatch._ import akka.dispatch._
...@@ -11,16 +15,13 @@ import akka.util.Duration ...@@ -11,16 +15,13 @@ import akka.util.Duration
import akka.util.Timeout import akka.util.Timeout
import akka.util.duration._ import akka.util.duration._
import scala.collection.mutable.HashMap import spark.scheduler.MapStatus
import scala.collection.mutable.HashSet
import scheduler.MapStatus
import spark.storage.BlockManagerId import spark.storage.BlockManagerId
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
private[spark] sealed trait MapOutputTrackerMessage private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String) private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
extends MapOutputTrackerMessage extends MapOutputTrackerMessage
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging { private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
...@@ -88,14 +89,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea ...@@ -88,14 +89,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
} }
mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)) mapStatuses.put(shuffleId, new Array[MapStatus](numMaps))
} }
def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) { def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
var array = mapStatuses.get(shuffleId) var array = mapStatuses.get(shuffleId)
array.synchronized { array.synchronized {
array(mapId) = status array(mapId) = status
} }
} }
def registerMapOutputs( def registerMapOutputs(
shuffleId: Int, shuffleId: Int,
statuses: Array[MapStatus], statuses: Array[MapStatus],
...@@ -119,10 +120,10 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea ...@@ -119,10 +120,10 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID") throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
} }
} }
// Remembers which map output locations are currently being fetched on a worker // Remembers which map output locations are currently being fetched on a worker
val fetching = new HashSet[Int] val fetching = new HashSet[Int]
// Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = { def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
val statuses = mapStatuses.get(shuffleId) val statuses = mapStatuses.get(shuffleId)
...@@ -149,7 +150,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea ...@@ -149,7 +150,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
val host = System.getProperty("spark.hostname", Utils.localHostName) val host = System.getProperty("spark.hostname", Utils.localHostName)
val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]] val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
val fetchedStatuses = deserializeStatuses(fetchedBytes) val fetchedStatuses = deserializeStatuses(fetchedBytes)
logInfo("Got the output locations") logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses) mapStatuses.put(shuffleId, fetchedStatuses)
fetching.synchronized { fetching.synchronized {
...@@ -254,8 +255,10 @@ private[spark] object MapOutputTracker { ...@@ -254,8 +255,10 @@ private[spark] object MapOutputTracker {
* sizes up to 35 GB with at most 10% error. * sizes up to 35 GB with at most 10% error.
*/ */
def compressSize(size: Long): Byte = { def compressSize(size: Long): Byte = {
if (size <= 1L) { if (size == 0) {
0 0
} else if (size <= 1L) {
1
} else { } else {
math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
} }
...@@ -266,7 +269,7 @@ private[spark] object MapOutputTracker { ...@@ -266,7 +269,7 @@ private[spark] object MapOutputTracker {
*/ */
def decompressSize(compressedSize: Byte): Long = { def decompressSize(compressedSize: Byte): Long = {
if (compressedSize == 0) { if (compressedSize == 0) {
1 0
} else { } else {
math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
} }
......
...@@ -5,7 +5,7 @@ import org.scalatest.FunSuite ...@@ -5,7 +5,7 @@ import org.scalatest.FunSuite
class MapOutputTrackerSuite extends FunSuite { class MapOutputTrackerSuite extends FunSuite {
test("compressSize") { test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0) assert(MapOutputTracker.compressSize(0L) === 0)
assert(MapOutputTracker.compressSize(1L) === 0) assert(MapOutputTracker.compressSize(1L) === 1)
assert(MapOutputTracker.compressSize(2L) === 8) assert(MapOutputTracker.compressSize(2L) === 8)
assert(MapOutputTracker.compressSize(10L) === 25) assert(MapOutputTracker.compressSize(10L) === 25)
assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145) assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145)
...@@ -15,7 +15,7 @@ class MapOutputTrackerSuite extends FunSuite { ...@@ -15,7 +15,7 @@ class MapOutputTrackerSuite extends FunSuite {
} }
test("decompressSize") { test("decompressSize") {
assert(MapOutputTracker.decompressSize(0) === 1) assert(MapOutputTracker.decompressSize(0) === 0)
for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) { for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size)) val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size))
assert(size2 >= 0.99 * size && size2 <= 1.11 * size, assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment