From b9dcbe5e1ba81135a51b486240662674728dda84 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Thu, 16 Nov 2017 18:23:00 -0800
Subject: [PATCH] [SPARK-22542][SQL] remove unused features in ColumnarBatch

## What changes were proposed in this pull request?

`ColumnarBatch` provides features for fast filtering and projection in a columnar fashion; however, these features are never used by Spark, since Spark relies on whole-stage codegen and processes data row by row. This PR proposes to remove these unused features, as we won't switch to columnar execution in the near future. Even if we do, I think this part needs a proper redesign.
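
For context, here is a minimal sketch of the row-wise access path that remains after this change. It is an illustration only, adapted from the benchmark code touched below; `path` is a placeholder, and imports and error handling are omitted:

```java
// Sketch: consumers iterate every row in the batch; filtering and
// projection are applied by the caller (typically whole-stage-codegen'd
// code), not inside ColumnarBatch itself.
VectorizedParquetRecordReader reader = new VectorizedParquetRecordReader();
reader.initialize(path, Arrays.asList("c1", "c2"));  // path: placeholder
ColumnarBatch batch = reader.resultBatch();
while (reader.nextBatch()) {
  Iterator<ColumnarRow> it = batch.rowIterator();
  while (it.hasNext()) {
    ColumnarRow row = it.next();
    // per-row predicate/projection happens here, replacing the removed
    // markFiltered()/filterNullsInColumn() APIs
  }
}
```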

This is also a step toward making `ColumnVector` public, as we don't want to expose these features to users.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19766 from cloud-fan/vector.
---
 .../sql/catalyst/expressions/UnsafeRow.java   |  4 -
 .../execution/vectorized/ColumnarArray.java   |  2 +-
 .../execution/vectorized/ColumnarBatch.java   | 78 +------------------
 .../sql/execution/vectorized/ColumnarRow.java | 26 -------
 .../arrow/ArrowConvertersSuite.scala          |  2 +-
 .../parquet/ParquetReadBenchmark.scala        | 24 ------
 .../vectorized/ColumnarBatchSuite.scala       | 52 -------------
 7 files changed, 6 insertions(+), 182 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index ec947d7580..71c086029c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -69,10 +69,6 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
     return ((numFields + 63)/ 64) * 8;
   }
 
-  public static int calculateFixedPortionByteSize(int numFields) {
-    return 8 * numFields + calculateBitSetWidthInBytes(numFields);
-  }
-
   /**
    * Field types that can be updated in place in UnsafeRows (e.g. we support set() for these types)
    */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
index 5e88ce0321..34bde3e14d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java
@@ -41,7 +41,7 @@ public final class ColumnarArray extends ArrayData {
   // Reused staging buffer, used for loading from offheap.
   protected byte[] tmpByteArray = new byte[1];
 
-  protected ColumnarArray(ColumnVector data) {
+  ColumnarArray(ColumnVector data) {
     this.data = data;
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 8849a20d6c..2f5fb360b2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -42,15 +42,6 @@ public final class ColumnarBatch {
   private int numRows;
   final ColumnVector[] columns;
 
-  // True if the row is filtered.
-  private final boolean[] filteredRows;
-
-  // Column indices that cannot have null values.
-  private final Set<Integer> nullFilteredColumns;
-
-  // Total number of rows that have been filtered.
-  private int numRowsFiltered = 0;
-
   // Staging row returned from getRow.
   final ColumnarRow row;
 
@@ -68,24 +59,18 @@ public final class ColumnarBatch {
   * Returns an iterator over the rows in this batch.
    */
   public Iterator<ColumnarRow> rowIterator() {
-    final int maxRows = ColumnarBatch.this.numRows();
-    final ColumnarRow row = new ColumnarRow(this);
+    final int maxRows = numRows;
+    final ColumnarRow row = new ColumnarRow(columns);
     return new Iterator<ColumnarRow>() {
       int rowId = 0;
 
       @Override
       public boolean hasNext() {
-        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
-          ++rowId;
-        }
         return rowId < maxRows;
       }
 
       @Override
       public ColumnarRow next() {
-        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
-          ++rowId;
-        }
         if (rowId >= maxRows) {
           throw new NoSuchElementException();
         }
@@ -109,31 +94,15 @@ public final class ColumnarBatch {
         ((WritableColumnVector) columns[i]).reset();
       }
     }
-    if (this.numRowsFiltered > 0) {
-      Arrays.fill(filteredRows, false);
-    }
     this.numRows = 0;
-    this.numRowsFiltered = 0;
   }
 
   /**
-   * Sets the number of rows that are valid. Additionally, marks all rows as "filtered" if one or
-   * more of their attributes are part of a non-nullable column.
+   * Sets the number of rows that are valid.
    */
   public void setNumRows(int numRows) {
     assert(numRows <= this.capacity);
     this.numRows = numRows;
-
-    for (int ordinal : nullFilteredColumns) {
-      if (columns[ordinal].numNulls() != 0) {
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          if (!filteredRows[rowId] && columns[ordinal].isNullAt(rowId)) {
-            filteredRows[rowId] = true;
-            ++numRowsFiltered;
-          }
-        }
-      }
-    }
   }
 
   /**
@@ -146,14 +115,6 @@ public final class ColumnarBatch {
    */
   public int numRows() { return numRows; }
 
-  /**
-   * Returns the number of valid rows.
-   */
-  public int numValidRows() {
-    assert(numRowsFiltered <= numRows);
-    return numRows - numRowsFiltered;
-  }
-
   /**
    * Returns the schema that makes up this batch.
    */
@@ -169,17 +130,6 @@ public final class ColumnarBatch {
    */
   public ColumnVector column(int ordinal) { return columns[ordinal]; }
 
-  /**
-   * Sets (replaces) the column at `ordinal` with column. This can be used to do very efficient
-   * projections.
-   */
-  public void setColumn(int ordinal, ColumnVector column) {
-    if (column instanceof OffHeapColumnVector) {
-      throw new UnsupportedOperationException("Need to ref count columns.");
-    }
-    columns[ordinal] = column;
-  }
-
   /**
    * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    */
@@ -190,30 +140,10 @@ public final class ColumnarBatch {
     return row;
   }
 
-  /**
-   * Marks this row as being filtered out. This means a subsequent iteration over the rows
-   * in this batch will not include this row.
-   */
-  public void markFiltered(int rowId) {
-    assert(!filteredRows[rowId]);
-    filteredRows[rowId] = true;
-    ++numRowsFiltered;
-  }
-
-  /**
-   * Marks a given column as non-nullable. Any row that has a NULL value for the corresponding
-   * attribute is filtered out.
-   */
-  public void filterNullsInColumn(int ordinal) {
-    nullFilteredColumns.add(ordinal);
-  }
-
   public ColumnarBatch(StructType schema, ColumnVector[] columns, int capacity) {
     this.schema = schema;
     this.columns = columns;
     this.capacity = capacity;
-    this.nullFilteredColumns = new HashSet<>();
-    this.filteredRows = new boolean[capacity];
-    this.row = new ColumnarRow(this);
+    this.row = new ColumnarRow(columns);
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarRow.java
index c75adafd69..98a9073227 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarRow.java
@@ -20,7 +20,6 @@ import java.math.BigDecimal;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.types.CalendarInterval;
@@ -32,28 +31,11 @@ import org.apache.spark.unsafe.types.UTF8String;
  */
 public final class ColumnarRow extends InternalRow {
   protected int rowId;
-  private final ColumnarBatch parent;
-  private final int fixedLenRowSize;
   private final ColumnVector[] columns;
   private final WritableColumnVector[] writableColumns;
 
-  // Ctor used if this is a top level row.
-  ColumnarRow(ColumnarBatch parent) {
-    this.parent = parent;
-    this.fixedLenRowSize = UnsafeRow.calculateFixedPortionByteSize(parent.numCols());
-    this.columns = parent.columns;
-    this.writableColumns = new WritableColumnVector[this.columns.length];
-    for (int i = 0; i < this.columns.length; i++) {
-      if (this.columns[i] instanceof WritableColumnVector) {
-        this.writableColumns[i] = (WritableColumnVector) this.columns[i];
-      }
-    }
-  }
-
   // Ctor used if this is a struct.
   ColumnarRow(ColumnVector[] columns) {
-    this.parent = null;
-    this.fixedLenRowSize = UnsafeRow.calculateFixedPortionByteSize(columns.length);
     this.columns = columns;
     this.writableColumns = new WritableColumnVector[this.columns.length];
     for (int i = 0; i < this.columns.length; i++) {
@@ -63,14 +45,6 @@ public final class ColumnarRow extends InternalRow {
     }
   }
 
-  /**
-   * Marks this row as being filtered out. This means a subsequent iteration over the rows
-   * in this batch will not include this row.
-   */
-  public void markFiltered() {
-    parent.markFiltered(rowId);
-  }
-
   public ColumnVector[] columns() { return columns; }
 
   @Override
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index ba2903babb..57958f7239 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1731,7 +1731,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     val payloadIter = ArrowConverters.toPayloadIterator(inputRows.toIterator, schema, 0, null, ctx)
     val outputRowIter = ArrowConverters.fromPayloadIterator(payloadIter, ctx)
 
-    assert(schema.equals(outputRowIter.schema))
+    assert(schema == outputRowIter.schema)
 
     var count = 0
     outputRowIter.zipWithIndex.foreach { case (row, i) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index 0917f188b9..de7a5795b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -295,48 +295,24 @@ object ParquetReadBenchmark {
           }
         }
 
-        benchmark.addCase("PR Vectorized (Null Filtering)") { num =>
-          var sum = 0L
-          files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new VectorizedParquetRecordReader
-            try {
-              reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
-              val batch = reader.resultBatch()
-              batch.filterNullsInColumn(0)
-              batch.filterNullsInColumn(1)
-              while (reader.nextBatch()) {
-                val rowIterator = batch.rowIterator()
-                while (rowIterator.hasNext) {
-                  sum += rowIterator.next().getUTF8String(0).numBytes()
-                }
-              }
-            } finally {
-              reader.close()
-            }
-          }
-        }
-
         /*
         Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
         String with Nulls Scan (0%):        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         -------------------------------------------------------------------------------------------
         SQL Parquet Vectorized                   1229 / 1648          8.5         117.2       1.0X
         PR Vectorized                             833 /  846         12.6          79.4       1.5X
-        PR Vectorized (Null Filtering)            732 /  782         14.3          69.8       1.7X
 
         Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
         String with Nulls Scan (50%):       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         -------------------------------------------------------------------------------------------
         SQL Parquet Vectorized                    995 / 1053         10.5          94.9       1.0X
         PR Vectorized                             732 /  772         14.3          69.8       1.4X
-        PR Vectorized (Null Filtering)            725 /  790         14.5          69.1       1.4X
 
         Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
         String with Nulls Scan (95%):       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         -------------------------------------------------------------------------------------------
         SQL Parquet Vectorized                    326 /  333         32.2          31.1       1.0X
         PR Vectorized                             190 /  200         55.1          18.2       1.7X
-        PR Vectorized (Null Filtering)            168 /  172         62.2          16.1       1.9X
         */
 
         benchmark.run()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 4cfc776e51..4a6c8f5521 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -919,7 +919,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val batch = new ColumnarBatch(schema, columns.toArray, ColumnarBatch.DEFAULT_BATCH_SIZE)
       assert(batch.numCols() == 4)
       assert(batch.numRows() == 0)
-      assert(batch.numValidRows() == 0)
       assert(batch.capacity() > 0)
       assert(batch.rowIterator().hasNext == false)
 
@@ -933,7 +932,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       // Verify the results of the row.
       assert(batch.numCols() == 4)
       assert(batch.numRows() == 1)
-      assert(batch.numValidRows() == 1)
       assert(batch.rowIterator().hasNext == true)
       assert(batch.rowIterator().hasNext == true)
 
@@ -957,16 +955,9 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(it.hasNext == false)
       assert(it.hasNext == false)
 
-      // Filter out the row.
-      row.markFiltered()
-      assert(batch.numRows() == 1)
-      assert(batch.numValidRows() == 0)
-      assert(batch.rowIterator().hasNext == false)
-
       // Reset and add 3 rows
       batch.reset()
       assert(batch.numRows() == 0)
-      assert(batch.numValidRows() == 0)
       assert(batch.rowIterator().hasNext == false)
 
      // Add rows [NULL, 2.2, 2, "abc"], [3, NULL, 3, ""], [4, 4.4, 4, "world"]
@@ -1002,26 +993,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
       // Verify
       assert(batch.numRows() == 3)
-      assert(batch.numValidRows() == 3)
       val it2 = batch.rowIterator()
       rowEquals(it2.next(), Row(null, 2.2, 2, "abc"))
       rowEquals(it2.next(), Row(3, null, 3, ""))
       rowEquals(it2.next(), Row(4, 4.4, 4, "world"))
       assert(!it.hasNext)
 
-      // Filter out some rows and verify
-      batch.markFiltered(1)
-      assert(batch.numValidRows() == 2)
-      val it3 = batch.rowIterator()
-      rowEquals(it3.next(), Row(null, 2.2, 2, "abc"))
-      rowEquals(it3.next(), Row(4, 4.4, 4, "world"))
-      assert(!it.hasNext)
-
-      batch.markFiltered(2)
-      assert(batch.numValidRows() == 1)
-      val it4 = batch.rowIterator()
-      rowEquals(it4.next(), Row(null, 2.2, 2, "abc"))
-
       batch.close()
     }}
   }
@@ -1176,35 +1153,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
     testRandomRows(false, 30)
   }
 
-  test("null filtered columns") {
-    val NUM_ROWS = 10
-    val schema = new StructType()
-      .add("key", IntegerType, nullable = false)
-      .add("value", StringType, nullable = true)
-    for (numNulls <- List(0, NUM_ROWS / 2, NUM_ROWS)) {
-      val rows = mutable.ArrayBuffer.empty[Row]
-      for (i <- 0 until NUM_ROWS) {
-        val row = if (i < numNulls) Row.fromSeq(Seq(i, null)) else Row.fromSeq(Seq(i, i.toString))
-        rows += row
-      }
-      (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
-        val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
-        batch.filterNullsInColumn(1)
-        batch.setNumRows(NUM_ROWS)
-        assert(batch.numRows() == NUM_ROWS)
-        val it = batch.rowIterator()
-        // Top numNulls rows should be filtered
-        var k = numNulls
-        while (it.hasNext) {
-          assert(it.next().getInt(0) == k)
-          k += 1
-        }
-        assert(k == NUM_ROWS)
-        batch.close()
-      }}
-    }
-  }
-
   test("mutable ColumnarBatch rows") {
     val NUM_ITERS = 10
     val types = Array(
-- 
GitLab