From 8a6f33f0483dcee81467e6374a796b5dbd53ea30 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Mon, 27 Mar 2017 19:04:16 -0700
Subject: [PATCH] [SPARK-19876][SS] Follow up: Refactored BatchCommitLog to
 simplify logic

## What changes were proposed in this pull request?

Existing logic seemingly writes null to the BatchCommitLog, even though it does additional checks to write '{}' (valid json) to the log. This PR simplifies the logic by disallowing use of `log.add(batchId, metadata)` and instead using `log.add(batchId)`. No question of specifying metadata, so no confusion related to null.

## How was this patch tested?
Existing tests pass.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17444 from tdas/SPARK-19876-1.
---
 .../execution/streaming/BatchCommitLog.scala  | 28 +++++++++++--------
 .../execution/streaming/HDFSMetadataLog.scala |  1 +
 .../execution/streaming/StreamExecution.scala |  2 +-
 3 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
index fb1a4fb9b1..a34938f911 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
@@ -45,33 +45,39 @@ import org.apache.spark.sql.SparkSession
 class BatchCommitLog(sparkSession: SparkSession, path: String)
   extends HDFSMetadataLog[String](sparkSession, path) {
 
+  import BatchCommitLog._
+
+  def add(batchId: Long): Unit = {
+    super.add(batchId, EMPTY_JSON)
+  }
+
+  override def add(batchId: Long, metadata: String): Boolean = {
+    throw new UnsupportedOperationException(
+      "BatchCommitLog does not take any metadata, use 'add(batchId)' instead")
+  }
+
   override protected def deserialize(in: InputStream): String = {
     // called inside a try-finally where the underlying stream is closed in the caller
     val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file in the offset commit log")
     }
-    parseVersion(lines.next().trim, BatchCommitLog.VERSION)
-    // read metadata
-    lines.next().trim match {
-      case BatchCommitLog.SERIALIZED_VOID => null
-      case metadata => metadata
-    }
+    parseVersion(lines.next.trim, VERSION)
+    EMPTY_JSON
   }
 
   override protected def serialize(metadata: String, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    out.write(s"v${BatchCommitLog.VERSION}".getBytes(UTF_8))
+    out.write(s"v${VERSION}".getBytes(UTF_8))
     out.write('\n')
 
-    // write metadata or void
-    out.write((if (metadata == null) BatchCommitLog.SERIALIZED_VOID else metadata)
-      .getBytes(UTF_8))
+    // write metadata
+    out.write(EMPTY_JSON.getBytes(UTF_8))
   }
 }
 
 object BatchCommitLog {
   private val VERSION = 1
-  private val SERIALIZED_VOID = "{}"
+  private val EMPTY_JSON = "{}"
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 60ce64261c..46bfc29793 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -106,6 +106,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * metadata has already been stored, this method will return `false`.
    */
   override def add(batchId: Long, metadata: T): Boolean = {
+    require(metadata != null, "'null' metadata cannot be written to a metadata log")
     get(batchId).map(_ => false).getOrElse {
       // Only write metadata when the batch has not yet been written
       writeBatch(batchId, metadata)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 34e9262af7..5f548172f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -305,7 +305,7 @@ class StreamExecution(
               if (dataAvailable) {
                 // Update committed offsets.
                 committedOffsets ++= availableOffsets
-                batchCommitLog.add(currentBatchId, null)
+                batchCommitLog.add(currentBatchId)
                 logDebug(s"batch ${currentBatchId} committed")
                 // We'll increase currentBatchId after we complete processing current batch's data
                 currentBatchId += 1
-- 
GitLab