diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 8920e2a76a27f9361ceb0c5718c8edf7048c6cf5..577ca928b43b6a456a1cee74069626152a7239ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -72,17 +72,14 @@ case class HiveTableScan(
   }
 
   private def addColumnMetadataToConf(hiveConf: HiveConf) {
-    // Specifies IDs and internal names of columns to be scanned.
-    val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer)
-    val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")
-
-    if (attributes.size == relation.output.size) {
-      // SQLContext#pruneFilterProject guarantees no duplicated value in `attributes`
-      ColumnProjectionUtils.setFullyReadColumns(hiveConf)
-    } else {
-      ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
-    }
+    // Specifies needed column IDs for non-partition columns only. Partition
+    // columns never appear in `relation.attributes`, so `indexWhere` returns
+    // -1 for them and the filter below drops those indices.
+    val neededColumnIDs =
+      attributes.map(a =>
+        relation.attributes.indexWhere(_.name == a.name): Integer).filter(index => index >= 0)
 
+    ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
     ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))
 
     // Specifies types and object inspectors of columns to be scanned.
@@ -99,7 +96,7 @@ case class HiveTableScan(
       .mkString(",")
 
     hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
-    hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(","))
   }
 
   addColumnMetadataToConf(context.hiveconf)
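
For context, here is a minimal, self-contained sketch of the pruning behavior this patch implements. The `Attribute`, `Relation`, and `ColumnPruningSketch` names below are hypothetical stand-ins for Spark's Catalyst attributes and the Hive `MetastoreRelation`, used only to make the example runnable outside Spark; just the index/filter logic mirrors the patched `addColumnMetadataToConf`.

// Sketch only: simplified stand-ins, not the actual Spark/Hive API.
case class Attribute(name: String)

case class Relation(
    attributes: Seq[Attribute],        // non-partition (data) columns
    partitionKeys: Seq[Attribute]) {   // partition columns, absent from data files
  def output: Seq[Attribute] = attributes ++ partitionKeys
}

object ColumnPruningSketch {
  // Needed column IDs are ordinals among the non-partition columns only.
  // Partition columns never appear in `relation.attributes`, so `indexWhere`
  // returns -1 for them and the filter drops those indices.
  def neededColumnIDs(relation: Relation, requested: Seq[Attribute]): Seq[Int] =
    requested
      .map(a => relation.attributes.indexWhere(_.name == a.name))
      .filter(_ >= 0)

  def main(args: Array[String]): Unit = {
    val relation = Relation(
      attributes = Seq(Attribute("key"), Attribute("value")),
      partitionKeys = Seq(Attribute("ds")))

    // Requesting `value` plus the partition column `ds`: only `value`
    // (ordinal 1 among the data columns) survives pruning.
    val ids = neededColumnIDs(relation, Seq(Attribute("value"), Attribute("ds")))
    println(ids)  // List(1)

    // LIST_COLUMNS is built from *all* non-partition column names, matching
    // the table schema the SerDe sees, while the IDs above restrict reading.
    val listColumns = relation.attributes.map(_.name).mkString(",")
    println(listColumns)  // key,value
  }
}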