diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index e3ab9442b4821be008046d824b9cb5e00457870b..0f9a1a6ef3b270a3350759a6172812ffad943de9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -24,30 +24,70 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types.StructType
 
 private[orc] object OrcFileOperator extends Logging {
-  def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
+  /**
+   * Retrieves a ORC file reader from a given path.  The path can point to either a directory or a
+   * single ORC file.  If it points to an directory, it picks any non-empty ORC file within that
+   * directory.
+   *
+   * The reader returned by this method is mainly used for two purposes:
+   *
+   * 1. Retrieving file metadata (schema and compression codecs, etc.)
+   * 2. Read the actual file content (in this case, the given path should point to the target file)
+   *
+   * @note As recorded by SPARK-8501, ORC writes an empty schema (<code>struct&lt;&gt;</code) to an
+   *       ORC file if the file contains zero rows. This is OK for Hive since the schema of the
+   *       table is managed by metastore.  But this becomes a problem when reading ORC files
+   *       directly from HDFS via Spark SQL, because we have to discover the schema from raw ORC
+   *       files.  So this method always tries to find a ORC file whose schema is non-empty, and
+   *       create the result reader from that file.  If no such file is found, it returns `None`.
+   *
+   * @todo Needs to consider all files when schema evolution is taken into account.
+   */
+  def getFileReader(basePath: String, config: Option[Configuration] = None): Option[Reader] = {
+    def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
+      reader.getObjectInspector match {
+        case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
+          logInfo(
+            s"ORC file $path has empty schema, it probably contains no rows. " +
+              "Trying to read another ORC file to figure out the schema.")
+          false
+        case _ => true
+      }
+    }
+
     val conf = config.getOrElse(new Configuration)
-    val fspath = new Path(pathStr)
-    val fs = fspath.getFileSystem(conf)
-    val orcFiles = listOrcFiles(pathStr, conf)
-    logDebug(s"Creating ORC Reader from ${orcFiles.head}")
-    // TODO Need to consider all files when schema evolution is taken into account.
-    OrcFile.createReader(fs, orcFiles.head)
+    val fs = {
+      val hdfsPath = new Path(basePath)
+      hdfsPath.getFileSystem(conf)
+    }
+
+    listOrcFiles(basePath, conf).iterator.map { path =>
+      path -> OrcFile.createReader(fs, path)
+    }.collectFirst {
+      case (path, reader) if isWithNonEmptySchema(path, reader) => reader
+    }
   }
 
   def readSchema(path: String, conf: Option[Configuration]): StructType = {
-    val reader = getFileReader(path, conf)
+    val reader = getFileReader(path, conf).getOrElse {
+      throw new AnalysisException(
+        s"Failed to discover schema from ORC files stored in $path. " +
+          "Probably there are either no ORC files or only empty ORC files.")
+    }
     val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
     val schema = readerInspector.getTypeName
     logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
     HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
   }
 
-  def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
-    getFileReader(path, conf).getObjectInspector.asInstanceOf[StructObjectInspector]
+  def getObjectInspector(
+      path: String, conf: Option[Configuration]): Option[StructObjectInspector] = {
+    getFileReader(path, conf).map(_.getObjectInspector.asInstanceOf[StructObjectInspector])
   }
 
   def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 300f83d914ea4f13c453578f490349957fa780f9..9dc9fbb78e01f809c1983e70305945c7b7a655eb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -242,26 +242,34 @@ private[orc] case class OrcTableScan(
       nonPartitionKeyAttrs: Seq[(Attribute, Int)],
       mutableRow: MutableRow): Iterator[InternalRow] = {
     val deserializer = new OrcSerde
-    val soi = OrcFileOperator.getObjectInspector(path, Some(conf))
-    val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
-      case (attr, ordinal) =>
-        soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
-    }.unzip
-    val unwrappers = fieldRefs.map(unwrapperFor)
-    // Map each tuple to a row object
-    iterator.map { value =>
-      val raw = deserializer.deserialize(value)
-      var i = 0
-      while (i < fieldRefs.length) {
-        val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
-        if (fieldValue == null) {
-          mutableRow.setNullAt(fieldOrdinals(i))
-        } else {
-          unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+    val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
+
+    // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
+    // rows, and thus couldn't give a proper ObjectInspector.  In this case we just return an empty
+    // partition since we know that this file is empty.
+    maybeStructOI.map { soi =>
+      val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
+        case (attr, ordinal) =>
+          soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
+      }.unzip
+      val unwrappers = fieldRefs.map(unwrapperFor)
+      // Map each tuple to a row object
+      iterator.map { value =>
+        val raw = deserializer.deserialize(value)
+        var i = 0
+        while (i < fieldRefs.length) {
+          val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
+          if (fieldValue == null) {
+            mutableRow.setNullAt(fieldOrdinals(i))
+          } else {
+            unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+          }
+          i += 1
         }
-        i += 1
+        mutableRow: InternalRow
       }
-      mutableRow: InternalRow
+    }.getOrElse {
+      Iterator.empty
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 267d22c6b5f1e6c5833f9c8fd8edceae09dabb1e..ca131faaeef05d804cb054f899763b92c4f9c8f8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -23,10 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.InternalRow
-import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 
@@ -170,7 +167,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
   test("Default compression options for writing to an ORC file") {
     withOrcFile((1 to 100).map(i => (i, s"val_$i"))) { file =>
       assertResult(CompressionKind.ZLIB) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
   }
@@ -183,21 +180,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY")
     withOrcFile(data) { file =>
       assertResult(CompressionKind.SNAPPY) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
 
     conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "NONE")
     withOrcFile(data) { file =>
       assertResult(CompressionKind.NONE) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
 
     conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "LZO")
     withOrcFile(data) { file =>
       assertResult(CompressionKind.LZO) {
-        OrcFileOperator.getFileReader(file).getCompression
+        OrcFileOperator.getFileReader(file).get.getCompression
       }
     }
   }
@@ -289,4 +286,48 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
         List(Row("same", "run_5", 100)))
     }
   }
