diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
new file mode 100644
index 0000000000000000000000000000000000000000..62df9657a6ac6affe1ef705a42ab8e3d8b2655d8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+
+case class OfferState(workOffer: WorkerOffer, var cores: Int) {
+  // The tasks assigned to this worker so far in the current round of scheduling.
+  val tasks = new ArrayBuffer[TaskDescription](cores)
+}
+
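+/**
+ * TaskAssigner decides the order in which WorkerOffers are visited when the TaskScheduler
+ * assigns tasks. The TaskScheduler calls construct() once per resourceOffers() call, then, for
+ * each round of TaskSet assignment, calls init() followed by hasNext()/getNext()/taskAssigned()
+ * until no offer is left. Finally, tasks() returns the per-offer assignments and reset()
+ * releases the internal state.
+ */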
+abstract class TaskAssigner(conf: SparkConf) {
+  var offer: Seq[OfferState] = _
+  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
+
+  // The per-offer task lists handed back to the TaskScheduler once assignment is finished.
+  def tasks(): Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // Constructs the assigner's internal state from the given WorkerOffers.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => OfferState(o, o.cores))
+  }
+
+  // Invoked at the start of each round of TaskSet assignment to initialize internal state.
+  def init(): Unit
+
+  // Whether another offer is available in the current round of TaskSet assignment.
+  def hasNext(): Boolean
+
+  // Returns the next available offer in the current round of TaskSet assignment.
+  def getNext(): OfferState
+
+  // Called by the TaskScheduler to report whether a task was assigned to the current offer,
+  // so the assigner can decide whether that offer is still valid for the next request.
+  def taskAssigned(assigned: Boolean): Unit
+
+  // Releases internally maintained resources. Subclasses are responsible for
+  // releasing their own private resources.
+  def reset: Unit = {
+    offer = null
+  }
+}
+
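+/**
+ * The default assigner: visits the offers in a randomly shuffled order, moving on to the next
+ * offer after every attempt regardless of whether a task was assigned.
+ */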
+class RoundRobinAssigner(conf: SparkConf) extends TaskAssigner(conf) {
+  var i = 0
+  override def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = Random.shuffle(workOffer.map(o => OfferState(o, o.cores)))
+  }
+  override def init(): Unit = {
+    i = 0
+  }
+  override def hasNext: Boolean = {
+    i < offer.size
+  }
+  override def getNext(): OfferState = {
+    offer(i)
+  }
+  override def taskAssigned(assigned: Boolean): Unit = {
+    i += 1
+  }
+  override def reset: Unit = {
+    super.reset
+    i = 0
+  }
+}
+
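+/**
+ * Always hands out the offer with the most remaining free cores, so tasks are spread as evenly
+ * as possible across workers. Offers that reject a task are dropped for the rest of the round.
+ */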
+class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
+  var maxHeap: PriorityQueue[OfferState] = _
+  var current: OfferState = _
+
+  override def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = Random.shuffle(workOffer.map(o => OfferState(o, o.cores)))
+  }
+  implicit val ord: Ordering[OfferState] = new Ordering[OfferState] {
+    def compare(x: OfferState, y: OfferState): Int = {
+      Ordering[Int].compare(x.cores, y.cores)
+    }
+  }
+  override def init(): Unit = {
+    maxHeap = new PriorityQueue[OfferState]()
+    offer.filter(_.cores >= CPUS_PER_TASK).foreach(maxHeap.enqueue(_))
+  }
+  override def hasNext: Boolean = {
+    maxHeap.size > 0
+  }
+  override def getNext(): OfferState = {
+    current = maxHeap.dequeue()
+    current
+  }
+
+  override def taskAssigned(assigned: Boolean): Unit = {
+    if (current.cores >= CPUS_PER_TASK && assigned) {
+      maxHeap.enqueue(current)
+    }
+  }
+  override def reset: Unit = {
+    super.reset
+    maxHeap = null
+    current = null
+  }
+}
+
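+/**
+ * Keeps assigning to the worker with the fewest remaining free cores until it runs out of cores
+ * or rejects a task, packing tasks onto as few workers as possible. This can make it easier to
+ * release idle executors when dynamic allocation is enabled.
+ */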
+class PackedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
+
+  var sorted: Seq[OfferState] = _
+  var i = 0
+  var current: OfferState = _
+
+  override def init(): Unit = {
+    i = 0
+    sorted = offer.filter(_.cores >= CPUS_PER_TASK).sortBy(_.cores)
+  }
+
+  override def hasNext: Boolean = {
+    i < sorted.size
+  }
+
+  override def getNext(): OfferState = {
+    current = sorted(i)
+    current
+  }
+
+  override def taskAssigned(assigned: Boolean): Unit = {
+    if (current.cores < CPUS_PER_TASK || !assigned) {
+      i += 1
+    }
+  }
+
+  override def reset: Unit = {
+    super.reset
+    sorted = null
+    current = null
+    i = 0
+  }
+}
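
For illustration (not part of this patch): the contract above is small enough that a custom strategy stays short. The sketch below uses the made-up class name SequentialAssigner; it walks the offers in their original order, i.e. it behaves like RoundRobinAssigner minus the shuffle, and could be selected through spark.task.assigner just like the built-in assigners.

    package org.apache.spark.scheduler

    import org.apache.spark.SparkConf

    // Hypothetical example: visit every offer exactly once per round, in the order received.
    class SequentialAssigner(conf: SparkConf) extends TaskAssigner(conf) {
      private var i = 0

      override def init(): Unit = {
        i = 0
      }

      override def hasNext(): Boolean = i < offer.size

      override def getNext(): OfferState = offer(i)

      override def taskAssigned(assigned: Boolean): Unit = {
        // Move on whether or not a task was placed on this offer.
        i += 1
      }

      override def reset: Unit = {
        super.reset
        i = 0
      }
    }
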
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 3e3f1ad031e663deaa3bddf0624b2e69d0f806e8..fb732ea8e5a3b6df86ebf16ac3000f91410ff231 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -22,9 +22,7 @@ import java.util.{Timer, TimerTask}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.Set
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.Random
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -61,6 +59,21 @@ private[spark] class TaskSchedulerImpl(
 
   val conf = sc.conf
 
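+  // The pluggable strategy used to assign tasks to WorkerOffers, configured via
+  // "spark.task.assigner". Falls back to round robin if the class cannot be constructed.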
+  val DEFAULT_TASK_ASSIGNER = classOf[RoundRobinAssigner].getName
+  lazy val taskAssigner: TaskAssigner = {
+    val className = conf.get("spark.task.assigner", DEFAULT_TASK_ASSIGNER)
+    try {
+      logInfo(s"""constructing assigner as $className""")
+      val ctor = Utils.classForName(className).getConstructor(classOf[SparkConf])
+      ctor.newInstance(conf).asInstanceOf[TaskAssigner]
+    } catch {
+      case e: Throwable =>
+        logWarning(
+          s"$className cannot be constructed, falling back to the default " +
+            s"$DEFAULT_TASK_ASSIGNER", e)
+        new RoundRobinAssigner(conf)
+    }
+  }
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
 
@@ -250,24 +263,26 @@ private[spark] class TaskSchedulerImpl(
   private def resourceOfferSingleTaskSet(
       taskSet: TaskSetManager,
       maxLocality: TaskLocality,
-      shuffledOffers: Seq[WorkerOffer],
-      availableCpus: Array[Int],
-      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+      taskAssigner: TaskAssigner) : Boolean = {
     var launchedTask = false
-    for (i <- 0 until shuffledOffers.size) {
-      val execId = shuffledOffers(i).executorId
-      val host = shuffledOffers(i).host
-      if (availableCpus(i) >= CPUS_PER_TASK) {
+    taskAssigner.init()
+    while (taskAssigner.hasNext()) {
+      var assigned = false
+      val current = taskAssigner.getNext()
+      val execId = current.workOffer.executorId
+      val host = current.workOffer.host
+      if (current.cores >= CPUS_PER_TASK) {
         try {
           for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-            tasks(i) += task
+            current.tasks += task
             val tid = task.taskId
             taskIdToTaskSetManager(tid) = taskSet
             taskIdToExecutorId(tid) = execId
             executorIdToTaskCount(execId) += 1
-            availableCpus(i) -= CPUS_PER_TASK
-            assert(availableCpus(i) >= 0)
+            current.cores -= CPUS_PER_TASK
+            assert(current.cores >= 0)
             launchedTask = true
+            assigned = true
           }
         } catch {
           case e: TaskNotSerializableException =>
@@ -277,8 +292,10 @@ private[spark] class TaskSchedulerImpl(
             return launchedTask
         }
       }
+      taskAssigner.taskAssigned(assigned)
     }
     return launchedTask
+
   }
 
   /**
@@ -305,12 +322,8 @@ private[spark] class TaskSchedulerImpl(
         hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
       }
     }
+    taskAssigner.construct(offers)
 
-    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
-    val shuffledOffers = Random.shuffle(offers)
-    // Build a list of tasks to assign to each worker.
-    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
-    val availableCpus = shuffledOffers.map(o => o.cores).toArray
     val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -329,7 +342,7 @@ private[spark] class TaskSchedulerImpl(
       for (currentMaxLocality <- taskSet.myLocalityLevels) {
         do {
           launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
+            taskSet, currentMaxLocality, taskAssigner)
           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
         } while (launchedTaskAtCurrentMaxLocality)
       }
@@ -337,10 +350,12 @@ private[spark] class TaskSchedulerImpl(
         taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
       }
     }
-
+    val tasks = taskAssigner.tasks
+    taskAssigner.reset
     if (tasks.size > 0) {
       hasLaunchedTask = true
     }
+
     return tasks
   }
 
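
As a usage sketch (not part of this patch): an application opts into a strategy purely through configuration. Only the spark.task.assigner value below comes from this change; the object name, application name, and job are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    object PackedSchedulingExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("packed-scheduling-example")
          // Pack tasks onto as few executors as possible; an unresolvable class name
          // falls back to the default RoundRobinAssigner.
          .set("spark.task.assigner", "org.apache.spark.scheduler.PackedAssigner")
        val sc = new SparkContext(conf)  // master is typically supplied via spark-submit
        try {
          sc.parallelize(1 to 1000, 8).map(_ * 2).count()
        } finally {
          sc.stop()
        }
      }
    }
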
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index f5f1947661d9afca253b706da802ebbcb75b257f..2584f85bc553a10e2c6745faccd553ba9723d64c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -109,6 +109,72 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  test("Scheduler balance the assignment to the worker with more free cores") {
+    val taskScheduler = setupScheduler(("spark.task.assigner", classOf[BalancedAssigner].getName))
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
+      new WorkerOffer("executor1", "host1", 4))
+    val selectedExecutorIds = {
+      val taskSet = FakeTask.createTaskSet(2)
+      taskScheduler.submitTasks(taskSet)
+      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+      assert(2 === taskDescriptions.length)
+      taskDescriptions.map(_.executorId)
+    }
+    val count = selectedExecutorIds.count(_ == workerOffers(1).executorId)
+    assert(count == 2)
+    assert(!failedTaskSet)
+  }
+
+  test("Scheduler balance the assignment across workers with same free cores") {
+    val taskScheduler = setupScheduler(("spark.task.assigner", classOf[BalancedAssigner].getName))
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
+      new WorkerOffer("executor1", "host1", 2))
+    val selectedExecutorIds = {
+      val taskSet = FakeTask.createTaskSet(2)
+      taskScheduler.submitTasks(taskSet)
+      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+      assert(2 === taskDescriptions.length)
+      taskDescriptions.map(_.executorId)
+    }
+    val count = selectedExecutorIds.count(_ == workerOffers(1).executorId)
+    assert(count == 1)
+    assert(!failedTaskSet)
+  }
+
+  test("Scheduler packs the assignment to workers with less free cores") {
+    val taskScheduler = setupScheduler(("spark.task.assigner", classOf[PackedAssigner].getName))
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
+      new WorkerOffer("executor1", "host1", 4))
+    val selectedExecutorIds = {
+      val taskSet = FakeTask.createTaskSet(2)
+      taskScheduler.submitTasks(taskSet)
+      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+      assert(2 === taskDescriptions.length)
+      taskDescriptions.map(_.executorId)
+    }
+    val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
+    assert(count == 2)
+    assert(!failedTaskSet)
+  }
+
+  test("Scheduler keeps packing the assignment to the same worker") {
+    val taskScheduler = setupScheduler(("spark.task.assigner", classOf[PackedAssigner].getName))
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 4),
+      new WorkerOffer("executor1", "host1", 4))
+    val selectedExecutorIds = {
+      val taskSet = FakeTask.createTaskSet(4)
+      taskScheduler.submitTasks(taskSet)
+      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+      assert(4 === taskDescriptions.length)
+      taskDescriptions.map(_.executorId)
+    }
+
+    val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
+    assert(count == 4)
+    assert(!failedTaskSet)
+  }
+
   test("Scheduler correctly accounts for multiple CPUs per task") {
     val taskCpus = 2
     val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
@@ -408,4 +474,5 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(thirdTaskDescs.size === 0)
     assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
   }
+
 }
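
A possible follow-up test (not part of this patch; the test name and the bogus class name are made up) would exercise the fallback path added in TaskSchedulerImpl, reusing the suite's existing setupScheduler and FakeTask helpers:

    test("Scheduler falls back to the default assigner for an invalid spark.task.assigner") {
      val taskScheduler = setupScheduler(("spark.task.assigner", "no.such.Assigner"))
      val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
        new WorkerOffer("executor1", "host1", 1))
      taskScheduler.submitTasks(FakeTask.createTaskSet(2))
      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
      // The unresolvable class name should fall back to RoundRobinAssigner and still schedule.
      assert(2 === taskDescriptions.length)
      assert(!failedTaskSet)
    }
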
diff --git a/docs/configuration.md b/docs/configuration.md
index 373e22d71a87294ff81c1da6f0c860b69c061f9b..6f3fbeb76cc2461371635d71c5a6007c8ce37b5d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1334,6 +1334,17 @@ Apart from these, the following properties are also available, and may be useful
     Should be greater than or equal to 1. Number of allowed retries = this value - 1.
   </td>
 </tr>
+<tr>
+  <td><code>spark.task.assigner</code></td>
+  <td>org.apache.spark.scheduler.RoundRobinAssigner</td>
+  <td>
+    The strategy used to allocate tasks among workers with free cores. By default, round robin
+    with randomness is used. <code>org.apache.spark.scheduler.BalancedAssigner</code> tries to
+    balance tasks across all workers, favoring the workers with the most free cores.
+    <code>org.apache.spark.scheduler.PackedAssigner</code> packs tasks onto the workers with the
+    fewest free cores, which may help release resources sooner when dynamic allocation is enabled.
+  </td>
+</tr>
 </table>
 
 #### Dynamic Allocation