From ebbe589d12434bc108672268bee05a7b7e571ee6 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Date: Thu, 27 Jul 2017 15:27:24 +0800
Subject: [PATCH] [SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a multiple of
 8

## What changes were proposed in this pull request?

This PR ensures that `UnsafeRow.sizeInBytes` is a multiple of 8. If it is not, `UnsafeRow.hashCode` triggers an assertion violation, since the underlying `Murmur3_x86_32.hashUnsafeWords` requires a word-aligned length.
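
For reference, here is a minimal sketch (not part of this patch) of how the violation can surface; it assumes a single-field row and JVM assertions enabled (`-ea`), and the class name is illustrative:

```java
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;

public class SizeAlignmentRepro {
  public static void main(String[] args) {
    UnsafeRow row = new UnsafeRow(1);
    // 20 bytes: a 16-byte single-field row (8-byte null bitset + one 8-byte
    // field) plus the 4 stray bytes that RowBasedKeyValueBatch used to
    // append; 20 is not a multiple of 8.
    byte[] buffer = new byte[20];
    row.pointTo(buffer, 20);
    // hashCode() delegates to Murmur3_x86_32.hashUnsafeWords, which asserts
    // a word-aligned length, so this throws AssertionError.
    row.hashCode();
  }
}
```

With this patch applied, the new assert in `pointTo` fires first, so the misalignment is reported where the row is set up rather than deep inside the hash function.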

## How was this patch tested?

Will add test cases.
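
Until those land, a hypothetical JUnit-style check of the new invariant could look like the sketch below (the class and test names are illustrative, not from this PR):

```java
import static org.junit.Assert.fail;

import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.junit.Test;

public class UnsafeRowAlignmentSuite {
  @Test
  public void pointToRejectsUnalignedSizes() {
    UnsafeRow row = new UnsafeRow(1);
    byte[] buffer = new byte[24];
    // A size that is a multiple of 8 is accepted.
    row.pointTo(buffer, 24);
    try {
      // 20 is not a multiple of 8; the new assert should fire (with -ea).
      row.pointTo(buffer, 20);
      fail("expected AssertionError for a size that is not a multiple of 8");
    } catch (AssertionError expected) {
      // expected: sizeInBytes must be a multiple of 8
    }
  }
}
```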

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #18503 from kiszk/SPARK-21271.
---
 .../FixedLengthRowBasedKeyValueBatch.java            |  6 +++---
 .../spark/sql/catalyst/expressions/UnsafeRow.java    |  2 ++
 .../VariableLengthRowBasedKeyValueBatch.java         |  6 +++---
 .../spark/sql/execution/UnsafeExternalRowSorter.java |  7 ++++---
 .../spark/sql/execution/UnsafeKVExternalSorter.java  |  6 +++---
 .../state/HDFSBackedStateStoreProvider.scala         | 12 ++++++++++--
 6 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
index a88a315bf4..df52f9c2d5 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
@@ -62,7 +62,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc
 
     keyRowId = numRows;
     keyRow.pointTo(base, recordOffset, klen);
-    valueRow.pointTo(base, recordOffset + klen, vlen + 4);
+    valueRow.pointTo(base, recordOffset + klen, vlen);
     numRows++;
     return valueRow;
   }
@@ -95,7 +95,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc
       getKeyRow(rowId);
     }
     assert(rowId >= 0);
-    valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen + 4);
+    valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen);
     return valueRow;
   }
 
@@ -131,7 +131,7 @@ public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatc
         }
 
         key.pointTo(base, offsetInPage, klen);
-        value.pointTo(base, offsetInPage + klen, vlen + 4);
+        value.pointTo(base, offsetInPage + klen, vlen);
 
         offsetInPage += recordLength;
         recordsInPage -= 1;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 56994fafe0..ec947d7580 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -167,6 +167,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
    */
   public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
     assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
+    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.baseObject = baseObject;
     this.baseOffset = baseOffset;
     this.sizeInBytes = sizeInBytes;
@@ -183,6 +184,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
   }
 
   public void setTotalSize(int sizeInBytes) {
+    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.sizeInBytes = sizeInBytes;
   }
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
index ea4f984be2..905e6820ce 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
@@ -65,7 +65,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
 
     keyRowId = numRows;
     keyRow.pointTo(base, recordOffset + 8, klen);
-    valueRow.pointTo(base, recordOffset + 8 + klen, vlen + 4);
+    valueRow.pointTo(base, recordOffset + 8 + klen, vlen);
     numRows++;
     return valueRow;
   }
@@ -102,7 +102,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
     long offset = keyRow.getBaseOffset();
     int klen = keyRow.getSizeInBytes();
     int vlen = Platform.getInt(base, offset - 8) - klen - 4;
-    valueRow.pointTo(base, offset + klen, vlen + 4);
+    valueRow.pointTo(base, offset + klen, vlen);
     return valueRow;
   }
 
@@ -146,7 +146,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
         currentvlen = totalLength - currentklen;
 
         key.pointTo(base, offsetInPage + 8, currentklen);
-        value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen + 4);
+        value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen);
 
         offsetInPage += 8 + totalLength + 8;
         recordsInPage -= 1;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index aadfcaa56c..53b0886541 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -208,9 +208,10 @@ public final class UnsafeExternalRowSorter {
 
     @Override
     public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
-      // TODO: Why are the sizes -1?
-      row1.pointTo(baseObj1, baseOff1, -1);
-      row2.pointTo(baseObj2, baseOff2, -1);
+      // Note that since ordering doesn't need the total length of the record, we just pass 0
+      // into the row.
+      row1.pointTo(baseObj1, baseOff1, 0);
+      row2.pointTo(baseObj2, baseOff2, 0);
       return ordering.compare(row1, row2);
     }
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index ee5bcfd02c..d8acf11a97 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -238,10 +238,10 @@ public final class UnsafeKVExternalSorter {
 
     @Override
     public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
-      // Note that since ordering doesn't need the total length of the record, we just pass -1
+      // Note that since ordering doesn't need the total length of the record, we just pass 0
       // into the row.
-      row1.pointTo(baseObj1, baseOff1 + 4, -1);
-      row2.pointTo(baseObj2, baseOff2 + 4, -1);
+      row1.pointTo(baseObj1, baseOff1 + 4, 0);
+      row2.pointTo(baseObj2, baseOff2 + 4, 0);
       return ordering.compare(row1, row2);
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index fa4c99c019..e0c2e94207 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -369,7 +369,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
             val valueRowBuffer = new Array[Byte](valueSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
-            valueRow.pointTo(valueRowBuffer, valueSize)
+            // If valueSize in the existing file is not a multiple of 8, floor it to a
+            // multiple of 8. This is a workaround for the following issue: prior to
+            // Spark 2.3, `RowBasedKeyValueBatch` mistakenly appended 4 bytes to the
+            // value row, and that extra length was persisted into the checkpoint data.
+            valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
             map.put(keyRow, valueRow)
           }
         }
@@ -433,7 +437,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
             val valueRowBuffer = new Array[Byte](valueSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
-            valueRow.pointTo(valueRowBuffer, valueSize)
+            // If valueSize in the existing file is not a multiple of 8, floor it to a
+            // multiple of 8. This is a workaround for the following issue: prior to
+            // Spark 2.3, `RowBasedKeyValueBatch` mistakenly appended 4 bytes to the
+            // value row, and that extra length was persisted into the checkpoint data.
+            valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
             map.put(keyRow, valueRow)
           }
         }
-- 
GitLab