Skip to content
Snippets Groups Projects
Unverified Commit 07beb5d2 authored by Dongjoon Hyun's avatar Dongjoon Hyun Committed by Sean Owen
Browse files

[SPARK-18413][SQL] Add `maxConnections` JDBCOption

## What changes were proposed in this pull request?

This PR adds a new JDBCOption `maxConnections` which means the maximum number of simultaneous JDBC connections allowed. This option applies only to writing with coalesce operation if needed. It defaults to the number of partitions of RDD. Previously, SQL users cannot cannot control this while Scala/Java/Python users can use `coalesce` (or `repartition`) API.

**Reported Scenario**

For the following cases, the number of connections becomes 200 and database cannot handle all of them.

```sql
CREATE OR REPLACE TEMPORARY VIEW resultview
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:oracle:thin:10.129.10.111:1521:BKDB",
  dbtable "result",
  user "HIVE",
  password "HIVE"
);
-- set spark.sql.shuffle.partitions=200
INSERT OVERWRITE TABLE resultview SELECT g, count(1) AS COUNT FROM tnet.DT_LIVE_INFO GROUP BY g
```

## How was this patch tested?

Manual. Do the followings and see Spark UI.

**Step 1 (MySQL)**
```
CREATE TABLE t1 (a INT);
CREATE TABLE data (a INT);
INSERT INTO data VALUES (1);
INSERT INTO data VALUES (2);
INSERT INTO data VALUES (3);
```

**Step 2 (Spark)**
```scala
SPARK_HOME=$PWD bin/spark-shell --driver-memory 4G --driver-class-path mysql-connector-java-5.1.40-bin.jar
scala> sql("SET spark.sql.shuffle.partitions=3")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW data USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 'data', user 'root', password '')")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '1')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '2')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '3')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '4')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
```

![maxconnections](https://cloud.githubusercontent.com/assets/9700541/20287987/ed8409c2-aa84-11e6-8aab-ae28e63fe54d.png)

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15868 from dongjoon-hyun/SPARK-18413.
parent 9f262ae1
No related branches found
No related tags found
No related merge requests found
......@@ -1086,6 +1086,13 @@ the following case-sensitive options:
</td>
</tr>
<tr>
<td><code>maxConnections</code></td>
<td>
The maximum number of concurrent JDBC connections that can be used, if set. Only applies when writing. It works by limiting the operation's parallelism, which depends on the input's partition count. If its partition count exceeds this limit, the operation will coalesce the input to fewer partitions before writing.
</td>
</tr>
<tr>
<td><code>isolationLevel</code></td>
<td>
......
......@@ -122,6 +122,11 @@ class JDBCOptions(
case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
}
// the maximum number of connections
val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS).map(_.toInt)
require(maxConnections.isEmpty || maxConnections.get > 0,
s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " +
"The minimum value is 1.")
}
object JDBCOptions {
......@@ -144,4 +149,5 @@ object JDBCOptions {
val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
val JDBC_MAX_CONNECTIONS = newOption("maxConnections")
}
......@@ -667,7 +667,14 @@ object JdbcUtils extends Logging {
val getConnection: () => Connection = createConnectionFactory(options)
val batchSize = options.batchSize
val isolationLevel = options.isolationLevel
df.foreachPartition(iterator => savePartition(
val maxConnections = options.maxConnections
val repartitionedDF =
if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) {
df.coalesce(maxConnections.get)
} else {
df
}
repartitionedDF.foreachPartition(iterator => savePartition(
getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
)
}
......
......@@ -312,4 +312,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
.options(properties.asScala)
.save()
}
test("SPARK-18413: Add `maxConnections` JDBCOption") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val e = intercept[IllegalArgumentException] {
df.write.format("jdbc")
.option("dbtable", "TEST.SAVETEST")
.option("url", url1)
.option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0")
.save()
}.getMessage
assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1"))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment