From 54794113a6a906b0f9c6bfb9da322e18e007214c Mon Sep 17 00:00:00 2001
From: Sameer Agarwal <sameer@databricks.com>
Date: Fri, 18 Mar 2016 14:04:42 -0700
Subject: [PATCH] [SPARK-13989] [SQL] Remove non-vectorized/unsafe-row parquet
 record reader

## What changes were proposed in this pull request?

This PR cleans up the new parquet record reader with the following changes:

1. Removes the non-vectorized parquet reader code from `UnsafeRowParquetRecordReader`.
2. Removes the non-vectorized column reader code from `ColumnReader`.
3. Renames `UnsafeRowParquetRecordReader` to `VectorizedParquetRecordReader` and `ColumnReader` to `VectorizedColumnReader`
4. Deprecate `PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED`

## How was this patch tested?

Refactoring only; Existing tests should reveal any problems.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11799 from sameeragarwal/vectorized-parquet.
---
 ...ava => VectorizedParquetRecordReader.java} | 319 ++----------------
 .../sql/execution/command/commands.scala      |   9 +
 .../datasources/SqlNewHadoopRDD.scala         |  20 +-
 .../apache/spark/sql/internal/SQLConf.scala   |   6 +-
 .../parquet/ParquetEncodingSuite.scala        |   4 +-
 .../parquet/ParquetFilterSuite.scala          |   6 +-
 .../datasources/parquet/ParquetIOSuite.scala  |  14 +-
 .../parquet/ParquetReadBenchmark.scala        |  61 +---
 8 files changed, 75 insertions(+), 364 deletions(-)
 rename sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/{UnsafeRowParquetRecordReader.java => VectorizedParquetRecordReader.java} (69%)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
similarity index 69%
rename from sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
rename to sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 7234726633..0f00f56a3a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -18,13 +18,11 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -37,22 +35,17 @@ import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
 import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
-import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
-import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.unsafe.types.UTF8String;
 
-import static org.apache.parquet.column.ValuesType.*;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
 
 /**
- * A specialized RecordReader that reads into UnsafeRows directly using the Parquet column APIs.
- *
- * This is somewhat based on parquet-mr's ColumnReader.
+ * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
+ * Parquet column APIs. This is somewhat based on parquet-mr's ColumnReader.
  *
  * TODO: handle complex types, decimal requiring more than 8 bytes, INT96. Schema mismatch.
  * All of these can be handled efficiently and easily with codegen.
@@ -61,29 +54,18 @@ import static org.apache.parquet.column.ValuesType.*;
  * enabled, this class returns ColumnarBatches which offers significant performance gains.
  * TODO: make this always return ColumnarBatches.
  */
-public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase<Object> {
+public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBase<Object> {
   /**
-   * Batch of unsafe rows that we assemble and the current index we've returned. Every time this
+   * Batch of rows that we assemble and the current index we've returned. Every time this
    * batch is used up (batchIdx == numBatched), we populated the batch.
    */
-  private UnsafeRow[] rows = new UnsafeRow[64];
   private int batchIdx = 0;
   private int numBatched = 0;
 
-  /**
-   * Used to write variable length columns. Same length as `rows`.
-   */
-  private UnsafeRowWriter[] rowWriters = null;
-  /**
-   * True if the row contains variable length fields.
-   */
-  private boolean containsVarLenFields;
-
   /**
    * For each request column, the reader to read this column.
-   * columnsReaders[i] populated the UnsafeRow's attribute at i.
    */
-  private ColumnReader[] columnReaders;
+  private VectorizedColumnReader[] columnReaders;
 
   /**
    * The number of rows that have been returned.
@@ -95,17 +77,6 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
    */
   private long totalCountLoadedSoFar = 0;
 
-  /**
-   * For each column, the annotated original type.
-   */
-  private OriginalType[] originalTypes;
-
-  /**
-   * The default size for varlen columns. The row grows as necessary to accommodate the
-   * largest column.
-   */
-  private static final int DEFAULT_VAR_LEN_SIZE = 32;
-
   /**
    * columnBatch object that is used for batch decoding. This is created on first use and triggers
    * batched decoding. It is not valid to interleave calls to the batched interface with the row
@@ -176,14 +147,12 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
 
   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
+    resultBatch();
+
     if (returnColumnarBatch) return nextBatch();
 
     if (batchIdx >= numBatched) {
-      if (vectorizedDecode()) {
-        if (!nextBatch()) return false;
-      } else {
-        if (!loadBatch()) return false;
-      }
+      if (!nextBatch()) return false;
     }
     ++batchIdx;
     return true;
@@ -192,12 +161,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
   @Override
   public Object getCurrentValue() throws IOException, InterruptedException {
     if (returnColumnarBatch) return columnarBatch;
-
-    if (vectorizedDecode()) {
-      return columnarBatch.getRow(batchIdx - 1);
-    } else {
-      return rows[batchIdx - 1];
-    }
+    return columnarBatch.getRow(batchIdx - 1);
   }
 
   @Override
@@ -225,7 +189,6 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
    * Can be called before any rows are returned to enable returning columnar batches directly.
    */
   public void enableReturningBatches() {
-    assert(vectorizedDecode());
     returnColumnarBatch = true;
   }
 
@@ -233,12 +196,11 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
    * Advances to the next batch of rows. Returns false if there are no more.
    */
   public boolean nextBatch() throws IOException {
-    assert(vectorizedDecode());
     columnarBatch.reset();
     if (rowsReturned >= totalRowCount) return false;
     checkEndOfRowGroup();
 
-    int num = (int)Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned);
+    int num = (int) Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned);
     for (int i = 0; i < columnReaders.length; ++i) {
       columnReaders[i].readBatch(num, columnarBatch.column(i));
     }
@@ -249,17 +211,11 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     return true;
   }
 
-  /**
-   * Returns true if we are doing a vectorized decode.
-   */
-  private boolean vectorizedDecode() { return columnarBatch != null; }
-
   private void initializeInternal() throws IOException {
     /**
      * Check that the requested schema is supported.
      */
-    int numVarLenFields = 0;
-    originalTypes = new OriginalType[requestedSchema.getFieldCount()];
+    OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()];
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
       if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
@@ -286,197 +242,13 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       if (!fd.equals(requestedSchema.getColumns().get(i))) {
         throw new IOException("Schema evolution not supported.");
       }
