diff --git a/R/run-tests.sh b/R/run-tests.sh
index 18a1e13bdc65556e564f1ff74f1984330e394704..e82ad0ba2cd0690c7c0b861ba293d810a6ac6a04 100755
--- a/R/run-tests.sh
+++ b/R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
 LOGFILE=$FWDIR/unit-tests.out
 rm -f $LOGFILE
 
-SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
 FAILED=$((PIPESTATUS[0]||$FAILED))
 
 if [[ $FAILED != 0 ]]; then
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
index bf4eaa59ff589cbf04976d3f50ecdc6b7723f3bd..f6e0913a7a0b3174d9bcfda6a8e2f7680e1f3ee4 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -115,8 +115,7 @@ final class UnsafeShuffleExternalSorter {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.pageSizeBytes = (int) Math.min(
-      PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
-      conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
+      PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, shuffleMemoryManager.pageSizeBytes());
     this.maxRecordSizeBytes = pageSizeBytes - 4;
     this.writeMetrics = writeMetrics;
     initializeForWriting();
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ac3736ac62aaff8b3c88d6f1b05f0607f8e7ae5..0636ae7c8df1a42d396f9dce06bbc82520da11ee 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -642,7 +642,7 @@ public final class BytesToBytesMap {
   private void allocate(int capacity) {
     assert (capacity >= 0);
     // The capacity needs to be divisible by 64 so that our bit set can be sized properly
-    capacity = Math.max((int) Math.min(MAX_CAPACITY, nextPowerOf2(capacity)), 64);
+    capacity = Math.max((int) Math.min(MAX_CAPACITY, ByteArrayMethods.nextPowerOf2(capacity)), 64);
     assert (capacity <= MAX_CAPACITY);
     longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2]));
     bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
@@ -770,10 +770,4 @@ public final class BytesToBytesMap {
       timeSpentResizingNs += System.nanoTime() - resizeStartTime;
     }
   }
-
-  /** Returns the next number greater or equal num that is power of 2. */
-  private static long nextPowerOf2(long num) {
-    final long highBit = Long.highestOneBit(num);
-    return (highBit == num) ? num : highBit << 1;
-  }
 }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 4c54ba4bce4082238f6b74127c154df886012eea..5ebbf9b068fd646081e412d82fecf571e07f91f6 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -127,7 +127,6 @@ public final class UnsafeExternalSorter {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
     // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.fileBufferSizeBytes = 32 * 1024;
-    // this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
     this.pageSizeBytes = pageSizeBytes;
     this.writeMetrics = new ShuffleWriteMetrics();
 
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 08bab4bf2739f7f721b3f854696c8616c918d790..8ff154fb5e334f9451eed139aa7aac77f55aab73 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -249,6 +249,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     Utils.byteStringAsBytes(get(key, defaultValue))
   }
 
+  /**
+   * Get a size parameter as bytes, falling back to a default if not set.
+   */
+  def getSizeAsBytes(key: String, defaultValue: Long): Long = {
+    Utils.byteStringAsBytes(get(key, defaultValue + "B"))
+  }
+
   /**
    * Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then Kibibytes are assumed.
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0c0705325b169d15489c3fdbe036a38901143dcf..56626864369003ce9088a3b9b605ee6cb75893f1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -629,7 +629,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * [[org.apache.spark.SparkContext.setLocalProperty]].
    */
   def getLocalProperty(key: String): String =
-    Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
+    Option(localProperties.get).map(_.getProperty(key)).orNull
 
   /** Set a human readable description of the current job. */
   def setJobDescription(value: String) {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index adfece4d6e7c0c5f4a2648433a1268edb3b60c48..a796e72850191d0cef924a44716f8db6160da34c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -324,7 +324,7 @@ object SparkEnv extends Logging {
     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
 
-    val shuffleMemoryManager = new ShuffleMemoryManager(conf)
+    val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)
 
     val blockTransferService =
       conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index e3d229cc9982147402f558b8bb3eef68fe889eea..8c3a72644c38a9f2d6f380a78770dbad8b6b0712 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -19,6 +19,9 @@ package org.apache.spark.shuffle
 
 import scala.collection.mutable
 
+import com.google.common.annotations.VisibleForTesting
+
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext}
 
 /**
@@ -34,11 +37,19 @@ import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext}
  * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever
  * this set changes. This is all done by synchronizing access on "this" to mutate state and using
  * wait() and notifyAll() to signal changes.
+ *
+ * Use `ShuffleMemoryManager.create()` factory method to create a new instance.
+ *
+ * @param maxMemory total amount of memory available for execution, in bytes.
+ * @param pageSizeBytes number of bytes for each page, by default.
  */
