diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml
index 1764aa9465c4366700af705b105872e8fc301ed6..17fd7d781c9ab3ae2588536a10529eb025371173 100644
--- a/external/docker-integration-tests/pom.xml
+++ b/external/docker-integration-tests/pom.xml
@@ -34,6 +34,13 @@
     <sbt.project.name>docker-integration-tests</sbt.project.name>
   </properties>
 
+  <repositories>
+    <repository>
+      <id>db2</id>
+      <url>https://app.camunda.com/nexus/content/repositories/public/</url>
+    </repository>
+  </repositories>
+
   <dependencies>
     <dependency>
       <groupId>com.spotify</groupId>
@@ -180,5 +187,28 @@
       </exclusions>
     </dependency>
     <!-- End Jersey dependencies -->
+
+    <!-- DB2 JCC driver manual installation instructions
+
+      You can build this datasource if you either:
+
+      1) have the DB2 JCC artifacts available in a repository your build can reach
+         (the repository declared in the <repositories> section above is used by
+         default), or
+
+      2) have a copy of the DB2 JCC driver and install it into your local repository
+         with the following command:
+
+         mvn install:install-file -Dfile=${path to db2jcc4.jar} \
+           -DgroupId=com.ibm.db2.jcc \
+           -DartifactId=db2jcc4 \
+           -Dversion=10.5.0.5 \
+           -Dpackaging=jar
+
+      Note: the IBM DB2 JCC driver is available for download at
+      http://www-01.ibm.com/support/docview.wss?uid=swg21363866
+    -->
+    <dependency>
+      <groupId>com.ibm.db2.jcc</groupId>
+      <artifactId>db2jcc4</artifactId>
+      <version>10.5.0.5</version>
+      <type>jar</type>
+    </dependency>
   </dependencies>
 </project>
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4fe1ef66972068b4162d4234bd4a68a86fa50d12
--- /dev/null
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.math.BigDecimal
+import java.sql.{Connection, Date, Timestamp}
+import java.util.Properties
+
+import org.scalatest._
+
+import org.apache.spark.tags.DockerTest
+
+@DockerTest
+@Ignore // AMPLab Jenkins needs to be updated before shared memory works on docker
+class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
+  override val db = new DatabaseOnDocker {
+    override val imageName = "lresende/db2express-c:10.5.0.5-3.10.0"
+    override val env = Map(
+      "DB2INST1_PASSWORD" -> "rootpass",
+      "LICENSE" -> "accept"
+    )
+    override val usesIpc = true
+    override val jdbcPort: Int = 50000
+    override def getJdbcUrl(ip: String, port: Int): String =
+      s"jdbc:db2://$ip:$port/foo:user=db2inst1;password=rootpass;"
+    override def getStartupProcessName: Option[String] = Some("db2start")
+  }
+
+  override def dataPreparation(conn: Connection): Unit = {
+    conn.prepareStatement("CREATE TABLE tbl (x INTEGER, y VARCHAR(8))").executeUpdate()
+    conn.prepareStatement("INSERT INTO tbl VALUES (42,'fred')").executeUpdate()
+    conn.prepareStatement("INSERT INTO tbl VALUES (17,'dave')").executeUpdate()
+
+    // DB2 has no BIT, MEDIUMINT, DATETIME or YEAR types and caps DECIMAL precision
+    // at 31, so these schemas use the closest DB2 equivalents.
+    conn.prepareStatement("CREATE TABLE numbers (small SMALLINT, med INTEGER, big BIGINT, "
+      + "deci DECIMAL(31,20), flt FLOAT, dbl DOUBLE)").executeUpdate()
+    conn.prepareStatement("INSERT INTO numbers VALUES (17, 77777, 123456789012345, "
+      + "123456789.123456789012345, 42.75, 1.0000000000000002)").executeUpdate()
+
+    conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, ts TIMESTAMP)").executeUpdate()
+    conn.prepareStatement("INSERT INTO dates VALUES ('1991-11-09', '13.31.24', "
+      + "'2009-02-13-23.31.30')").executeUpdate()
+
+    // TODO: Test locale conversion for strings.
+ conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c CLOB, d BLOB, " + + "e CHAR FOR BIT DATA)").executeUpdate() + conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', 'fox', 'jumps'") + .executeUpdate() + } + + test("Basic test") { + val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties) + val rows = df.collect() + assert(rows.length == 2) + val types = rows(0).toSeq.map(x => x.getClass.toString) + assert(types.length == 2) + assert(types(0).equals("class java.lang.Integer")) + assert(types(1).equals("class java.lang.String")) + } + + test("Numeric types") { + val df = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties) + val rows = df.collect() + assert(rows.length == 1) + val types = rows(0).toSeq.map(x => x.getClass.toString) + assert(types.length == 9) + assert(types(0).equals("class java.lang.Boolean")) + assert(types(1).equals("class java.lang.Long")) + assert(types(2).equals("class java.lang.Integer")) + assert(types(3).equals("class java.lang.Integer")) + assert(types(4).equals("class java.lang.Integer")) + assert(types(5).equals("class java.lang.Long")) + assert(types(6).equals("class java.math.BigDecimal")) + assert(types(7).equals("class java.lang.Double")) + assert(types(8).equals("class java.lang.Double")) + assert(rows(0).getBoolean(0) == false) + assert(rows(0).getLong(1) == 0x225) + assert(rows(0).getInt(2) == 17) + assert(rows(0).getInt(3) == 77777) + assert(rows(0).getInt(4) == 123456789) + assert(rows(0).getLong(5) == 123456789012345L) + val bd = new BigDecimal("123456789012345.12345678901234500000") + assert(rows(0).getAs[BigDecimal](6).equals(bd)) + assert(rows(0).getDouble(7) == 42.75) + assert(rows(0).getDouble(8) == 1.0000000000000002) + } + + test("Date types") { + val df = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties) + val rows = df.collect() + assert(rows.length == 1) + val types = rows(0).toSeq.map(x => x.getClass.toString) + assert(types.length == 5) + assert(types(0).equals("class java.sql.Date")) + assert(types(1).equals("class java.sql.Timestamp")) + assert(types(2).equals("class java.sql.Timestamp")) + assert(types(3).equals("class java.sql.Timestamp")) + assert(types(4).equals("class java.sql.Date")) + assert(rows(0).getAs[Date](0).equals(Date.valueOf("1991-11-09"))) + assert(rows(0).getAs[Timestamp](1).equals(Timestamp.valueOf("1970-01-01 13:31:24"))) + assert(rows(0).getAs[Timestamp](2).equals(Timestamp.valueOf("1996-01-01 01:23:45"))) + assert(rows(0).getAs[Timestamp](3).equals(Timestamp.valueOf("2009-02-13 23:31:30"))) + assert(rows(0).getAs[Date](4).equals(Date.valueOf("2001-01-01"))) + } + + test("String types") { + val df = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties) + val rows = df.collect() + assert(rows.length == 1) + val types = rows(0).toSeq.map(x => x.getClass.toString) + assert(types.length == 9) + assert(types(0).equals("class java.lang.String")) + assert(types(1).equals("class java.lang.String")) + assert(types(2).equals("class java.lang.String")) + assert(types(3).equals("class java.lang.String")) + assert(types(4).equals("class java.lang.String")) + assert(types(5).equals("class java.lang.String")) + assert(types(6).equals("class [B")) + assert(types(7).equals("class [B")) + assert(types(8).equals("class [B")) + assert(rows(0).getString(0).equals("the")) + assert(rows(0).getString(1).equals("quick")) + assert(rows(0).getString(2).equals("brown")) + assert(rows(0).getString(3).equals("fox")) + assert(rows(0).getString(4).equals("jumps")) + 
assert(rows(0).getString(5).equals("over")) + assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6), Array[Byte](116, 104, 101, 0))) + assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](7), Array[Byte](108, 97, 122, 121))) + assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](8), Array[Byte](100, 111, 103))) + } + + test("Basic write test") { + val df1 = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties) + val df2 = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties) + val df3 = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties) + df1.write.jdbc(jdbcUrl, "numberscopy", new Properties) + df2.write.jdbc(jdbcUrl, "datescopy", new Properties) + df3.write.jdbc(jdbcUrl, "stringscopy", new Properties) + } +} diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala index f73231fc80a08c64af990f667b0f8a20fb4b6063..c36f4d5f95482048f41ae23f04db1f6f88f988a0 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala @@ -44,6 +44,11 @@ abstract class DatabaseOnDocker { */ val env: Map[String, String] + /** + * Wheather or not to use ipc mode for shared memory when starting docker image + */ + val usesIpc: Boolean + /** * The container-internal JDBC port that the database listens on. */ @@ -53,6 +58,11 @@ abstract class DatabaseOnDocker { * Return a JDBC URL that connects to the database running at the given IP address and port. */ def getJdbcUrl(ip: String, port: Int): String + + /** + * Optional process to run when container starts + */ + def getStartupProcessName: Option[String] } abstract class DockerJDBCIntegrationSuite @@ -97,17 +107,23 @@ abstract class DockerJDBCIntegrationSuite val dockerIp = DockerUtils.getDockerIp() val hostConfig: HostConfig = HostConfig.builder() .networkMode("bridge") + .ipcMode(if (db.usesIpc) "host" else "") .portBindings( Map(s"${db.jdbcPort}/tcp" -> List(PortBinding.of(dockerIp, externalPort)).asJava).asJava) .build() // Create the database container: - val config = ContainerConfig.builder() + val containerConfigBuilder = ContainerConfig.builder() .image(db.imageName) .networkDisabled(false) .env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava) .hostConfig(hostConfig) .exposedPorts(s"${db.jdbcPort}/tcp") - .build() + if(db.getStartupProcessName.isDefined) { + containerConfigBuilder + .cmd(db.getStartupProcessName.get) + } + val config = containerConfigBuilder.build() + // Create the database container: containerId = docker.createContainer(config).id // Start the container and wait until the database can accept JDBC connections: docker.startContainer(containerId) diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index c68e4dc4933b1158c77c3edb7ca99ccad77e192a..a70ed98b52d5d5c8127ef3ff00747bbc1e2e3ad7 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -30,9 +30,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { override val env = 
      Map(
        "MYSQL_ROOT_PASSWORD" -> "rootpass"
      )
+    override val usesIpc = false
     override val jdbcPort: Int = 3306
     override def getJdbcUrl(ip: String, port: Int): String =
       s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
+    override def getStartupProcessName: Option[String] = None
   }
 
   override def dataPreparation(conn: Connection): Unit = {
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 8a0f938f7e3b30d1ccd41faebd560b63bd6049d8..2fc174eb1b3a1c11333569acbc57c273a9872a3c 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -52,9 +52,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     override val env = Map(
       "ORACLE_ROOT_PASSWORD" -> "oracle"
     )
+    override val usesIpc = false
     override val jdbcPort: Int = 1521
     override def getJdbcUrl(ip: String, port: Int): String =
       s"jdbc:oracle:thin:system/oracle@//$ip:$port/xe"
+    override def getStartupProcessName: Option[String] = None
   }
 
   override def dataPreparation(conn: Connection): Unit = {
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index d55cdcf28b23836e3af27c198b9211b7caaa484d..79dd70116ecb85f67a11f7d6919861242a527a5d 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -32,9 +32,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     override val env = Map(
       "POSTGRES_PASSWORD" -> "rootpass"
     )
+    override val usesIpc = false
     override val jdbcPort = 5432
     override def getJdbcUrl(ip: String, port: Int): String =
       s"jdbc:postgresql://$ip:$port/postgres?user=postgres&password=rootpass"
+    override def getStartupProcessName: Option[String] = None
   }
 
   override def dataPreparation(conn: Connection): Unit = {
diff --git a/pom.xml b/pom.xml
index 38843b4f74ba8ec25fd362fa185721f83fad890a..4585c8b9c2b0bbf83c06894405ff4c2d20d9ee64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -666,7 +666,7 @@
         <groupId>com.spotify</groupId>
         <artifactId>docker-client</artifactId>
         <classifier>shaded</classifier>
-        <version>3.4.0</version>
+        <version>3.6.6</version>
         <scope>test</scope>
         <exclusions>
           <exclusion>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c5688ecec65d463e3e98d02ea406f5971d6b7d5f..a58dd7e7f125cb78d64bc0f9bae1c0f4715c21c3 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -366,8 +366,10 @@ object Flume {
 object DockerIntegrationTests {
   // This serves to override the override specified in DependencyOverrides:
   lazy val settings = Seq(
-    dependencyOverrides += "com.google.guava" % "guava" % "18.0"
+    dependencyOverrides += "com.google.guava" % "guava" % "18.0",
+    resolvers ++= Seq("DB2" at "https://app.camunda.com/nexus/content/repositories/public/")
   )
 }
 
 /**
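
Note for reviewers: the sketch below is not part of the patch. It is a minimal illustration of what a new suite has to supply now that DatabaseOnDocker gained usesIpc and getStartupProcessName; the image name, port, JDBC URL scheme and credentials are hypothetical placeholders, while the MySQL, Postgres, Oracle and DB2 suites above are the real instances of this shape.

package org.apache.spark.sql.jdbc

import java.sql.Connection

import org.apache.spark.tags.DockerTest

@DockerTest
class SomeDBIntegrationSuite extends DockerJDBCIntegrationSuite {
  override val db = new DatabaseOnDocker {
    override val imageName = "example/somedb:latest"          // hypothetical image
    override val env = Map("SOMEDB_PASSWORD" -> "rootpass")   // hypothetical credentials
    // Most databases speak plain TCP and leave IPC off; DB2 needs host shared
    // memory, which is why it alone sets usesIpc = true.
    override val usesIpc = false
    override val jdbcPort: Int = 12345
    override def getJdbcUrl(ip: String, port: Int): String =
      s"jdbc:somedb://$ip:$port/test?user=root&password=rootpass"
    // Some(command) overrides the image's default entry process, as DB2 does with
    // Some("db2start"); None keeps the image's default CMD.
    override def getStartupProcessName: Option[String] = None
  }

  override def dataPreparation(conn: Connection): Unit = {
    conn.prepareStatement("CREATE TABLE tbl (x INTEGER)").executeUpdate()
  }
}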