hyukjinkwon authored
## What changes were proposed in this pull request?

This PR proposes to add a wrapper for the `unionByName` API to R and Python as well.

**Python**

```python
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
```

```
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+
```

**R**

```R
df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
head(unionByName(limit(df1, 2), limit(df2, 2)))
```

```
  carb am gear
1    4  1    4
2    4  1    4
3    4  1    4
4    4  1    4
```

## How was this patch tested?

Doctests for Python and a unit test added in `test_sparkSQL.R` for R.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19105 from HyukjinKwon/unionByName-r-python.
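To make the name-based semantics concrete, here is a minimal SparkR sketch (not part of the original commit; it assumes an active SparkR session) contrasting the positional `union` with `unionByName` on the same inputs:

```R
df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")

# union() matches columns strictly by position, so df2's "am" values
# land under df1's "carb" header.
head(union(limit(df1, 2), limit(df2, 2)))

# unionByName() reorders df2's columns to df1's names before appending.
head(unionByName(limit(df1, 2), limit(df2, 2)))
```

The positional call silently mixes semantically different columns, which is exactly the pitfall `unionByName` avoids.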
test_sparkSQL.R
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
library(testthat)
context("SparkSQL functions")
# Utility function for easily checking the values of a StructField
checkStructField <- function(actual, expectedName, expectedType, expectedNullable) {
  expect_equal(class(actual), "structField")
  expect_equal(actual$name(), expectedName)
  expect_equal(actual$dataType.toString(), expectedType)
  expect_equal(actual$nullable(), expectedNullable)
}
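# Illustrative usage (not part of the original file): given a schema built with
# the hypothetical names below, the helper asserts on a single field, e.g.
#   schema <- structType(structField("age", "integer", TRUE))
#   checkStructField(schema$fields()[[1]], "age", "IntegerType", TRUE)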
markUtf8 <- function(s) {
  Encoding(s) <- "UTF-8"
  s
}
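# Usage sketch (illustrative): markUtf8 changes only the declared encoding,
# not the underlying bytes, e.g. Encoding(markUtf8("café")) returns "UTF-8".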
setHiveContext <- function(sc) {
  if (exists(".testHiveSession", envir = .sparkREnv)) {
    hiveSession <- get(".testHiveSession", envir = .sparkREnv)
  } else {
    # initialize once and reuse
    ssc <- callJMethod(sc, "sc")
    hiveCtx <- tryCatch({
      newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE)
    },
    error = function(err) {
      skip("Hive is not built with SparkSQL, skipped")
    })
    hiveSession <- callJMethod(hiveCtx, "sparkSession")
  }
  previousSession <- get(".sparkRsession", envir = .sparkREnv)
  assign(".sparkRsession", hiveSession, envir = .sparkREnv)
  assign(".prevSparkRsession", previousSession, envir = .sparkREnv)
  hiveSession
}
unsetHiveContext <- function() {
  previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
  assign(".sparkRsession", previousSession, envir = .sparkREnv)
  remove(".prevSparkRsession", envir = .sparkREnv)
}
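# Illustrative pattern (not part of the original file): Hive-dependent tests
# swap in the Hive session, run their assertions, then restore the default:
#   setHiveContext(sc)
#   df <- sql("SELECT 1 AS one")   # runs against the Hive-enabled session
#   expect_equal(collect(df)$one, 1)
#   unsetHiveContext()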
# Tests for SparkSQL functions in SparkR
filesBefore <- list.files(path = sparkRDir, all.files = TRUE)
sparkSession <- if (windows_with_hadoop()) {
  sparkR.session(master = sparkRTestMaster)
} else {
  sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
}
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
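# Illustrative shape of the tests that follow (added sketch, not in the file),
# using the session initialized above; the unionByName check mirrors this PR:
#   test_that("unionByName matches columns by name", {
#     df <- select(createDataFrame(mtcars), "carb", "am", "gear")
#     expect_equal(columns(unionByName(df, df)), c("carb", "am", "gear"))
#   })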