diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 9e30fa0dbf26a128e5472849535f1137e083a41f..fcf473ac7b76e613f161e2cc2ac3833c6f6bb9df 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -570,10 +570,17 @@ setMethod("unpersist",
 
 #' Repartition
 #'
-#' Return a new SparkDataFrame that has exactly numPartitions partitions.
-#'
+#' The following options for repartition are possible:
+#' \itemize{
+#'  \item{"Option 1"} {Return a new SparkDataFrame partitioned by
+#'                      the given columns into `numPartitions`.}
+#'  \item{"Option 2"} {Return a new SparkDataFrame that has exactly `numPartitions` partitions.}
+#'  \item{"Option 3"} {Return a new SparkDataFrame partitioned by the given column(s),
+#'                      using `spark.sql.shuffle.partitions` as the number of partitions.}
+#' }
 #' @param x A SparkDataFrame
 #' @param numPartitions The number of partitions to use.
+#' @param col The column by which the partitioning will be performed.
 #'
 #' @family SparkDataFrame functions
 #' @rdname repartition
@@ -586,11 +593,31 @@ setMethod("unpersist",
 #' path <- "path/to/file.json"
 #' df <- read.json(sqlContext, path)
 #' newDF <- repartition(df, 2L)
+#' newDF <- repartition(df, numPartitions = 2L)
+#' newDF <- repartition(df, col = df$"col1", df$"col2")
+#' newDF <- repartition(df, 3L, col = df$"col1", df$"col2")
 #'}
 setMethod("repartition",
-          signature(x = "SparkDataFrame", numPartitions = "numeric"),
-          function(x, numPartitions) {
-            sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions = NULL, col = NULL, ...) {
+            if (!is.null(numPartitions) && is.numeric(numPartitions)) {
+              # both the number of partitions and columns are specified
+              if (!is.null(col) && class(col) == "Column") {
+                cols <- list(col, ...)
+                jcol <- lapply(cols, function(c) { c@jc })
+                sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions), jcol)
+              } else {
+                # only the number of partitions is specified
+                sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
+              }
+            } else if (!is.null(col) && class(col) == "Column") {
+              # only columns are specified
+              cols <- list(col, ...)
+              jcol <- lapply(cols, function(c) { c@jc })
+              sdf <- callJMethod(x@sdf, "repartition", jcol)
+            } else {
+              stop("Please specify the number of partitions and/or a column(s)")
+            }
             dataFrame(sdf)
           })
 
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 34d29ddbfdd522e8cd83b5811dafba3074657917..f1badf4364da0c67125e23b018fa5b157199da29 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1023,9 +1023,13 @@ setMethod("keyBy",
 #' @aliases repartition,RDD
 #' @noRd
 setMethod("repartition",
-          signature(x = "RDD", numPartitions = "numeric"),
+          signature(x = "RDD"),
           function(x, numPartitions) {
-            coalesce(x, numPartitions, TRUE)
+            if (!is.null(numPartitions) && is.numeric(numPartitions)) {
+              coalesce(x, numPartitions, TRUE)
+            } else {
+              stop("Please specify the number of partitions")
+            }
           })
 
 #' Return a new RDD that is reduced into numPartitions partitions.
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index f936ea603998189e79905d7013131791fc5ffd63..3db1ac07666b30d2a3d2a19104442e4e388de98c 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -167,7 +167,7 @@ setGeneric("reduce", function(x, func) { standardGeneric("reduce") })
 # @rdname repartition
 # @seealso coalesce
 # @export
-setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") })
+setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })
 
 # @rdname sampleRDD
 # @export
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 081f7b1663272a70e147379b2a95271afa3f8885..3b6a27c3b86a107d9cd203bcec2a769dae498588 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2082,6 +2082,42 @@ test_that("dapply() on a DataFrame", {
   expect_identical(expected, result)
 })
 
+test_that("repartition by columns on DataFrame", {
+  df <- createDataFrame(
+    sqlContext,
+    list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
+    c("a", "b", "c", "d"))
+
+  # neither a column nor the number of partitions is specified
+  retError <- tryCatch(repartition(df), error = function(e) e)
+  expect_equal(grepl("Please specify the number of partitions and/or a column\\(s\\)",
+                     retError), TRUE)
+
+  # repartition by column and number of partitions
+  actual <- repartition(df, 3L, col = df$"a")
+
+  # since we cannot access the number of partitions from the DataFrame,
+  # check that at least the dimensions are identical
+  expect_identical(dim(df), dim(actual))
+
+  # repartition by number of partitions
+  actual <- repartition(df, 13L)
+  expect_identical(dim(df), dim(actual))
+
+  # a test case with a column and dapply
+  schema <- structType(structField("a", "integer"), structField("avg", "double"))
+  df <- repartition(df, col = df$"a")
+  df1 <- dapply(
+    df,
+    function(x) {
+      y <- (data.frame(x$a[1], mean(x$b)))
+    },
+    schema)
+
+  # the number of partitions holding data is equal to 2
+  expect_equal(nrow(df1), 2)
+})
+
 unlink(parquetPath)
 unlink(jsonPath)
 unlink(jsonPathNa)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c77b13832c8b5387febc77cb159679eab7b164bd..dd73fb8dad69518fdba4c80e2a1a344cabf9be9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2178,8 +2178,9 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Returns a new [[Dataset]] partitioned by the given partitioning expressions preserving
-   * the existing number of partitions. The resulting Datasetis hash partitioned.
+   * Returns a new [[Dataset]] partitioned by the given partitioning expressions, using
+   * `spark.sql.shuffle.partitions` as the number of partitions.
+   * The resulting Dataset is hash partitioned.
    *
    * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
    *
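
For reference, a minimal SparkR sketch of the three call forms the new `repartition`
signature dispatches on. This is illustrative only: it assumes an already-initialized
session (`sc` from `sparkR.init()`, `sqlContext` from `sparkRSQL.init(sc)`), and the
data and column names are made up.

  library(SparkR)

  df <- createDataFrame(sqlContext,
                        data.frame(col1 = c(1L, 1L, 3L), col2 = c(10, 20, 30)))

  # Option 1: partition by the given columns into an explicit number of partitions
  byColsAndN <- repartition(df, numPartitions = 2L, col = df$col1, df$col2)

  # Option 2: exactly numPartitions partitions, no partitioning column
  byN <- repartition(df, numPartitions = 2L)

  # Option 3: partition by column(s); the partition count falls back to
  # spark.sql.shuffle.partitions
  byCols <- repartition(df, col = df$col1)

  # supplying neither argument raises the error tested above
  tryCatch(repartition(df), error = function(e) conditionMessage(e))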
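SparkR has no public accessor for a SparkDataFrame's partition count, which is why the
new test infers it through `dapply()`: the UDF runs once per partition, so each
partition that holds rows contributes one output row. A sketch of the same trick,
reusing the hypothetical `df` above; the exact row count can depend on how empty
partitions are handled, so treat this as a diagnostic rather than an exact count.

  # one output row per partition that holds data; after repartitioning by col1,
  # rows sharing a col1 value land in the same partition
  schema <- structType(structField("key", "integer"),
                       structField("n", "integer"))
  perPartition <- dapply(repartition(df, col = df$col1),
                         function(x) { data.frame(x$col1[1], nrow(x)) },
                         schema)
  collect(perPartition)  # roughly one row per distinct col1 value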
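Option 3's partition count comes from `spark.sql.shuffle.partitions` (200 by default),
matching the updated Scaladoc in `Dataset.scala`. One way to override it from SparkR at
this API level is a plain SQL `SET`, which writes to the same session configuration the
column-only repartition will read; the value 5 here is an arbitrary example.

  sql(sqlContext, "SET spark.sql.shuffle.partitions=5")
  byCols5 <- repartition(df, col = df$col1)  # now hash-partitioned into 5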