Commit d0212eb0 authored by Shixiong Zhu

[SPARK-18530][SS][KAFKA] Change Kafka timestamp column type to TimestampType

## What changes were proposed in this pull request?

Changed the Kafka source's `timestamp` column type from `LongType` (epoch milliseconds) to `TimestampType`, so the column can be used directly in event-time operations such as windows and watermarks.
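For illustration, a minimal sketch of the user-facing effect (the broker address, topic name, and session setup are placeholders, not part of this patch): with `timestamp` now a `TimestampType` column, it can be fed straight into `window`/`withWatermark` instead of first casting a long of epoch milliseconds.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, window}

val spark = SparkSession.builder().appName("kafka-timestamp-example").getOrCreate()
import spark.implicits._

val kafka = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "events")                       // placeholder topic
  .load()

// Before this patch `timestamp` was epoch milliseconds (LongType) and needed an explicit
// cast, e.g. ($"timestamp" / 1000).cast("timestamp"); now the column is used as-is.
val windowedCounts = kafka
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "5 seconds"))
  .agg(count("*") as "count")

val query = windowedCounts
  .writeStream
  .outputMode("append")
  .format("console")
  .start()
```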

## How was this patch tested?

`test("Kafka column types")`.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15969 from zsxwing/SPARK-18530.
parent 39a1d306
KafkaSource.scala
@@ -32,9 +32,12 @@ import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.UninterruptibleThread

 /**
@@ -282,7 +285,14 @@ private[kafka010] case class KafkaSource(
     // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
     val rdd = new KafkaSourceRDD(
       sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss).map { cr =>
-      Row(cr.key, cr.value, cr.topic, cr.partition, cr.offset, cr.timestamp, cr.timestampType.id)
+      InternalRow(
+        cr.key,
+        cr.value,
+        UTF8String.fromString(cr.topic),
+        cr.partition,
+        cr.offset,
+        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
+        cr.timestampType.id)
     }
     logInfo("GetBatch generating RDD of offset range: " +
@@ -293,7 +303,7 @@
       currentPartitionOffsets = Some(untilPartitionOffsets)
     }
-    sqlContext.createDataFrame(rdd, schema)
+    sqlContext.internalCreateDataFrame(rdd, schema)
   }

   /** Stop this source and free any resources it has allocated. */
@@ -496,7 +506,7 @@ private[kafka010] object KafkaSource {
     StructField("topic", StringType),
     StructField("partition", IntegerType),
     StructField("offset", LongType),
-    StructField("timestamp", LongType),
+    StructField("timestamp", TimestampType),
     StructField("timestampType", IntegerType)
   ))
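For reference, a sketch of the schema a user would now see on the source DataFrame (the `key` and `value` fields sit above this hunk and remain binary; the exact `printSchema()` output is assumed here, not taken from this patch):

```scala
// df is the streaming DataFrame returned by spark.readStream.format("kafka")...load()
df.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)
```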
KafkaSourceSuite.scala
@@ -17,11 +17,11 @@
package org.apache.spark.sql.kafka010
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Random
@@ -33,6 +33,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
@@ -551,6 +552,84 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }

test("Kafka column types") {
val now = System.currentTimeMillis()
val topic = newTopic()
testUtils.createTopic(newTopic(), partitions = 1)
testUtils.sendMessages(topic, Array(1).map(_.toString))
val kafka = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("startingOffsets", s"earliest")
.option("subscribe", topic)
.load()
val query = kafka
.writeStream
.format("memory")
.outputMode("append")
.queryName("kafkaColumnTypes")
.start()
query.processAllAvailable()
val rows = spark.table("kafkaColumnTypes").collect()
assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
val row = rows(0)
assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")
assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row")
assert(row.getAs[Int]("partition") === 0, s"Unexpected results: $row")
assert(row.getAs[Long]("offset") === 0L, s"Unexpected results: $row")
// We cannot check the exact timestamp as it's the time that messages were inserted by the
// producer. So here we just use a low bound to make sure the internal conversion works.
assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")
query.stop()
}
test("KafkaSource with watermark") {
val now = System.currentTimeMillis()
val topic = newTopic()
testUtils.createTopic(newTopic(), partitions = 1)
testUtils.sendMessages(topic, Array(1).map(_.toString))
val kafka = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("startingOffsets", s"earliest")
.option("subscribe", topic)
.load()
val windowedAggregation = kafka
.withWatermark("timestamp", "10 seconds")
.groupBy(window($"timestamp", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start") as 'window, $"count")
val query = windowedAggregation
.writeStream
.format("memory")
.outputMode("complete")
.queryName("kafkaWatermark")
.start()
query.processAllAvailable()
val rows = spark.table("kafkaWatermark").collect()
assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
val row = rows(0)
// We cannot check the exact window start time as it depands on the time that messages were
// inserted by the producer. So here we just use a low bound to make sure the internal
// conversion works.
assert(
row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
s"Unexpected results: $row")
assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
query.stop()
}
   private def testFromLatestOffsets(
       topic: String,
       addPartitions: Boolean,