From c40597720e8e66a6b11ca241b1ad387154a8fe72 Mon Sep 17 00:00:00 2001
From: Felix Cheung <felixcheung_m@hotmail.com>
Date: Sun, 19 Mar 2017 22:34:18 -0700
Subject: [PATCH] [SPARK-20020][SPARKR] DataFrame checkpoint API

## What changes were proposed in this pull request?

Add checkpoint, setCheckpointDir API to R

## How was this patch tested?

unit tests, manual tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17351 from felixcheung/rdfcheckpoint.
---
 R/pkg/NAMESPACE                           |  2 ++
 R/pkg/R/DataFrame.R                       | 29 +++++++++++++++++++++++
 R/pkg/R/RDD.R                             |  2 +-
 R/pkg/R/context.R                         | 21 +++++++++++++++-
 R/pkg/R/generics.R                        |  6 ++++-
 R/pkg/inst/tests/testthat/test_rdd.R      |  4 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 11 +++++++++
 7 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 78344ce9ff..8be7875ad2 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -82,6 +82,7 @@ exportMethods("arrange",
               "as.data.frame",
               "attach",
               "cache",
+              "checkpoint",
               "coalesce",
               "collect",
               "colnames",
@@ -369,6 +370,7 @@ export("as.DataFrame",
        "read.parquet",
        "read.stream",
        "read.text",
+       "setCheckpointDir",
        "spark.lapply",
        "spark.addFile",
        "spark.getSparkFilesRootDirectory",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index bc81633815..97786df4ae 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3613,3 +3613,32 @@ setMethod("write.stream",
             ssq <- handledCallJMethod(write, "start")
             streamingQuery(ssq)
           })
+
+#' checkpoint
+#'
+#' Returns a checkpointed version of this SparkDataFrame. Checkpointing can be used to truncate the
+#' logical plan, which is especially useful in iterative algorithms where the plan may grow
+#' exponentially. It will be saved to files inside the checkpoint directory set with
+#' \code{setCheckpointDir}
+#'
+#' @param x A SparkDataFrame
+#' @param eager whether to checkpoint this SparkDataFrame immediately
+#' @return a new checkpointed SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases checkpoint,SparkDataFrame-method
+#' @rdname checkpoint
+#' @name checkpoint
+#' @seealso \link{setCheckpointDir}
+#' @export
+#' @examples
+#'\dontrun{
+#' setCheckpointDir("/checkpoint")
+#' df <- checkpoint(df)
+#' }
+#' @note checkpoint since 2.2.0
+setMethod("checkpoint",
+          signature(x = "SparkDataFrame"),
+          function(x, eager = TRUE) {
+            df <- callJMethod(x@sdf, "checkpoint", as.logical(eager))
+            dataFrame(df)
+          })
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 5667b9d788..7ad3993e9e 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -291,7 +291,7 @@ setMethod("unpersistRDD",
 #' @rdname checkpoint-methods
 #' @aliases checkpoint,RDD-method
 #' @noRd
-setMethod("checkpoint",
+setMethod("checkpointRDD",
           signature(x = "RDD"),
           function(x) {
             jrdd <- getJRDD(x)
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 1a0dd65f45..cb0f83b2fa 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -291,7 +291,7 @@ broadcast <- function(sc, object) {
 #' rdd <- parallelize(sc, 1:2, 2L)
 #' checkpoint(rdd)
 #'}
-setCheckpointDir <- function(sc, dirName) {
+setCheckpointDirSC <- function(sc, dirName) {
   invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
 }
 
@@ -410,3 +410,22 @@ setLogLevel <- function(level) {
   sc <- getSparkContext()
   invisible(callJMethod(sc, "setLogLevel", level))
 }
+
+#' Set checkpoint directory
+#'
+#' Set the directory under which SparkDataFrame are going to be checkpointed. The directory must be
+#' a HDFS path if running on a cluster.
+#'
+#' @rdname setCheckpointDir
+#' @param directory Directory path to checkpoint to
+#' @seealso \link{checkpoint}
+#' @export
+#' @examples
+#'\dontrun{
+#' setCheckpointDir("/checkpoint")
+#'}
+#' @note setCheckpointDir since 2.0.0
+setCheckpointDir <- function(directory) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(directory))))
+}
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 029771289f..80283e48ce 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -32,7 +32,7 @@ setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coa
 
 # @rdname checkpoint-methods
 # @export
-setGeneric("checkpoint", function(x) { standardGeneric("checkpoint") })
+setGeneric("checkpointRDD", function(x) { standardGeneric("checkpointRDD") })
 
 setGeneric("collectRDD", function(x, ...) { standardGeneric("collectRDD") })
 
@@ -406,6 +406,10 @@ setGeneric("attach")
 #' @export
 setGeneric("cache", function(x) { standardGeneric("cache") })
 
+#' @rdname checkpoint
+#' @export
+setGeneric("checkpoint", function(x, eager = TRUE) { standardGeneric("checkpoint") })
+
 #' @rdname coalesce
 #' @param x a Column or a SparkDataFrame.
 #' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index 787ef51c50..b72c801dd9 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -143,8 +143,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
   expect_false(rdd2@env$isCached)
 
   tempDir <- tempfile(pattern = "checkpoint")
-  setCheckpointDir(sc, tempDir)
-  checkpoint(rdd2)
+  setCheckpointDirSC(sc, tempDir)
+  checkpointRDD(rdd2)
   expect_true(rdd2@env$isCheckpointed)
 
   rdd2 <- lapply(rdd2, function(x) x)
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9c38e0d866..cbc3569795 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -841,6 +841,17 @@ test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame",
   expect_true(is.data.frame(collect(df)))
 })
 
+test_that("setCheckpointDir(), checkpoint() on a DataFrame", {
+  checkpointDir <- file.path(tempdir(), "cproot")
+  expect_true(length(list.files(path = checkpointDir, all.files = TRUE)) == 0)
+
+  setCheckpointDir(checkpointDir)
+  df <- read.json(jsonPath)
+  df <- checkpoint(df)
+  expect_is(df, "SparkDataFrame")
+  expect_false(length(list.files(path = checkpointDir, all.files = TRUE)) == 0)
+})
+
 test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
   df <- read.json(jsonPath)
   testSchema <- schema(df)
-- 
GitLab