-
-      if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) {
-        ++numVarLenFields;
-      }
-    }
-
-    /**
-     * Initialize rows and rowWriters. These objects are reused across all rows in the relation.
-     */
-    containsVarLenFields = numVarLenFields > 0;
-    rowWriters = new UnsafeRowWriter[rows.length];
-
-    for (int i = 0; i < rows.length; ++i) {
-      rows[i] = new UnsafeRow(requestedSchema.getFieldCount());
-      BufferHolder holder = new BufferHolder(rows[i], numVarLenFields * DEFAULT_VAR_LEN_SIZE);
-      rowWriters[i] = new UnsafeRowWriter(holder, requestedSchema.getFieldCount());
     }
   }
 
   /**
-   * Decodes a batch of values into `rows`. This function is the hot path.
-   */
-  private boolean loadBatch() throws IOException {
-    // no more records left
-    if (rowsReturned >= totalRowCount) { return false; }
-    checkEndOfRowGroup();
-
-    int num = (int)Math.min(rows.length, totalCountLoadedSoFar - rowsReturned);
-    rowsReturned += num;
-
-    if (containsVarLenFields) {
-      for (int i = 0; i < rowWriters.length; ++i) {
-        rowWriters[i].holder().reset();
-      }
-    }
-
-    for (int i = 0; i < columnReaders.length; ++i) {
-      switch (columnReaders[i].descriptor.getType()) {
-        case BOOLEAN:
-          decodeBooleanBatch(i, num);
-          break;
-        case INT32:
-          if (originalTypes[i] == OriginalType.DECIMAL) {
-            decodeIntAsDecimalBatch(i, num);
-          } else {
-            decodeIntBatch(i, num);
-          }
-          break;
-        case INT64:
-          Preconditions.checkState(originalTypes[i] == null
-              || originalTypes[i] == OriginalType.DECIMAL,
-              "Unexpected original type: " + originalTypes[i]);
-          decodeLongBatch(i, num);
-          break;
-        case FLOAT:
-          decodeFloatBatch(i, num);
-          break;
-        case DOUBLE:
-          decodeDoubleBatch(i, num);
-          break;
-        case BINARY:
-          decodeBinaryBatch(i, num);
-          break;
-        case FIXED_LEN_BYTE_ARRAY:
-          Preconditions.checkState(originalTypes[i] == OriginalType.DECIMAL,
-              "Unexpected original type: " + originalTypes[i]);
-          decodeFixedLenArrayAsDecimalBatch(i, num);
-          break;
-        case INT96:
-          throw new IOException("Unsupported " + columnReaders[i].descriptor.getType());
-      }
-    }
-
-    numBatched = num;
-    batchIdx = 0;
-
-    // Update the total row lengths if the schema contained variable length. We did not maintain
-    // this as we populated the columns.
-    if (containsVarLenFields) {
-      for (int i = 0; i < numBatched; ++i) {
-        rows[i].setTotalSize(rowWriters[i].holder().totalSize());
-      }
-    }
-
-    return true;
-  }
-
-  private void decodeBooleanBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        rows[n].setBoolean(col, columnReaders[col].nextBoolean());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeIntBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        rows[n].setInt(col, columnReaders[col].nextInt());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeIntAsDecimalBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        // Since this is stored as an INT, it is always a compact decimal. Just set it as a long.
-        rows[n].setLong(col, columnReaders[col].nextInt());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeLongBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        rows[n].setLong(col, columnReaders[col].nextLong());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeFloatBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        rows[n].setFloat(col, columnReaders[col].nextFloat());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeDoubleBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        rows[n].setDouble(col, columnReaders[col].nextDouble());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeBinaryBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        ByteBuffer bytes = columnReaders[col].nextBinary().toByteBuffer();
-        int len = bytes.remaining();
-        if (originalTypes[col] == OriginalType.UTF8) {
-          UTF8String str =
-              UTF8String.fromBytes(bytes.array(), bytes.arrayOffset() + bytes.position(), len);
-          rowWriters[n].write(col, str);
-        } else {
-          rowWriters[n].write(col, bytes.array(), bytes.arrayOffset() + bytes.position(), len);
-        }
-        rows[n].setNotNullAt(col);
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeFixedLenArrayAsDecimalBatch(int col, int num) throws IOException {
-    PrimitiveType type = requestedSchema.getFields().get(col).asPrimitiveType();
-    int precision = type.getDecimalMetadata().getPrecision();
-    int scale = type.getDecimalMetadata().getScale();
-    Preconditions.checkState(precision <= Decimal.MAX_LONG_DIGITS(),
-        "Unsupported precision.");
-
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        Binary v = columnReaders[col].nextBinary();
-        // Constructs a `Decimal` with an unscaled `Long` value if possible.
-        long unscaled = CatalystRowConverter.binaryToUnscaledLong(v);
-        rows[n].setDecimal(col, Decimal.apply(unscaled, precision, scale), precision);
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  /**
-   *
    * Decoder to return values from a single column.
    */
-  private final class ColumnReader {
+  private class VectorizedColumnReader {
     /**
      * Total number of values read.
      */
@@ -527,7 +299,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     private final PageReader pageReader;
     private final ColumnDescriptor descriptor;
 
-    public ColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
+    public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
         throws IOException {
       this.descriptor = descriptor;
       this.pageReader = pageReader;
@@ -634,7 +406,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
         int num = Math.min(total, leftInPage);
         if (useDictionary) {
           // Read and decode dictionary ids.
-          ColumnVector dictionaryIds = column.reserveDictionaryIds(total);;
+          ColumnVector dictionaryIds = column.reserveDictionaryIds(total);
           defColumn.readIntegers(
               num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
           decodeDictionaryIds(rowId, num, column, dictionaryIds);
@@ -836,7 +608,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       });
     }
 
-    private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset)throws IOException {
+    private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
       this.endOfPageValueCount = valuesRead + pageValueCount;
       if (dataEncoding.usesDictionary()) {
         this.dataColumn = null;
@@ -845,27 +617,18 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
               "could not read page in col " + descriptor +
                   " as the dictionary was missing for encoding " + dataEncoding);
         }
-        if (vectorizedDecode()) {
-          @SuppressWarnings("deprecation")
-          Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
-          if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
-            throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
-          }
-          this.dataColumn = new VectorizedRleValuesReader();
-        } else {
-          this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(
-              descriptor, VALUES, dictionary);
+        @SuppressWarnings("deprecation")
+        Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
+        if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
+          throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
         }
+        this.dataColumn = new VectorizedRleValuesReader();
         this.useDictionary = true;
       } else {
-        if (vectorizedDecode()) {
-          if (dataEncoding != Encoding.PLAIN) {
-            throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
-          }
-          this.dataColumn = new VectorizedPlainValuesReader();
-        } else {
-          this.dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+        if (dataEncoding != Encoding.PLAIN) {
+          throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
         }
+        this.dataColumn = new VectorizedPlainValuesReader();
         this.useDictionary = false;
       }
 
@@ -882,16 +645,12 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       ValuesReader dlReader;
 
       // Initialize the decoders.
-      if (vectorizedDecode()) {
-        if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
-          throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding());
-        }
-        int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
-        this.defColumn = new VectorizedRleValuesReader(bitWidth);
-        dlReader = this.defColumn;
-      } else {
-        dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+      if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
+        throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding());
       }
