commit 07beb5d2
[SPARK-18413][SQL] Add `maxConnections` JDBCOption
Dongjoon Hyun authored
    ## What changes were proposed in this pull request?
    
This PR adds a new JDBC option, `maxConnections`, which sets the maximum number of simultaneous JDBC connections allowed. This option applies only to writing; Spark coalesces the DataFrame when needed. It defaults to the number of partitions of the RDD. Previously, SQL users could not control this, while Scala/Java/Python users could use the `coalesce` (or `repartition`) API.
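The capping behavior can be sketched as follows. This is a minimal illustration of the intended logic, not the actual Spark source; the function name and shape are hypothetical:

```scala
// Hypothetical sketch of how `maxConnections` caps write parallelism:
// the writer coalesces the RDD only when the option is set below the
// current partition count, mirroring what coalesce() does for API users.
def effectiveWritePartitions(rddPartitions: Int, maxConnections: Option[Int]): Int =
  maxConnections match {
    case Some(max) if max < rddPartitions => max // coalesce down to the cap
    case _ => rddPartitions                      // default: one connection per partition
  }

// Reported scenario: 200 shuffle partitions, no cap -> 200 connections.
println(effectiveWritePartitions(200, None))

// With maxConnections '8', the write would open only 8 connections.
println(effectiveWritePartitions(200, Some(8)))
```

A cap larger than the partition count is a no-op, since coalescing can only reduce partitions.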
    
    **Reported Scenario**
    
In the following case, the number of connections becomes 200 and the database cannot handle all of them.
    
    ```sql
    CREATE OR REPLACE TEMPORARY VIEW resultview
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url "jdbc:oracle:thin:10.129.10.111:1521:BKDB",
      dbtable "result",
      user "HIVE",
      password "HIVE"
    );
    -- set spark.sql.shuffle.partitions=200
    INSERT OVERWRITE TABLE resultview SELECT g, count(1) AS COUNT FROM tnet.DT_LIVE_INFO GROUP BY g
    ```
    
    ## How was this patch tested?
    
Manual. Run the following steps and check the Spark UI.
    
    **Step 1 (MySQL)**
    ```
    CREATE TABLE t1 (a INT);
    CREATE TABLE data (a INT);
    INSERT INTO data VALUES (1);
    INSERT INTO data VALUES (2);
    INSERT INTO data VALUES (3);
    ```
    
    **Step 2 (Spark)**
    ```scala
    SPARK_HOME=$PWD bin/spark-shell --driver-memory 4G --driver-class-path mysql-connector-java-5.1.40-bin.jar
    scala> sql("SET spark.sql.shuffle.partitions=3")
    scala> sql("CREATE OR REPLACE TEMPORARY VIEW data USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 'data', user 'root', password '')")
    scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '1')")
    scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
    scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '2')")
    scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
    scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '3')")
    scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
    scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '4')")
    scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
    ```
    
    ![maxconnections](https://cloud.githubusercontent.com/assets/9700541/20287987/ed8409c2-aa84-11e6-8aab-ae28e63fe54d.png)
    
    Author: Dongjoon Hyun <dongjoon@apache.org>
    
    Closes #15868 from dongjoon-hyun/SPARK-18413.