    [SPARK-21897][PYTHON][R] Add unionByName API to DataFrame in Python and R · 07fd68a2
    hyukjinkwon authored
    ## What changes were proposed in this pull request?
    
    This PR proposes to add a wrapper for the `unionByName` API to Python and R as well.
    
    **Python**
    
    ```python
    df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
    df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
    df1.unionByName(df2).show()
    ```
    
    ```
    +----+----+----+
    |col0|col1|col2|
    +----+----+----+
    |   1|   2|   3|
    |   6|   4|   5|
    +----+----+----+
    ```
    
    **R**
    
    ```R
    df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
    df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
    head(unionByName(limit(df1, 2), limit(df2, 2)))
    ```
    
    ```
      carb am gear
    1    4  1    4
    2    4  1    4
    3    4  1    4
    4    4  1    4
    ```
    
    ## How was this patch tested?
    
    Doctests for Python, and a unit test added in `test_sparkSQL.R` for R.
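
    For reference, a minimal sketch of what such a testthat unit test could look like, reusing the `mtcars` example above (the exact assertions in the added test may differ):

    ```R
    test_that("unionByName() resolves columns by name", {
      df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
      df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
      unioned <- unionByName(df1, df2)
      expect_is(unioned, "SparkDataFrame")
      # Columns of df2 are matched by name, so the result keeps df1's column order.
      expect_equal(columns(unioned), c("carb", "am", "gear"))
      expect_equal(count(unioned), count(df1) + count(df2))
    })
    ```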
    
    Author: hyukjinkwon <gurwls223@gmail.com>
    
    Closes #19105 from HyukjinKwon/unionByName-r-python.
test_sparkSQL.R
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

library(testthat)

context("SparkSQL functions")

# Utility function for easily checking the values of a StructField
checkStructField <- function(actual, expectedName, expectedType, expectedNullable) {
  expect_equal(class(actual), "structField")
  expect_equal(actual$name(), expectedName)
  expect_equal(actual$dataType.toString(), expectedType)
  expect_equal(actual$nullable(), expectedNullable)
}
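
# Example (illustrative): checkStructField is typically applied to the fields of a
# structType, e.g.
#   testSchema <- structType(structField("a", "integer"), structField("b", "string"))
#   checkStructField(testSchema$fields()[[1]], "a", "IntegerType", TRUE)
#   checkStructField(testSchema$fields()[[2]], "b", "StringType", TRUE)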

markUtf8 <- function(s) {
  Encoding(s) <- "UTF-8"
  s
}

setHiveContext <- function(sc) {
  if (exists(".testHiveSession", envir = .sparkREnv)) {
    hiveSession <- get(".testHiveSession", envir = .sparkREnv)
  } else {
    # initialize once and reuse
    ssc <- callJMethod(sc, "sc")
    hiveCtx <- tryCatch({
      newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE)
    },
    error = function(err) {
      skip("Hive is not build with SparkSQL, skipped")
    })
    hiveSession <- callJMethod(hiveCtx, "sparkSession")
  }
  previousSession <- get(".sparkRsession", envir = .sparkREnv)
  assign(".sparkRsession", hiveSession, envir = .sparkREnv)
  assign(".prevSparkRsession", previousSession, envir = .sparkREnv)
  hiveSession
}

unsetHiveContext <- function() {
  previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
  assign(".sparkRsession", previousSession, envir = .sparkREnv)
  remove(".prevSparkRsession", envir = .sparkREnv)
}
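
# Example (illustrative): tests that need Hive wrap their body between these two
# helpers so the default SparkR session is restored afterwards, e.g.
#   setHiveContext(sc)
#   # ... run Hive-dependent assertions here ...
#   unsetHiveContext()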

# Tests for SparkSQL functions in SparkR

filesBefore <- list.files(path = sparkRDir, all.files = TRUE)
sparkSession <- if (windows_with_hadoop()) {
    sparkR.session(master = sparkRTestMaster)
  } else {
    sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
  }
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)