+      int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+      this.defColumn = new VectorizedRleValuesReader(bitWidth);
+      dlReader = this.defColumn;
       this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
       this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
       try {
@@ -911,16 +670,11 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(),
           page.getRepetitionLevels(), descriptor);
 
-      if (vectorizedDecode()) {
-        int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
-        this.defColumn = new VectorizedRleValuesReader(bitWidth);
-        this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
-        this.defColumn.initFromBuffer(
-            this.pageValueCount, page.getDefinitionLevels().toByteArray());
-      } else {
-        this.definitionLevelColumn = createRLEIterator(descriptor.getMaxDefinitionLevel(),
-            page.getDefinitionLevels(), descriptor);
-      }
+      int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+      this.defColumn = new VectorizedRleValuesReader(bitWidth);
+      this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
+      this.defColumn.initFromBuffer(
+          this.pageValueCount, page.getDefinitionLevels().toByteArray());
       try {
         initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
       } catch (IOException e) {
@@ -937,9 +691,10 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
           + rowsReturned + " out of " + totalRowCount);
     }
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    columnReaders = new ColumnReader[columns.size()];
+    columnReaders = new VectorizedColumnReader[columns.size()];
     for (int i = 0; i < columns.size(); ++i) {
-      columnReaders[i] = new ColumnReader(columns.get(i), pages.getPageReader(columns.get(i)));
+      columnReaders[i] = new VectorizedColumnReader(columns.get(i),
+          pages.getPageReader(columns.get(i)));
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 2abfd14916..cd769d0137 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -160,6 +160,15 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       }
       (keyValueOutput, runFunc)
 
+    case Some((SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED, Some(value))) =>
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED} is " +
+            s"deprecated and will be ignored. Vectorized parquet reader will be used instead.")
+        Seq(Row(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, "true"))
+      }
+      (keyValueOutput, runFunc)
+
     // Configures a single property.
     case Some((key, Some(value))) =>
       val runFunc = (sqlContext: SQLContext) => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index e848f423eb..f3514cd14c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -33,9 +33,9 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader
+import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
 
@@ -99,8 +99,6 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
 
   // If true, enable using the custom RecordReader for parquet. This only works for
   // a subset of the types (no complex types).
-  protected val enableUnsafeRowParquetReader: Boolean =
-    sqlContext.getConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key).toBoolean
   protected val enableVectorizedParquetReader: Boolean =
     sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
   protected val enableWholestageCodegen: Boolean =
@@ -174,19 +172,17 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         * fails (for example, unsupported schema), try with the normal reader.
         * TODO: plumb this through a different way?
         */