+
+  test("SPARK-8501: Avoids discovery schema from empty ORC files") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      withTable("empty_orc") {
+        withTempTable("empty", "single") {
+          sqlContext.sql(
+            s"""CREATE TABLE empty_orc(key INT, value STRING)
+               |STORED AS ORC
+               |LOCATION '$path'
+             """.stripMargin)
+
+          val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+          emptyDF.registerTempTable("empty")
+
+          // This creates 1 empty ORC file with Hive ORC SerDe.  We are using this trick because
+          // Spark SQL ORC data source always avoids write empty ORC files.
+          sqlContext.sql(
+            s"""INSERT INTO TABLE empty_orc
+               |SELECT key, value FROM empty
+             """.stripMargin)
+
+          val errorMessage = intercept[AnalysisException] {
+            sqlContext.read.format("orc").load(path)
+          }.getMessage
+
+          assert(errorMessage.contains("Failed to discover schema from ORC files"))
+
+          val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1)
+          singleRowDF.registerTempTable("single")
+
+          sqlContext.sql(
+            s"""INSERT INTO TABLE empty_orc
+               |SELECT key, value FROM single
+             """.stripMargin)
+
+          val df = sqlContext.read.format("orc").load(path)
+          assert(df.schema === singleRowDF.schema.asNullable)
+          checkAnswer(df, singleRowDF)
+        }
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index a0cdd0db42d6593cb9002631a69b8149c050bc68..82e08caf464576faa423b5e55ad79c08513c28fc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -43,14 +43,8 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
     orcTableDir.mkdir()
     import org.apache.spark.sql.hive.test.TestHive.implicits._
 
-    // Originally we were using a 10-row RDD for testing.  However, when default parallelism is
-    // greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions,
-    // which result in empty ORC files.  Unfortunately, ORC doesn't handle empty files properly and
-    // causes build failure on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501
-    // for more details.  To workaround this issue before fixing SPARK-8501, we simply increase row
-    // number in this RDD to avoid empty partitions.
     sparkContext
-      .makeRDD(1 to 100)
+      .makeRDD(1 to 10)
       .map(i => OrcData(i, s"part-$i"))
       .toDF()
       .registerTempTable(s"orc_temp_table")
@@ -76,35 +70,35 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("create temporary orc table") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 100).map(i => Row(i, s"part-$i")))
+      (1 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source where intField > 5"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 100).map(i => Row(1, s"part-$i")))
+      (1 to 10).map(i => Row(1, s"part-$i")))
   }
 
   test("create temporary orc table as") {
-    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 100).map(i => Row(i, s"part-$i")))
+      (1 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
 
     checkAnswer(
       sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
-      (1 to 100).map(i => Row(1, s"part-$i")))
+      (1 to 10).map(i => Row(1, s"part-$i")))
   }
 
   test("appending insert") {
@@ -112,7 +106,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_source"),
-      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
+      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
         Seq.fill(2)(Row(i, s"part-$i"))
       })
   }
@@ -125,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("SELECT * FROM normal_orc_as_source"),
-      (6 to 100).map(i => Row(i, s"part-$i")))
+      (6 to 10).map(i => Row(i, s"part-$i")))
   }
 }