-private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
-  private val taskMemory = new mutable.HashMap[Long, Long]()  // taskAttemptId -> memory bytes
+private[spark]
+class ShuffleMemoryManager protected (
+    val maxMemory: Long,
+    val pageSizeBytes: Long)
+  extends Logging {
 
-  def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))
+  private val taskMemory = new mutable.HashMap[Long, Long]()  // taskAttemptId -> memory bytes
 
   private def currentTaskAttemptId(): Long = {
     // In case this is called on the driver, return an invalid task attempt id.
@@ -124,15 +135,49 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
   }
 }
 
+
 private[spark] object ShuffleMemoryManager {
+
+  def create(conf: SparkConf, numCores: Int): ShuffleMemoryManager = {
+    val maxMemory = ShuffleMemoryManager.getMaxMemory(conf)
+    val pageSize = ShuffleMemoryManager.getPageSize(conf, maxMemory, numCores)
+    new ShuffleMemoryManager(maxMemory, pageSize)
+  }
+
+  def create(maxMemory: Long, pageSizeBytes: Long): ShuffleMemoryManager = {
+    new ShuffleMemoryManager(maxMemory, pageSizeBytes)
+  }
+
+  @VisibleForTesting
+  def createForTesting(maxMemory: Long): ShuffleMemoryManager = {
+    new ShuffleMemoryManager(maxMemory, 4 * 1024 * 1024)
+  }
+
   /**
    * Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction
    * of the memory pool and a safety factor since collections can sometimes grow bigger than
    * the size we target before we estimate their sizes again.
    */
-  def getMaxMemory(conf: SparkConf): Long = {
+  private def getMaxMemory(conf: SparkConf): Long = {
     val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
     val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
     (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
+
+  /**
+   * Sets the page size, in bytes.
+   *
+   * If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
+   * by looking at the number of cores available to the process, and the total amount of memory,
+   * and then divide it by a factor of safety.
+   */
+  private def getPageSize(conf: SparkConf, maxMemory: Long, numCores: Int): Long = {
+    val minPageSize = 1L * 1024 * 1024   // 1MB
+    val maxPageSize = 64L * minPageSize  // 64MB
+    val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
+    val safetyFactor = 8
+    val size = ByteArrayMethods.nextPowerOf2(maxMemory / cores / safetyFactor)
+    val default = math.min(maxPageSize, math.max(minPageSize, size))
+    conf.getSizeAsBytes("spark.buffer.pageSize", default)
+  }
 }
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 98c32bbc298d75294e7bcec8e652c0fd1c54ff52..c68354ba49a46e104b50d9dbd0b2c036ead04859 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -115,6 +115,7 @@ public class UnsafeShuffleWriterSuite {
     taskMetrics = new TaskMetrics();
 
     when(shuffleMemoryManager.tryToAcquire(anyLong())).then(returnsFirstArg());
+    when(shuffleMemoryManager.pageSizeBytes()).thenReturn(128L * 1024 * 1024);
 
     when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
     when(blockManager.getDiskWriter(
@@ -549,14 +550,14 @@ public class UnsafeShuffleWriterSuite {
     final long recordLengthBytes = 8;
     final long pageSizeBytes = 256;
     final long numRecordsPerPage = pageSizeBytes / recordLengthBytes;
-    final SparkConf conf = new SparkConf().set("spark.buffer.pageSize", pageSizeBytes + "b");
+    when(shuffleMemoryManager.pageSizeBytes()).thenReturn(pageSizeBytes);
     final UnsafeShuffleWriter<Object, Object> writer =
       new UnsafeShuffleWriter<Object, Object>(
         blockManager,
         shuffleBlockResolver,
         taskMemoryManager,
         shuffleMemoryManager,
-        new UnsafeShuffleHandle<Object, Object>(0, 1, shuffleDep),
+        new UnsafeShuffleHandle<>(0, 1, shuffleDep),
         0, // map id
         taskContext,
         conf);
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 3c5003380162fbdc19bc06a16e6cd3af2e72a5c2..0b11562980b8e2754f0696748de498aa87c2a7ee 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -48,7 +48,7 @@ public abstract class AbstractBytesToBytesMapSuite {
 
   @Before
   public void setup() {
-    shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
+    shuffleMemoryManager = ShuffleMemoryManager.create(Long.MAX_VALUE, PAGE_SIZE_BYTES);
     taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(getMemoryAllocator()));
     // Mocked memory manager for tests that check the maximum array size, since actually allocating
     // such large arrays will cause us to run out of memory in our tests.
@@ -441,7 +441,7 @@ public abstract class AbstractBytesToBytesMapSuite {
 
   @Test
   public void failureToAllocateFirstPage() {
-    shuffleMemoryManager = new ShuffleMemoryManager(1024);
+    shuffleMemoryManager = ShuffleMemoryManager.createForTesting(1024);
     BytesToBytesMap map =
       new BytesToBytesMap(taskMemoryManager, shuffleMemoryManager, 1, PAGE_SIZE_BYTES);
     try {
@@ -461,7 +461,7 @@ public abstract class AbstractBytesToBytesMapSuite {
 
   @Test
   public void failureToGrow() {
-    shuffleMemoryManager = new ShuffleMemoryManager(1024 * 10);
+    shuffleMemoryManager = ShuffleMemoryManager.createForTesting(1024 * 10);
     BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, shuffleMemoryManager, 1, 1024);
     try {
       boolean success = true;
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index f5300373d87ea1b6f2d99029c478072d06827a00..83049b8a21fcf41dcd92b151b0663522a1ed78c2 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -102,7 +102,7 @@ public class UnsafeExternalSorterSuite {
     MockitoAnnotations.initMocks(this);
     sparkConf = new SparkConf();
     tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
-    shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
+    shuffleMemoryManager = ShuffleMemoryManager.create(Long.MAX_VALUE, pageSizeBytes);
     spillFilesCreated.clear();
     taskContext = mock(TaskContext.class);
     when(taskContext.taskMetrics()).thenReturn(new TaskMetrics());
@@ -237,7 +237,7 @@ public class UnsafeExternalSorterSuite {
 
   @Test
   public void spillingOccursInResponseToMemoryPressure() throws Exception {
-    shuffleMemoryManager = new ShuffleMemoryManager(pageSizeBytes * 2);
+    shuffleMemoryManager = ShuffleMemoryManager.create(pageSizeBytes * 2, pageSizeBytes);
     final UnsafeExternalSorter sorter = newSorter();
     final int numRecords = (int) pageSizeBytes / 4;
     for (int i = 0; i <= numRecords; i++) {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
index f495b6a0379581def58622bd3d0cfdfba9b8c77a..6d45b1a101be63881cd0d2420787e9e97d4986f9 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
@@ -24,7 +24,7 @@ import org.mockito.Mockito._
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkFunSuite, TaskContext}
+import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
 
 class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
 
@@ -50,7 +50,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
   }
 
   test("single task requesting memory") {
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
 
     assert(manager.tryToAcquire(100L) === 100L)
     assert(manager.tryToAcquire(400L) === 400L)
@@ -72,7 +72,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
     // Two threads request 500 bytes first, wait for each other to get it, and then request
     // 500 more; we should immediately return 0 as both are now at 1 / N
 
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
 
     class State {
       var t1Result1 = -1L
@@ -124,7 +124,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
     // Two tasks request 250 bytes first, wait for each other to get it, and then request
     // 500 more; we should only grant 250 bytes to each of them on this second request
 
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
 
     class State {
       var t1Result1 = -1L
@@ -176,7 +176,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
     // for a bit and releases 250 bytes, which should then be granted to t2. Further requests
     // by t2 will return false right away because it now has 1 / 2N of the memory.
 
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
 
     class State {
       var t1Requested = false
@@ -241,7 +241,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
     // t1 grabs 1000 bytes and then waits until t2 is ready to make a request. It sleeps
     // for a bit and releases all its memory. t2 should now be able to grab all the memory.
 
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
 
     class State {
       var t1Requested = false
@@ -307,7 +307,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
   }
 
   test("tasks should not be granted a negative size") {
-    val manager = new ShuffleMemoryManager(1000L)
+    val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
     manager.tryToAcquire(700L)
 
     val latch = new CountDownLatch(1)
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 60be85e53e2aa6211599547519d9307be4071027..cd4c55f79f18c5ce48dc6adb2e968ddf3fa21a34 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -54,7 +54,6 @@ def launch_gateway():
         if os.environ.get("SPARK_TESTING"):
             submit_args = ' '.join([
                 "--conf spark.ui.enabled=false",
-                "--conf spark.buffer.pageSize=4mb",
                 submit_args
             ])
         command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index b9d44aace1009b01117e5cda473fe84258770e6e..4d5e98a3e90c8eb45fe6e779979fff4c4b2cb3b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -342,7 +342,7 @@ class TungstenAggregationIterator(
     TaskContext.get.taskMemoryManager(),
     SparkEnv.get.shuffleMemoryManager,
     1024 * 16, // initial capacity
-    SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m"),
+    SparkEnv.get.shuffleMemoryManager.pageSizeBytes,
     false // disable tracking of performance metrics
   )
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 3f257ecdd156cbe203ebd792856b8db80491e345..953abf409f220af0574c08d3d1d6ad7b84c9482c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -282,17 +282,15 @@ private[joins] final class UnsafeHashedRelation(
     // This is used in Broadcast, shared by multiple tasks, so we use on-heap memory
     val taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
 
+    val pageSizeBytes = Option(SparkEnv.get).map(_.shuffleMemoryManager.pageSizeBytes)
+      .getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
+
     // Dummy shuffle memory manager which always grants all memory allocation requests.
     // We use this because it doesn't make sense count shared broadcast variables' memory usage
     // towards individual tasks' quotas. In the future, we should devise a better way of handling
     // this.
-    val shuffleMemoryManager = new ShuffleMemoryManager(new SparkConf()) {
-      override def tryToAcquire(numBytes: Long): Long = numBytes
-      override def release(numBytes: Long): Unit = {}
-    }
-
-    val pageSizeBytes = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
-      .getSizeAsBytes("spark.buffer.pageSize", "64m")
+    val shuffleMemoryManager =
+      ShuffleMemoryManager.create(maxMemory = Long.MaxValue, pageSizeBytes = pageSizeBytes)
 
     binaryMap = new BytesToBytesMap(
       taskMemoryManager,
@@ -306,11 +304,11 @@ private[joins] final class UnsafeHashedRelation(
     while (i < nKeys) {
       val keySize = in.readInt()
       val valuesSize = in.readInt()
-      if (keySize > keyBuffer.size) {
+      if (keySize > keyBuffer.length) {
         keyBuffer = new Array[Byte](keySize)
       }
       in.readFully(keyBuffer, 0, keySize)
-      if (valuesSize > valuesBuffer.size) {
+      if (valuesSize > valuesBuffer.length) {
         valuesBuffer = new Array[Byte](valuesSize)
       }
       in.readFully(valuesBuffer, 0, valuesSize)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 7f69cdb08aa7898f190e362346acc64c0943022a..e316930470127eea2873fa54d77c9dbb7e410395 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.{InternalAccumulator, TaskContext}
+import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
 import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
@@ -122,7 +122,7 @@ case class TungstenSort(
   protected override def doExecute(): RDD[InternalRow] = {
     val schema = child.schema
     val childOutput = child.output
-    val pageSize = sparkContext.conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
+    val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
 
     /**
      * Set up the sorter in each partition before computing the parent partition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
similarity index 100%
rename from sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
index 53de2d0f0771fe166fe9d8a1e45d358ef968f1ee..48c3938ff87ba99d53640e4e428d32bb8214e5c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
@@ -22,7 +22,7 @@ import org.apache.spark.shuffle.ShuffleMemoryManager
 /**
  * A [[ShuffleMemoryManager]] that can be controlled to run out of memory.
  */
-class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue) {
+class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue, 4 * 1024 * 1024) {
   private var oom = false
 
   override def tryToAcquire(numBytes: Long): Long = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 167086db5bfe2749033cc93c3e28a73be626c987..296cc5c5e0b04f5804eeed3a9a5fe6560ac74039 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -52,7 +52,6 @@ object TestHive
         .set("spark.sql.test", "")
         .set("spark.sql.hive.metastore.barrierPrefixes",
           "org.apache.spark.sql.hive.execution.PairSerDe")
-        .set("spark.buffer.pageSize", "4m")
         // SPARK-8910
         .set("spark.ui.enabled", "false")))
 
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java b/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
index cf693d01a4f5bba1bb5fc1d4bdf9677ae8675462..70b81ce015ddc3af435fa126705225f0c937bb17 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
@@ -25,6 +25,12 @@ public class ByteArrayMethods {
     // Private constructor, since this class only contains static methods.
   }
 
+  /** Returns the next number greater or equal num that is power of 2. */
+  public static long nextPowerOf2(long num) {
+    final long highBit = Long.highestOneBit(num);
+    return (highBit == num) ? num : highBit << 1;
+  }
+
   public static int roundNumberOfBytesToNearestWord(int numBytes) {
     int remainder = numBytes & 0x07;  // This is equivalent to `numBytes % 8`
     if (remainder == 0) {