-      if (enableUnsafeRowParquetReader &&
+      if (enableVectorizedParquetReader &&
         format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
-        val parquetReader: UnsafeRowParquetRecordReader = new UnsafeRowParquetRecordReader()
+        val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader()
         if (!parquetReader.tryInitialize(
             split.serializableHadoopSplit.value, hadoopAttemptContext)) {
           parquetReader.close()
         } else {
           reader = parquetReader.asInstanceOf[RecordReader[Void, V]]
-          if (enableVectorizedParquetReader) {
-            parquetReader.resultBatch()
-            // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-            if (enableWholestageCodegen) parquetReader.enableReturningBatches();
-          }
+          parquetReader.resultBatch()
+          // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+          if (enableWholestageCodegen) parquetReader.enableReturningBatches()
         }
       }
 
@@ -203,7 +199,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       private[this] var finished = false
 
       override def hasNext: Boolean = {
-        if (context.isInterrupted) {
+        if (context.isInterrupted()) {
           throw new TaskKilledException
         }
         if (!finished && !havePair) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c308161413..473cde56fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -345,11 +345,6 @@ object SQLConf {
       "option must be set in Hadoop Configuration.  2. This option overrides " +
       "\"spark.sql.sources.outputCommitterClass\".")
 
-  val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = booleanConf(
-    key = "spark.sql.parquet.enableUnsafeRowRecordReader",
-    defaultValue = Some(true),
-    doc = "Enables using the custom ParquetUnsafeRowRecordReader.")
-
   val PARQUET_VECTORIZED_READER_ENABLED = booleanConf(
     key = "spark.sql.parquet.enableVectorizedReader",
     defaultValue = Some(true),
@@ -527,6 +522,7 @@ object SQLConf {
     val CODEGEN_ENABLED = "spark.sql.codegen"
     val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
     val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
+    val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = "spark.sql.parquet.enableUnsafeRowRecordReader"
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 29318d8b56..f42c7546c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -36,7 +36,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
         List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
 
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         reader.initialize(file.asInstanceOf[String], null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
@@ -61,7 +61,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
         data.repartition(1).write.parquet(dir.getCanonicalPath)
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
 
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         reader.initialize(file.asInstanceOf[String], null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index b394ffb366..51183e970d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -57,7 +57,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     val output = predicate.collect { case a: Attribute => a }.distinct
 
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         val query = df
           .select(output.map(e => Column(e)): _*)
           .where(Column(predicate))
@@ -446,7 +446,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   test("SPARK-11661 Still pushdown filters returned by unhandledFilters") {
     import testImplicits._
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         withTempPath { dir =>
           val path = s"${dir.getCanonicalPath}/part=1"
           (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
@@ -520,7 +520,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   test("SPARK-11164: test the parquet filter in") {
     import testImplicits._
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         withTempPath { dir =>
           val path = s"${dir.getCanonicalPath}/table1"
           (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 4d9a8d7eb1..ebdb105743 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -656,7 +656,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       var hash1: Int = 0
       var hash2: Int = 0
       (false :: true :: Nil).foreach { v =>
-        withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> v.toString) {
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> v.toString) {
           val df = sqlContext.read.parquet(dir.getCanonicalPath)
           val rows = df.queryExecution.toRdd.map(_.copy()).collect()
           val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow])
@@ -672,13 +672,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("UnsafeRowParquetRecordReader - direct path read") {
-    val data = (0 to 10).map(i => (i, ((i + 'a').toChar.toString)))
+  test("VectorizedParquetRecordReader - direct path read") {
+    val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString))
     withTempPath { dir =>
       sqlContext.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
       val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
       {
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         try {
           reader.initialize(file, null)
           val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -695,7 +695,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Project just one column
       {
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         try {
           reader.initialize(file, ("_2" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String)]
@@ -711,7 +711,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Project columns in opposite order
       {
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         try {
           reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -728,7 +728,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
       // Empty projection
       {
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         try {
           reader.initialize(file, List[String]().asJava)
           var result = 0
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index 15bf00e6f4..070c4004c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -82,38 +82,17 @@ object ParquetReadBenchmark {
         }
 
         sqlBenchmark.addCase("SQL Parquet MR") { iter =>
-          withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
-            sqlContext.sql("select sum(id) from tempTable").collect()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet Non-Vectorized") { iter =>
           withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
             sqlContext.sql("select sum(id) from tempTable").collect()
           }
         }
 
         val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
-        // Driving the parquet reader directly without Spark.
-        parquetReaderBenchmark.addCase("ParquetReader Non-Vectorized") { num =>
-          var sum = 0L
-          files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
-            reader.initialize(p, ("id" :: Nil).asJava)
-
-            while (reader.nextKeyValue()) {
-              val record = reader.getCurrentValue.asInstanceOf[InternalRow]
-              if (!record.isNullAt(0)) sum += record.getInt(0)
-            }
-            reader.close()
-          }
-        }
-
         // Driving the parquet reader in batch mode directly.
         parquetReaderBenchmark.addCase("ParquetReader Vectorized") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
+            val reader = new VectorizedParquetRecordReader
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -136,7 +115,7 @@ object ParquetReadBenchmark {
         parquetReaderBenchmark.addCase("ParquetReader Vectorized -> Row") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
+            val reader = new VectorizedParquetRecordReader
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -159,7 +138,6 @@ object ParquetReadBenchmark {
         -------------------------------------------------------------------------------------------
         SQL Parquet Vectorized                    215 /  262         73.0          13.7       1.0X
         SQL Parquet MR                           1946 / 2083          8.1         123.7       0.1X
-        SQL Parquet Non-Vectorized               1079 / 1213         14.6          68.6       0.2X
         */
         sqlBenchmark.run()
 
@@ -167,9 +145,8 @@ object ParquetReadBenchmark {
         Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
         Parquet Reader Single Int Column    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         -------------------------------------------------------------------------------------------
-        ParquetReader Non-Vectorized              610 /  737         25.8          38.8       1.0X
-        ParquetReader Vectorized                  123 /  152        127.8           7.8       5.0X
-        ParquetReader Vectorized -> Row           165 /  180         95.2          10.5       3.7X
+        ParquetReader Vectorized                  123 /  152        127.8           7.8       1.0X
+        ParquetReader Vectorized -> Row           165 /  180         95.2          10.5       0.7X
         */
         parquetReaderBenchmark.run()
       }
@@ -191,32 +168,12 @@ object ParquetReadBenchmark {
         }
 
         benchmark.addCase("SQL Parquet MR") { iter =>
-          withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
-            sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect
-          }
-        }
-
-        benchmark.addCase("SQL Parquet Non-vectorized") { iter =>
           withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
             sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect
           }
         }
 
         val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
-        benchmark.addCase("ParquetReader Non-vectorized") { num =>
-          var sum1 = 0L
-          var sum2 = 0L
-          files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
-            reader.initialize(p, null)
-            while (reader.nextKeyValue()) {
-              val record = reader.getCurrentValue.asInstanceOf[InternalRow]
-              if (!record.isNullAt(0)) sum1 += record.getInt(0)
-              if (!record.isNullAt(1)) sum2 += record.getUTF8String(1).numBytes()
-            }
-            reader.close()
-          }
-        }
 
         /*
         Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
@@ -224,8 +181,6 @@ object ParquetReadBenchmark {
         -------------------------------------------------------------------------------------------
         SQL Parquet Vectorized                    628 /  720         16.7          59.9       1.0X
         SQL Parquet MR                           1905 / 2239          5.5         181.7       0.3X
-        SQL Parquet Non-vectorized               1429 / 1732          7.3         136.3       0.4X
-        ParquetReader Non-vectorized              989 / 1357         10.6          94.3       0.6X
         */
         benchmark.run()
       }
@@ -247,7 +202,7 @@ object ParquetReadBenchmark {
         }
 
         benchmark.addCase("SQL Parquet MR") { iter =>
-          withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
             sqlContext.sql("select sum(length(c1)) from tempTable").collect
           }
         }
@@ -293,7 +248,7 @@ object ParquetReadBenchmark {
         Read data column                          191 /  250         82.1          12.2       1.0X
         Read partition column                      82 /   86        192.4           5.2       2.3X
         Read both columns                         220 /  248         71.5          14.0       0.9X
-         */
+        */
         benchmark.run()
       }
     }
@@ -319,7 +274,7 @@ object ParquetReadBenchmark {
         benchmark.addCase("PR Vectorized") { num =>
           var sum = 0
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
+            val reader = new VectorizedParquetRecordReader
             try {
               reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -340,7 +295,7 @@ object ParquetReadBenchmark {
         benchmark.addCase("PR Vectorized (Null Filtering)") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
+            val reader = new VectorizedParquetRecordReader
             try {
               reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
               val batch = reader.resultBatch()
-- 
GitLab