Skip to content
Snippets Groups Projects
Commit c02e0649 authored by Tathagata Das's avatar Tathagata Das
Browse files

Fixed replication bug in BlockManager

parent 4d480ec5
No related branches found
No related tags found
No related merge requests found
...@@ -32,8 +32,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -32,8 +32,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} }
override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) { override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
bytes.rewind()
if (level.deserialized) { if (level.deserialized) {
bytes.rewind()
val values = blockManager.dataDeserialize(blockId, bytes) val values = blockManager.dataDeserialize(blockId, bytes)
val elements = new ArrayBuffer[Any] val elements = new ArrayBuffer[Any]
elements ++= values elements ++= values
...@@ -58,7 +58,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -58,7 +58,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} else { } else {
val bytes = blockManager.dataSerialize(blockId, values.iterator) val bytes = blockManager.dataSerialize(blockId, values.iterator)
tryToPut(blockId, bytes, bytes.limit, false) tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes)) PutResult(bytes.limit(), Right(bytes.duplicate()))
} }
} }
......
package spark package spark
import network.ConnectionManagerId
import org.scalatest.FunSuite import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.ShouldMatchers import org.scalatest.matchers.ShouldMatchers
...@@ -13,7 +14,7 @@ import com.google.common.io.Files ...@@ -13,7 +14,7 @@ import com.google.common.io.Files
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import SparkContext._ import SparkContext._
import storage.StorageLevel import storage.{GetBlock, BlockManagerWorker, StorageLevel}
class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext { class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
...@@ -140,9 +141,22 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter ...@@ -140,9 +141,22 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
test("caching in memory and disk, serialized, replicated") { test("caching in memory and disk, serialized, replicated") {
sc = new SparkContext(clusterUrl, "test") sc = new SparkContext(clusterUrl, "test")
val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2) val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
assert(data.count() === 1000) assert(data.count() === 1000)
assert(data.count() === 1000) assert(data.count() === 1000)
assert(data.count() === 1000) assert(data.count() === 1000)
// Get all the locations of the first partition and try to fetch the partitions
// from those locations.
val blockIds = data.partitions.indices.map(index => "rdd_%d_%d".format(data.id, index)).toArray
val blockId = blockIds(0)
val blockManager = SparkEnv.get.blockManager
blockManager.master.getLocations(blockId).foreach(id => {
val bytes = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(id.ip, id.port))
val deserialized = blockManager.dataDeserialize(blockId, bytes).asInstanceOf[Iterator[Int]].toList
assert(deserialized === (1 to 100).toList)
})
} }
test("compute without caching when no partitions fit in memory") { test("compute without caching when no partitions fit in memory") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment