From 0a7a94eab5fba3d2f2ef14a70c2c1bf4ee21b626 Mon Sep 17 00:00:00 2001
From: jerluc <jeremyalucas@gmail.com>
Date: Mon, 18 May 2015 18:13:29 -0700
Subject: [PATCH] [SPARK-7621] [STREAMING] Report Kafka errors to
 StreamingListeners

PR per [SPARK-7621](https://issues.apache.org/jira/browse/SPARK-7621), which makes both `KafkaReceiver` and `ReliableKafkaReceiver` report its errors to the `ReceiverTracker`, which in turn will add the events to the bus to fire off any registered `StreamingListener`s.

Author: jerluc <jeremyalucas@gmail.com>

Closes #6204 from jerluc/master and squashes the following commits:

82439a5 [jerluc] [SPARK-7621] [STREAMING] Report Kafka errors to StreamingListeners
---
 .../org/apache/spark/streaming/kafka/KafkaInputDStream.scala    | 2 +-
 .../apache/spark/streaming/kafka/ReliableKafkaReceiver.scala    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index cca0fac023..04b2dc10d3 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -135,7 +135,7 @@ class KafkaReceiver[
           store((msgAndMetadata.key, msgAndMetadata.message))
         }
       } catch {
-        case e: Throwable => logError("Error handling message; exiting", e)
+        case e: Throwable => reportError("Error handling message; exiting", e)
       }
     }
   }
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index ea87e96037..75f0dfc22b 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -267,7 +267,7 @@ class ReliableKafkaReceiver[
           }
         } catch {
           case e: Exception =>
-            logError("Error handling message", e)
+            reportError("Error handling message", e)
         }
       }
     }
-- 
GitLab