From cafca54c0ea8bd9c3b80dcbc88d9f2b8d708a026 Mon Sep 17 00:00:00 2001
From: Xiao Li <gatorsmile@gmail.com>
Date: Sat, 6 May 2017 22:21:19 -0700
Subject: [PATCH] [SPARK-20557][SQL] Support JDBC data type Time with Time Zone

### What changes were proposed in this pull request?

This PR is to support JDBC data type TIME WITH TIME ZONE. It can be converted to TIMESTAMP

In addition, before this PR, for unsupported data types, we simply output the type number instead of the type name.

```
java.sql.SQLException: Unsupported type 2014
```
After this PR, the message is like
```
java.sql.SQLException: Unsupported type TIMESTAMP_WITH_TIMEZONE
```

- Also upgrade the H2 version to `1.4.195` which has the type fix for "TIMESTAMP WITH TIMEZONE". However, it is not fully supported. Thus, we capture the exception, but we still need it to partially test the support of "TIMESTAMP WITH TIMEZONE", because Docker tests are not regularly run.

### How was this patch tested?
Added test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17835 from gatorsmile/h2.
---
 .../sql/jdbc/OracleIntegrationSuite.scala     |  2 +-
 .../sql/jdbc/PostgresIntegrationSuite.scala   | 15 ++++++++++++
 sql/core/pom.xml                              |  2 +-
 .../datasources/jdbc/JdbcUtils.scala          | 12 +++++++---
 .../spark/sql/internal/CatalogImpl.scala      |  1 -
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 24 +++++++++++++++++--
 6 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 85d4a4a791..f7b1ec34ce 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -192,7 +192,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     checkRow(sql("SELECT * FROM datetime1 where id = 1").head())
   }
 
-  test("SPARK-20557: column type TIMEZONE with TIME STAMP should be recognized") {
+  test("SPARK-20557: column type TIMESTAMP with TIME ZONE should be recognized") {
     val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
     val rows = dfRead.collect()
     val types = rows(0).toSeq.map(x => x.getClass.toString)
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index a1a065a443..eb3c458360 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -55,6 +55,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       + "null, null, null, null, null, "
       + "null, null, null, null, null, null, null)"
     ).executeUpdate()
+
+    conn.prepareStatement("CREATE TABLE ts_with_timezone " +
+      "(id integer, tstz TIMESTAMP WITH TIME ZONE, ttz TIME WITH TIME ZONE)")
+      .executeUpdate()
+    conn.prepareStatement("INSERT INTO ts_with_timezone VALUES " +
+      "(1, TIMESTAMP WITH TIME ZONE '2016-08-12 10:22:31.949271-07', TIME WITH TIME ZONE '17:22:31.949271+00')")
+      .executeUpdate()
   }
 
   test("Type mapping for various types") {
@@ -126,4 +133,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(schema(0).dataType == FloatType)
     assert(schema(1).dataType == ShortType)
   }
+
+  test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME ZONE should be recognized") {
+    val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
+    val rows = dfRead.collect()
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types(1).equals("class java.sql.Timestamp"))
+    assert(types(2).equals("class java.sql.Timestamp"))
+  }
 }
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index e170133f0f..fe4be963e8 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -115,7 +115,7 @@
     <dependency>
       <groupId>com.h2database</groupId>
       <artifactId>h2</artifactId>
-      <version>1.4.183</version>
+      <version>1.4.195</version>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index fb877d1ca7..71eaab119d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
+import java.sql.{Connection, Driver, DriverManager, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -217,11 +217,14 @@ object JdbcUtils extends Logging {
       case java.sql.Types.OTHER         => null
       case java.sql.Types.REAL          => DoubleType
       case java.sql.Types.REF           => StringType
+      case java.sql.Types.REF_CURSOR    => null
       case java.sql.Types.ROWID         => LongType
       case java.sql.Types.SMALLINT      => IntegerType
       case java.sql.Types.SQLXML        => StringType
       case java.sql.Types.STRUCT        => StringType
       case java.sql.Types.TIME          => TimestampType
+      case java.sql.Types.TIME_WITH_TIMEZONE
+                                        => TimestampType
       case java.sql.Types.TIMESTAMP     => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
                                         => TimestampType
@@ -229,11 +232,14 @@ object JdbcUtils extends Logging {
       case java.sql.Types.TINYINT       => IntegerType
       case java.sql.Types.VARBINARY     => BinaryType
       case java.sql.Types.VARCHAR       => StringType
-      case _                            => null
+      case _                            =>
+        throw new SQLException("Unrecognized SQL type " + sqlType)
       // scalastyle:on
     }
 
-    if (answer == null) throw new SQLException("Unsupported type " + sqlType)
+    if (answer == null) {
+      throw new SQLException("Unsupported type " + JDBCType.valueOf(sqlType).getName)
+    }
     answer
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index e1049c665a..142b005850 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.StorageLevel
 
 
-
 /**
  * Internal implementation of the user-facing `Catalog`.
  */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5bd36ec25c..d9f3689411 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.sql.jdbc
 
 import java.math.BigDecimal
-import java.sql.{Date, DriverManager, Timestamp}
+import java.sql.{Date, DriverManager, SQLException, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}
 
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
@@ -141,6 +141,15 @@ class JDBCSuite extends SparkFunSuite
         |OPTIONS (url '$url', dbtable 'TEST.TIMETYPES', user 'testUser', password 'testPass')
        """.stripMargin.replaceAll("\n", " "))
 
+    conn.prepareStatement("CREATE TABLE test.timezone (tz TIMESTAMP WITH TIME ZONE) " +
+      "AS SELECT '1999-01-08 04:05:06.543543543 GMT-08:00'")
+      .executeUpdate()
+    conn.commit()
+
+    conn.prepareStatement("CREATE TABLE test.array (ar ARRAY) " +
+      "AS SELECT '(1, 2, 3)'")
+      .executeUpdate()
+    conn.commit()
 
     conn.prepareStatement("create table test.flttypes (a DOUBLE, b REAL, c DECIMAL(38, 18))"
         ).executeUpdate()
@@ -919,6 +928,17 @@ class JDBCSuite extends SparkFunSuite
     assert(res === (foobarCnt, 0L, foobarCnt) :: Nil)
   }
 
+  test("unsupported types") {
+    var e = intercept[SparkException] {
+      spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties()).collect()
+    }.getMessage
+    assert(e.contains("java.lang.UnsupportedOperationException: unimplemented"))
+    e = intercept[SQLException] {
+      spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
+    }.getMessage
+    assert(e.contains("Unsupported type ARRAY"))
+  }
+
   test("SPARK-19318: Connection properties keys should be case-sensitive.") {
     def testJdbcOptions(options: JDBCOptions): Unit = {
       // Spark JDBC data source options are case-insensitive
-- 
GitLab