diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index ec947d7580282006467d4b601b41a88cc48ea404..71c086029cc5b236a17c243f763e3d2ae07e3f0f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -69,10 +69,6 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
     return ((numFields + 63)/ 64) * 8;
   }
 
-  public static int calculateFixedPortionByteSize(int numFields) {
-    return 8 * numFields + calculateBitSetWidthInBytes(numFields);
-  }
-
   /**
    * Field types that can be updated in place in UnsafeRows (e.g. we support set() for these types)
    */
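
The helper removed above had exactly one caller, ColumnarRow.fixedLenRowSize, which this patch also deletes (see the ColumnarRow.java hunk below). For reference, a minimal standalone sketch of what the helper computed, reconstructed from the two methods shown in this hunk (class and main are illustrative, not part of the patch):

    // Sketch reconstructed from the removed method and calculateBitSetWidthInBytes:
    // an UnsafeRow's fixed-length portion is 8 bytes per field slot plus a
    // 64-bit-aligned null bitset.
    public class FixedPortionSizeSketch {
      static int calculateBitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
      }

      static int calculateFixedPortionByteSize(int numFields) {
        return 8 * numFields + calculateBitSetWidthInBytes(numFields);
      }

      public static void main(String[] args) {
        // 4 fields: 32 bytes of field slots + 8 bytes of null bitset = 40.
        System.out.println(calculateFixedPortionByteSize(4));
      }
    }
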
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
index 5e88ce03210846c83da0945eb35132ea6a6665fa..34bde3e14d378ac68d8a0636a760ae4e25a1dd0e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
@@ -41,7 +41,7 @@ public final class ColumnarArray extends ArrayData {
   // Reused staging buffer, used for loading from offheap.
   protected byte[] tmpByteArray = new byte[1];
 
-  protected ColumnarArray(ColumnVector data) {
+  ColumnarArray(ColumnVector data) {
     this.data = data;
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 8849a20d6ceb5201b20ca99040776ad5905cb42a..2f5fb360b226f61e5cf0015b327900c83b0f29d9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -42,15 +42,6 @@ public final class ColumnarBatch {
   private int numRows;
   final ColumnVector[] columns;
 
-  // True if the row is filtered.
-  private final boolean[] filteredRows;
-
-  // Column indices that cannot have null values.
-  private final Set<Integer> nullFilteredColumns;
-
-  // Total number of rows that have been filtered.
-  private int numRowsFiltered = 0;
-
   // Staging row returned from getRow.
   final ColumnarRow row;
 
@@ -68,24 +59,18 @@ public final class ColumnarBatch {
-   * Returns an iterator over the rows in this batch. This skips rows that are filtered out.
+   * Returns an iterator over the rows in this batch.
    */
   public Iterator<ColumnarRow> rowIterator() {
-    final int maxRows = ColumnarBatch.this.numRows();
-    final ColumnarRow row = new ColumnarRow(this);
+    final int maxRows = numRows;
+    final ColumnarRow row = new ColumnarRow(columns);
     return new Iterator<ColumnarRow>() {
       int rowId = 0;
 
       @Override
       public boolean hasNext() {
-        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
-          ++rowId;
-        }
         return rowId < maxRows;
       }
 
       @Override
       public ColumnarRow next() {
-        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
-          ++rowId;
-        }
         if (rowId >= maxRows) {
           throw new NoSuchElementException();
         }
@@ -109,31 +94,15 @@ public final class ColumnarBatch {
         ((WritableColumnVector) columns[i]).reset();
       }
     }
-    if (this.numRowsFiltered > 0) {
-      Arrays.fill(filteredRows, false);
-    }
     this.numRows = 0;
-    this.numRowsFiltered = 0;
   }
 
   /**
-   * Sets the number of rows that are valid. Additionally, marks all rows as "filtered" if one or
-   * more of their attributes are part of a non-nullable column.
+   * Sets the number of rows in this batch.
    */
   public void setNumRows(int numRows) {
     assert(numRows <= this.capacity);
     this.numRows = numRows;
-
-    for (int ordinal : nullFilteredColumns) {
-      if (columns[ordinal].numNulls() != 0) {
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          if (!filteredRows[rowId] && columns[ordinal].isNullAt(rowId)) {
-            filteredRows[rowId] = true;
-            ++numRowsFiltered;
-          }
-        }
-      }
-    }
   }
 
   /**
@@ -146,14 +115,6 @@ public final class ColumnarBatch {
    */
   public int numRows() { return numRows; }
 
-  /**
-   * Returns the number of valid rows.
-   */
-  public int numValidRows() {
-    assert(numRowsFiltered <= numRows);
-    return numRows - numRowsFiltered;
-  }
-
   /**
    * Returns the schema that makes up this batch.
    */
@@ -169,17 +130,6 @@ public final class ColumnarBatch {
    */
   public ColumnVector column(int ordinal) { return columns[ordinal]; }
 
-  /**
-   * Sets (replaces) the column at `ordinal` with column. This can be used to do very efficient
-   * projections.
-   */
-  public void setColumn(int ordinal, ColumnVector column) {
-    if (column instanceof OffHeapColumnVector) {
-      throw new UnsupportedOperationException("Need to ref count columns.");
-    }
-    columns[ordinal] = column;
-  }
-
   /**
    * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    */
@@ -190,30 +140,10 @@ public final class ColumnarBatch {
     return row;
   }
 
-  /**
-   * Marks this row as being filtered out. This means a subsequent iteration over the rows
-   * in this batch will not include this row.
-   */
-  public void markFiltered(int rowId) {
-    assert(!filteredRows[rowId]);
-    filteredRows[rowId] = true;
-    ++numRowsFiltered;
-  }
-
-  /**
-   * Marks a given column as non-nullable. Any row that has a NULL value for the corresponding
-   * attribute is filtered out.
-   */
-  public void filterNullsInColumn(int ordinal) {
-    nullFilteredColumns.add(ordinal);
-  }
-
   public ColumnarBatch(StructType schema, ColumnVector[] columns, int capacity) {
     this.schema = schema;
     this.columns = columns;
     this.capacity = capacity;
-    this.nullFilteredColumns = new HashSet<>();
-    this.filteredRows = new boolean[capacity];
-    this.row = new ColumnarRow(this);
+    this.row = new ColumnarRow(columns);
   }
 }
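
With markFiltered and filterNullsInColumn gone, a ColumnarBatch no longer skips rows itself; any null filtering now has to happen in the consumer. A minimal sketch of a hypothetical caller doing by hand what filterNullsInColumn(0) used to do inside the batch (NullSkippingScan and countNonNull are illustrative names, not part of this patch):

    import java.util.Iterator;

    import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    import org.apache.spark.sql.execution.vectorized.ColumnarRow;

    // Hypothetical consumer: skip rows whose first column is null, which the
    // removed filterNullsInColumn(0) previously handled inside the batch.
    class NullSkippingScan {
      static long countNonNull(ColumnarBatch batch) {
        long count = 0;
        Iterator<ColumnarRow> it = batch.rowIterator();
        while (it.hasNext()) {
          ColumnarRow row = it.next();
          if (!row.isNullAt(0)) { // filtering is now the caller's job
            count++;
          }
        }
        return count;
      }
    }
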
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarRow.java
index c75adafd6946192d7b845e7b42f239a04b21f99c..98a907322713baacb8f6aca7bc22f817be6f7db8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarRow.java
@@ -20,7 +20,6 @@ import java.math.BigDecimal;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.types.CalendarInterval;
@@ -32,28 +31,11 @@ import org.apache.spark.unsafe.types.UTF8String;
  */
 public final class ColumnarRow extends InternalRow {
   protected int rowId;
-  private final ColumnarBatch parent;
-  private final int fixedLenRowSize;
   private final ColumnVector[] columns;
   private final WritableColumnVector[] writableColumns;
 
-  // Ctor used if this is a top level row.
-  ColumnarRow(ColumnarBatch parent) {
-    this.parent = parent;
-    this.fixedLenRowSize = UnsafeRow.calculateFixedPortionByteSize(parent.numCols());
-    this.columns = parent.columns;
-    this.writableColumns = new WritableColumnVector[this.columns.length];
-    for (int i = 0; i < this.columns.length; i++) {
-      if (this.columns[i] instanceof WritableColumnVector) {
-        this.writableColumns[i] = (WritableColumnVector) this.columns[i];
-      }
-    }
-  }
-
-  // Ctor used if this is a struct.
+  // Used for both the top-level row of a ColumnarBatch and nested struct rows.
   ColumnarRow(ColumnVector[] columns) {
-    this.parent = null;
-    this.fixedLenRowSize = UnsafeRow.calculateFixedPortionByteSize(columns.length);
     this.columns = columns;
     this.writableColumns = new WritableColumnVector[this.columns.length];
     for (int i = 0; i < this.columns.length; i++) {
@@ -63,14 +45,6 @@ public final class ColumnarRow extends InternalRow {
     }
   }
 
-  /**
-   * Marks this row as being filtered out. This means a subsequent iteration over the rows
-   * in this batch will not include this row.
-   */
-  public void markFiltered() {
-    parent.markFiltered(rowId);
-  }
-
   public ColumnVector[] columns() { return columns; }
 
   @Override
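
After this change the batch's top-level staging row and nested struct rows share the one remaining constructor; the loop it keeps resolves once, at construction, which vectors are writable, so the mutable-row setters can skip an instanceof check per access. A simplified, self-contained illustration of that caching pattern (all names illustrative):

    // Illustrative sketch: cache the writable columns up front instead of
    // testing instanceof on every setter call.
    class CachedWritableColumns {
      interface Vector { }
      interface WritableVector extends Vector { void putInt(int rowId, int value); }

      private final Vector[] columns;
      private final WritableVector[] writableColumns;

      CachedWritableColumns(Vector[] columns) {
        this.columns = columns;
        this.writableColumns = new WritableVector[columns.length];
        for (int i = 0; i < columns.length; i++) {
          if (columns[i] instanceof WritableVector) {
            this.writableColumns[i] = (WritableVector) columns[i];
          }
        }
      }

      void setInt(int rowId, int ordinal, int value) {
        // Fails fast (NPE) if the column is not writable, mirroring the
        // cached-array lookup in ColumnarRow.
        writableColumns[ordinal].putInt(rowId, value);
      }
    }
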
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index ba2903babbba81f6716a3fbfaa377109fbff1920..57958f72392241e64df8ca7bf646a5f730b569bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1731,7 +1731,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     val payloadIter = ArrowConverters.toPayloadIterator(inputRows.toIterator, schema, 0, null, ctx)
     val outputRowIter = ArrowConverters.fromPayloadIterator(payloadIter, ctx)
 
-    assert(schema.equals(outputRowIter.schema))
+    assert(schema == outputRowIter.schema)
 
     var count = 0
     outputRowIter.zipWithIndex.foreach { case (row, i) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index 0917f188b97998382e081a69d7484be0bfa80787..de7a5795b4796f44bb133a0237bcc7dc4b5b8f2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -295,48 +295,24 @@ object ParquetReadBenchmark {
           }
         }
 
-        benchmark.addCase("PR Vectorized (Null Filtering)") { num =>
-          var sum = 0L
-          files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new VectorizedParquetRecordReader
-            try {
-              reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
-              val batch = reader.resultBatch()
-              batch.filterNullsInColumn(0)
-              batch.filterNullsInColumn(1)
-              while (reader.nextBatch()) {
-                val rowIterator = batch.rowIterator()
-                while (rowIterator.hasNext) {
-                  sum += rowIterator.next().getUTF8String(0).numBytes()
-                }
-              }
-            } finally {
-              reader.close()
-            }
-          }
-        }
-
         /*
         Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
         String with Nulls Scan (0%):        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         -------------------------------------------------------------------------------------------
         SQL Parquet Vectorized                   1229 / 1648          8.5         117.2       1.0X
         PR Vectorized                             833 /  846         12.6          79.4       1.5X
-        PR Vectorized (Null Filtering)            732 /  782         14.3          69.8       1.7X
 
         Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
         String with Nulls Scan (50%):       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         -------------------------------------------------------------------------------------------
         SQL Parquet Vectorized                    995 / 1053         10.5          94.9       1.0X
         PR Vectorized                             732 /  772         14.3          69.8       1.4X
-        PR Vectorized (Null Filtering)            725 /  790         14.5          69.1       1.4X
 
         Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
         String with Nulls Scan (95%):       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         -------------------------------------------------------------------------------------------
         SQL Parquet Vectorized                    326 /  333         32.2          31.1       1.0X
         PR Vectorized                             190 /  200         55.1          18.2       1.7X
-        PR Vectorized (Null Filtering)            168 /  172         62.2          16.1       1.9X
         */
 
         benchmark.run()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 4cfc776e51db116657057ccf95cec91ece203412..4a6c8f5521d1882f2295fe373f36095cf635f114 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -919,7 +919,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val batch = new ColumnarBatch(schema, columns.toArray, ColumnarBatch.DEFAULT_BATCH_SIZE)
       assert(batch.numCols() == 4)
       assert(batch.numRows() == 0)
-      assert(batch.numValidRows() == 0)
       assert(batch.capacity() > 0)
       assert(batch.rowIterator().hasNext == false)
 
@@ -933,7 +932,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       // Verify the results of the row.
       assert(batch.numCols() == 4)
       assert(batch.numRows() == 1)
-      assert(batch.numValidRows() == 1)
       assert(batch.rowIterator().hasNext == true)
       assert(batch.rowIterator().hasNext == true)
 
@@ -957,16 +955,9 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(it.hasNext == false)
       assert(it.hasNext == false)
 
-      // Filter out the row.
-      row.markFiltered()
-      assert(batch.numRows() == 1)
-      assert(batch.numValidRows() == 0)
-      assert(batch.rowIterator().hasNext == false)
-
       // Reset and add 3 rows
       batch.reset()
       assert(batch.numRows() == 0)
-      assert(batch.numValidRows() == 0)
       assert(batch.rowIterator().hasNext == false)
 
       // Add rows [NULL, 2.2, 2, "abc"], [3, NULL, 3, ""], [4, 4.4, 4, "world"]
@@ -1002,26 +993,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
       // Verify
       assert(batch.numRows() == 3)
-      assert(batch.numValidRows() == 3)
       val it2 = batch.rowIterator()
       rowEquals(it2.next(), Row(null, 2.2, 2, "abc"))
       rowEquals(it2.next(), Row(3, null, 3, ""))
       rowEquals(it2.next(), Row(4, 4.4, 4, "world"))
       assert(!it.hasNext)
 
-      // Filter out some rows and verify
-      batch.markFiltered(1)
-      assert(batch.numValidRows() == 2)
-      val it3 = batch.rowIterator()
-      rowEquals(it3.next(), Row(null, 2.2, 2, "abc"))
-      rowEquals(it3.next(), Row(4, 4.4, 4, "world"))
-      assert(!it.hasNext)
-
-      batch.markFiltered(2)
-      assert(batch.numValidRows() == 1)
-      val it4 = batch.rowIterator()
-      rowEquals(it4.next(), Row(null, 2.2, 2, "abc"))
-
       batch.close()
     }}
   }
@@ -1176,35 +1153,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
     testRandomRows(false, 30)
   }
 
-  test("null filtered columns") {
-    val NUM_ROWS = 10
-    val schema = new StructType()
-      .add("key", IntegerType, nullable = false)
-      .add("value", StringType, nullable = true)
-    for (numNulls <- List(0, NUM_ROWS / 2, NUM_ROWS)) {
-      val rows = mutable.ArrayBuffer.empty[Row]
-      for (i <- 0 until NUM_ROWS) {
-        val row = if (i < numNulls) Row.fromSeq(Seq(i, null)) else Row.fromSeq(Seq(i, i.toString))
-        rows += row
-      }
-      (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
-        val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
-        batch.filterNullsInColumn(1)
-        batch.setNumRows(NUM_ROWS)
-        assert(batch.numRows() == NUM_ROWS)
-        val it = batch.rowIterator()
-        // Top numNulls rows should be filtered
-        var k = numNulls
-        while (it.hasNext) {
-          assert(it.next().getInt(0) == k)
-          k += 1
-        }
-        assert(k == NUM_ROWS)
-        batch.close()
-      }}
-    }
-  }
-
   test("mutable ColumnarBatch rows") {
     val NUM_ITERS = 10
     val types = Array(