diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bd9155ef295acd66a643e9f757808337bc197f94..bf52b510b4fd49616ebc9ca1985c4f9fdb9162d3 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -50,16 +50,6 @@ private[spark]
 case class BlockException(blockId: String, message: String, ex: Exception = null)
 extends Exception(message)
 
-
-private[spark] class BlockLocker(numLockers: Int) {
-  private val hashLocker = Array.fill(numLockers)(new Object())
-
-  def getLock(blockId: String): Object = {
-    return hashLocker(math.abs(blockId.hashCode % numLockers))
-  }
-}
-
-
 private[spark]
 class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
   extends Logging {
@@ -87,10 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     }
   }
 
-  private val NUM_LOCKS = 337
-  private val locker = new BlockLocker(NUM_LOCKS)
-
-  private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
+  private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
 
   private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
   private[storage] val diskStore: BlockStore =
@@ -110,7 +97,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   val maxBytesInFlight =
     System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
 
+  // Whether to compress broadcast variables that are stored
   val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
+  // Whether to compress shuffle output that are stored
   val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
   // Whether to compress RDD partitions that are stored serialized
   val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
@@ -150,28 +139,28 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
    * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
    */
   def reportBlockStatus(blockId: String) {
-    locker.getLock(blockId).synchronized {
-      val curLevel = blockInfo.get(blockId) match {
-        case null =>
-          StorageLevel.NONE
-        case info =>
+
+    val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
+      case null =>
+        (StorageLevel.NONE, 0L, 0L)
+      case info =>
+        info.synchronized {
           info.level match {
             case null =>
-              StorageLevel.NONE
+              (StorageLevel.NONE, 0L, 0L)
             case level =>
               val inMem = level.useMemory && memoryStore.contains(blockId)
               val onDisk = level.useDisk && diskStore.contains(blockId)
-              new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
+              (
+                new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
+                if (inMem) memoryStore.getSize(blockId) else 0L,
+                if (onDisk) diskStore.getSize(blockId) else 0L
+              )
           }
-      }
-      master.mustHeartBeat(HeartBeat(
-        blockManagerId,
-        blockId,
-        curLevel,
-        if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
-        if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
-      logDebug("Told master about block " + blockId)
+        }
     }
+    master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
+    logDebug("Told master about block " + blockId)
   }
 
   /**
@@ -213,9 +202,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       }
     }
 
-    locker.getLock(blockId).synchronized {
-      val info = blockInfo.get(blockId)
-      if (info != null) {
+    val info = blockInfo.get(blockId)
+    if (info != null) {
+      info.synchronized {
         info.waitForReady() // In case the block is still being put() by another thread
         val level = info.level
         logDebug("Level for block " + blockId + " is " + level)
@@ -273,9 +262,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
             }
           }
         }
-      } else {
-        logDebug("Block " + blockId + " not registered locally")
       }
+    } else {
+      logDebug("Block " + blockId + " not registered locally")
     }
     return None
   }
@@ -298,9 +287,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       }
     }
 
-    locker.getLock(blockId).synchronized {
-      val info = blockInfo.get(blockId)
-      if (info != null) {
+    val info = blockInfo.get(blockId)
+    if (info != null) {
+      info.synchronized {
         info.waitForReady() // In case the block is still being put() by another thread
         val level = info.level
         logDebug("Level for block " + blockId + " is " + level)
@@ -338,9 +327,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
               throw new Exception("Block " + blockId + " not found on disk, though it should be")
           }
         }
-      } else {
-        logDebug("Block " + blockId + " not registered locally")
       }
+    } else {
+      logDebug("Block " + blockId + " not registered locally")
     }
     return None
   }
@@ -583,7 +572,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     // Size of the block in bytes (to return to caller)
     var size = 0L
 
-    locker.getLock(blockId).synchronized {
+    myInfo.synchronized {
       logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
         + " to get into synchronized block")
 
@@ -681,7 +670,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       null
     }
 
-    locker.getLock(blockId).synchronized {
+    myInfo.synchronized {
       logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
         + " to get into synchronized block")
 
@@ -779,26 +768,30 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
    */
   def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
     logInfo("Dropping block " + blockId + " from memory")
-    locker.getLock(blockId).synchronized {
-      val info = blockInfo.get(blockId)
-      val level = info.level
-      if (level.useDisk && !diskStore.contains(blockId)) {
-        logInfo("Writing block " + blockId + " to disk")
-        data match {
-          case Left(elements) =>
-            diskStore.putValues(blockId, elements, level, false)
-          case Right(bytes) =>
-            diskStore.putBytes(blockId, bytes, level)
+    val info = blockInfo.get(blockId)
+    if (info != null)  {
+      info.synchronized {
+        val level = info.level
+        if (level.useDisk && !diskStore.contains(blockId)) {
+          logInfo("Writing block " + blockId + " to disk")
+          data match {
+            case Left(elements) =>
+              diskStore.putValues(blockId, elements, level, false)
+            case Right(bytes) =>
+              diskStore.putBytes(blockId, bytes, level)
+          }
+        }
+        memoryStore.remove(blockId)
+        if (info.tellMaster) {
+          reportBlockStatus(blockId)
+        }
+        if (!level.useDisk) {
+          // The block is completely gone from this node; forget it so we can put() it again later.
+          blockInfo.remove(blockId)
         }
       }
-      memoryStore.remove(blockId)
-      if (info.tellMaster) {
-        reportBlockStatus(blockId)
-      }
-      if (!level.useDisk) {
-        // The block is completely gone from this node; forget it so we can put() it again later.
-        blockInfo.remove(blockId)
-      }
+    } else {
+      // The block has already been dropped
     }
   }
 
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 074ca2b8a4e25c3d8bbc699e786802f415dfd591..02098b82fe77380eeee685f6c7b5b23b6df4a51f 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -18,12 +18,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
   private var currentMemory = 0L
 
+  // Object used to ensure that only one thread is putting blocks and if necessary, dropping
+  // blocks from the memory store.
+  private val putLock = new Object()
+
   logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
 
   def freeMemory: Long = maxMemory - currentMemory
 
   override def getSize(blockId: String): Long = {
-    synchronized {
+    entries.synchronized {
       entries.get(blockId).size
     }
   }
@@ -60,7 +64,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   override def getBytes(blockId: String): Option[ByteBuffer] = {
-    val entry = synchronized {
+    val entry = entries.synchronized {
       entries.get(blockId)
     }
     if (entry == null) {
@@ -73,7 +77,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   override def getValues(blockId: String): Option[Iterator[Any]] = {
-    val entry = synchronized {
+    val entry = entries.synchronized {
       entries.get(blockId)
     }
     if (entry == null) {
@@ -87,7 +91,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   override def remove(blockId: String) {
-    synchronized {
+    entries.synchronized {
       val entry = entries.get(blockId)
       if (entry != null) {
         entries.remove(blockId)
@@ -101,7 +105,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   override def clear() {
-    synchronized {
+    entries.synchronized {
       entries.clear()
     }
     logInfo("MemoryStore cleared")
@@ -122,12 +126,22 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * Try to put in a set of values, if we can free up enough space. The value should either be
    * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
    * size must also be passed by the caller.
+   *
+   * Locks on the object putLock to ensure that all the put requests and its associated block
+   * dropping is done by only on thread at a time. Otherwise while one thread is dropping
+   * blocks to free memory for one block, another thread may use up the freed space for
+   * another block.
    */
   private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
-    synchronized {
+    // TODO: Its possible to optimize the locking by locking entries only when selecting blocks
+    // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been
+    // released, it must be ensured that those to-be-dropped blocks are not double counted for
+    // freeing up more space for another block that needs to be put. Only then the actually dropping
+    // of blocks (and writing to disk if necessary) can proceed in parallel.
+    putLock.synchronized {
       if (ensureFreeSpace(blockId, size)) {
         val entry = new Entry(value, size, deserialized)
-        entries.put(blockId, entry)
+        entries.synchronized { entries.put(blockId, entry) }
         currentMemory += size
         if (deserialized) {
           logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
@@ -157,10 +171,11 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
    * don't fit into memory that we want to avoid).
    *
-   * Assumes that a lock on the MemoryStore is held by the caller. (Otherwise, the freed space
-   * might fill up before the caller puts in their new value.)
+   * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
+   * Otherwise, the freed space may fill up before the caller puts in their new value.
    */
   private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
+
     logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
       space, currentMemory, maxMemory))
 
@@ -169,36 +184,44 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       return false
     }
 
-    // TODO: This should relinquish the lock on the MemoryStore while flushing out old blocks
-    // in order to allow parallelism in writing to disk
     if (maxMemory - currentMemory < space) {
       val rddToAdd = getRddId(blockIdToAdd)
       val selectedBlocks = new ArrayBuffer[String]()
       var selectedMemory = 0L
 
-      val iterator = entries.entrySet().iterator()
-      while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
-        val pair = iterator.next()
-        val blockId = pair.getKey
-        if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
-          logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
-            "block from the same RDD")
-          return false
+      // This is synchronized to ensure that the set of entries is not changed
+      // (because of getValue or getBytes) while traversing the iterator, as that
+      // can lead to exceptions.
+      entries.synchronized {
+        val iterator = entries.entrySet().iterator()
+        while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+          val pair = iterator.next()
+          val blockId = pair.getKey
+          if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
+            logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
+              "block from the same RDD")
+            return false
+          }
+          selectedBlocks += blockId
+          selectedMemory += pair.getValue.size
         }
-        selectedBlocks += blockId
-        selectedMemory += pair.getValue.size
       }
 
       if (maxMemory - (currentMemory - selectedMemory) >= space) {
         logInfo(selectedBlocks.size + " blocks selected for dropping")
         for (blockId <- selectedBlocks) {
-          val entry = entries.get(blockId)
-          val data = if (entry.deserialized) {
-            Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
-          } else {
-            Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+          val entry = entries.synchronized { entries.get(blockId) }
+          // This should never be null as only one thread should be dropping
+          // blocks and removing entries. However the check is still here for
+          // future safety.
+          if (entry != null) {
+            val data = if (entry.deserialized) {
+              Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
+            } else {
+              Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+            }
+            blockManager.dropFromMemory(blockId, data)
           }
-          blockManager.dropFromMemory(blockId, data)
         }
         return true
       } else {
@@ -209,7 +232,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   override def contains(blockId: String): Boolean = {
-    synchronized { entries.containsKey(blockId) }
+    entries.synchronized { entries.containsKey(blockId) }
   }
 }
 
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e4a5b8ffdf6953f37d1ef4cb0b49aa09edab2684
--- /dev/null
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -0,0 +1,91 @@
+package spark.storage
+
+import akka.actor._
+
+import spark.KryoSerializer
+import java.util.concurrent.ArrayBlockingQueue
+import util.Random
+
+/**
+ * This class tests the BlockManager and MemoryStore for thread safety and
+ * deadlocks. It spawns a number of producer and consumer threads. Producer
+ * threads continuously pushes blocks into the BlockManager and consumer
+ * threads continuously retrieves the blocks form the BlockManager and tests
+ * whether the block is correct or not.
+ */
+private[spark] object ThreadingTest {
+
+  val numProducers = 5
+  val numBlocksPerProducer = 20000
+
+  private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
+    val queue = new ArrayBlockingQueue[(String, Seq[Int])](100)
+
+    override def run() {
+      for (i <- 1 to numBlocksPerProducer) {
+        val blockId = "b-" + id + "-" + i
+        val blockSize = Random.nextInt(1000)
+        val block = (1 to blockSize).map(_ => Random.nextInt())
+        val level = randomLevel()
+        val startTime = System.currentTimeMillis()
+        manager.put(blockId, block.iterator, level, true)
+        println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+        queue.add((blockId, block))
+      }
+      println("Producer thread " + id + " terminated")
+    }
+
+    def randomLevel(): StorageLevel = {
+      math.abs(Random.nextInt()) % 4 match {
+        case 0 => StorageLevel.MEMORY_ONLY
+        case 1 => StorageLevel.MEMORY_ONLY_SER
+        case 2 => StorageLevel.MEMORY_AND_DISK
+        case 3 => StorageLevel.MEMORY_AND_DISK_SER
+      }
+    }
+  }
+
+  private[spark] class ConsumerThread(
+      manager: BlockManager,
+      queue: ArrayBlockingQueue[(String, Seq[Int])]
+    ) extends Thread {
+    var numBlockConsumed = 0
+
+    override def run() {
+      println("Consumer thread started")
+      while(numBlockConsumed < numBlocksPerProducer) {
+        val (blockId, block) = queue.take()
+        val startTime = System.currentTimeMillis()
+        manager.get(blockId) match {
+          case Some(retrievedBlock) =>
+            assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
+            println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+          case None =>
+            assert(false, "Block " + blockId + " could not be retrieved")
+        }
+        numBlockConsumed += 1
+      }
+      println("Consumer thread terminated")
+    }
+  }
+
+  def main(args: Array[String]) {
+    System.setProperty("spark.kryoserializer.buffer.mb", "1")
+    val actorSystem = ActorSystem("test")
+    val serializer = new KryoSerializer
+    val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
+    val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024)
+    val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
+    val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
+    producers.foreach(_.start)
+    consumers.foreach(_.start)
+    producers.foreach(_.join)
+    consumers.foreach(_.join)
+    blockManager.stop()
+    blockManagerMaster.stop()
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+    println("Everything stopped.")
+    println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
+  }
+}