diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index a143ac6aec00f7422ed6399a34c38c47b07023b9..618d5a522be3d2fba6845109e0c9932f480dcd0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -131,9 +131,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
 
         case _ =>
           val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
-          val maxFileNumInPartition = files.sqlContext.conf.filesMaxNumInPartition
+          val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
           logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
-            s"max #files: $maxFileNumInPartition")
+            s"open cost is considered as scanning $openCostInBytes bytes.")
 
           val splitFiles = selectedPartitions.flatMap { partition =>
             partition.files.flatMap { file =>
@@ -151,7 +151,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
 
           /** Add the given file to the current partition. */
           def addFile(file: PartitionedFile): Unit = {
-            currentSize += file.length
+            currentSize += file.length + openCostInBytes
             currentFiles.append(file)
           }
 
@@ -171,13 +171,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
           // Assign files to partitions using "First Fit Decreasing" (FFD)
           // TODO: consider adding a slop factor here?
           splitFiles.foreach { file =>
-            if (currentSize + file.length > maxSplitBytes ||
-                currentFiles.length >= maxFileNumInPartition) {
+            if (currentSize + file.length > maxSplitBytes) {
               closePartition()
-              addFile(file)
-            } else {
-              addFile(file)
             }
+            addFile(file)
           }
           closePartition()
           partitions
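
For context, a minimal standalone sketch of the packing behavior after this change (the packFiles helper and the (name, length) tuples are illustrative, not part of the patch): every file now contributes its length plus openCostInBytes to the running partition size, and the current partition is closed before adding a file whenever that file would push it past maxSplitBytes. Despite the "First Fit Decreasing" comment, only the current partition is ever considered, so the strategy is effectively Next Fit Decreasing.

    import scala.collection.mutable.ArrayBuffer

    // Illustrative sketch only: files are (name, lengthInBytes) pairs.
    def packFiles(
        files: Seq[(String, Long)],
        maxSplitBytes: Long,
        openCostInBytes: Long): Seq[Seq[String]] = {
      val partitions = ArrayBuffer.empty[Seq[String]]
      val current = ArrayBuffer.empty[String]
      var currentSize = 0L

      def closePartition(): Unit = {
        if (current.nonEmpty) partitions += current.toSeq
        current.clear()
        currentSize = 0L
      }

      // Largest files first, mirroring the size-descending sort in FileSourceStrategy.
      files.sortBy(f => -f._2).foreach { case (name, length) =>
        // Close the current partition if this file would exceed the limit, then always add it,
        // charging the open cost on top of the file's length.
        if (currentSize + length > maxSplitBytes) closePartition()
        current += name
        currentSize += length + openCostInBytes
      }
      closePartition()
      partitions.toSeq
    }

With the settings from the first test below, packFiles(Seq("file1" -> 5L, "file2" -> 5L, "file3" -> 5L), 11, 1) yields [(file1, file2), (file3)]: two 5-byte files fit under the 11-byte limit once each is charged the 1-byte open cost, but a third does not.
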
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6cc72fba482b71622e7fba8f6442a439aa619e68..a7c0be63fcc3a19d9c2363f6be4a4a9ce7ced50d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -510,10 +510,13 @@ object SQLConf {
     doc = "The maximum number of bytes to pack into a single partition when reading files.",
     isPublic = true)
 
-  val FILES_MAX_NUM_IN_PARTITION = longConf("spark.sql.files.maxNumInPartition",
-    defaultValue = Some(32),
-    doc = "The maximum number of files to pack into a single partition when reading files.",
-    isPublic = true)
+  val FILES_OPEN_COST_IN_BYTES = longConf("spark.sql.files.openCostInBytes",
+    defaultValue = Some(4 * 1024 * 1024),
+    doc = "The estimated cost to open a file, measured by the number of bytes could be scanned in" +
+      " the same time. This is used when putting multiple files into a partition. It's better to" +
+      " over estimated, then the partitions with small files will be faster than partitions with" +
+      " bigger files (which is scheduled first).",
+    isPublic = false)
 
   val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
     defaultValue = Some(true),
@@ -572,7 +575,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
-  def filesMaxNumInPartition: Long = getConf(FILES_MAX_NUM_IN_PARTITION)
+  def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 717a3a80b75cdd66df1f6d0f6514126a531a6b99..4446a6881ccd1e8be2d1ab339ae43c754836e972 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -76,7 +76,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
           "file2" -> 5,
           "file3" -> 5))
 
-    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "11",
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
         // 5 byte files should be laid out [(5, 5), (5)]
         assert(partitions.size == 2, "when checking partitions")
@@ -98,11 +99,12 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       createTable(
         files = Seq(
           "file1" -> 15,
-          "file2" -> 4))
+          "file2" -> 3))
 
-    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(0-5), (5-10, 4)]
+        // Files should be laid out [(0-10), (10-15, 3)]
         assert(partitions.size == 2, "when checking partitions")
         assert(partitions(0).files.size == 1, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
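
A worked check of the expectation above, assuming the size-descending packing sketched earlier: file1 (15 bytes) is split at the 10-byte boundary into (0-10) and (10-15), and file2 is 3 bytes.

    // Splits in size-descending order, with maxSplitBytes = 10 and openCostInBytes = 1:
    //   file1(0-10),  10 bytes:  0 + 10 <= 10 -> added;             currentSize = 10 + 1 = 11
    //   file1(10-15),  5 bytes: 11 +  5 >  10 -> close partition 1; currentSize =  5 + 1 = 6
    //   file2,         3 bytes:  6 +  3 <= 10 -> joins partition 2; currentSize =  6 + 3 + 1 = 10
    // => [(file1: 0-10), (file1: 10-15, file2)], i.e. 2 partitions with 1 and 2 files, as asserted.
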
@@ -132,8 +134,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
           "file5" -> 1,
           "file6" -> 1))
 
-    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "3",
-        SQLConf.FILES_MAX_NUM_IN_PARTITION.key -> "2") {
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
+        SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
         // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
         assert(partitions.size == 4, "when checking partitions")