title: "SparkR - Practical Guide"
output:
  rmarkdown::html_vignette:
    toc: true
    toc_depth: 4
vignette: >
  %\VignetteIndexEntry{SparkR - Practical Guide}
  %\VignetteEngine{knitr::rmarkdown}
  \usepackage[utf8]{inputenc}

Overview

SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. With Spark `r packageVersion("SparkR")`, SparkR provides a distributed data frame implementation that supports data processing operations such as selection, filtering, and aggregation, as well as distributed machine learning using MLlib.

Getting Started

We begin with an example running on the local machine and provide an overview of the use of SparkR: data ingestion, data processing and machine learning.

First, let's load and attach the package.

library(SparkR)

SparkSession is the entry point into SparkR which connects your R program to a Spark cluster. You can create a SparkSession using sparkR.session and pass in options such as the application name, any Spark packages depended on, etc.

We use the default settings, in which case SparkR runs in local mode. It automatically downloads the Spark package in the background if no previous installation is found. For more details about setup, see Spark Session.

install.spark()
sparkR.session()
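
Options can also be passed explicitly to sparkR.session, such as the application name. A minimal sketch (the name below is only an example):

sparkR.session(appName = "SparkR-guide")  # "SparkR-guide" is a hypothetical application name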

The operations in SparkR are centered around an R class called SparkDataFrame. It is a distributed collection of data organized into named columns, which is conceptually equivalent to a table in a relational database or a data frame in R, but with richer optimizations under the hood.

A SparkDataFrame can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing local R data frames. For example, we create a SparkDataFrame from a local R data frame:

cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

We can view the first few rows of the SparkDataFrame with the head or showDF function.

head(carsDF)
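
showDF prints the rows in tabular form on the console instead of returning a local object; a minimal sketch (the number of rows shown is only an example):

showDF(carsDF, numRows = 5)  # print the first 5 rows in tabular form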

Common data processing operations such as filter and select are supported on the SparkDataFrame.

carsSubDF <- select(carsDF, "model", "mpg", "hp")
carsSubDF <- filter(carsSubDF, carsSubDF$hp >= 200)
head(carsSubDF)

SparkR can use many common aggregation functions after grouping.

carsGPDF <- summarize(groupBy(carsDF, carsDF$gear), count = n(carsDF$gear))
head(carsGPDF)
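
Other aggregation functions work the same way after grouping; a minimal sketch that computes the average mpg per number of cylinders (the column name avg_mpg is only illustrative):

carsAvgDF <- summarize(groupBy(carsDF, carsDF$cyl), avg_mpg = avg(carsDF$mpg))
head(carsAvgDF)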

The results carsDF and carsSubDF are SparkDataFrame objects. To convert them back to an R data.frame, we can use collect. Caution: this can cause your interactive environment to run out of memory, because collect() fetches the entire distributed DataFrame to your client, which acts as the Spark driver.

carsGP <- collect(carsGPDF)
class(carsGP)

SparkR supports a number of commonly used machine learning algorithms. Under the hood, SparkR uses MLlib to train the model. Users can call summary to print a summary of the fitted model, predict to make predictions on new data, and write.ml/read.ml to save/load fitted models.

SparkR supports a subset of R formula operators for model fitting, including ‘~’, ‘.’, ‘:’, ‘+’, and ‘-’. We use linear regression as an example.

model <- spark.glm(carsDF, mpg ~ wt + cyl)
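
The other formula operators listed above can be combined in the same call; a minimal sketch that adds an interaction term (this particular model is only illustrative):

modelInteraction <- spark.glm(carsDF, mpg ~ wt + cyl + wt:cyl)  # wt:cyl adds an interaction term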

The result matches that returned by the R glm function applied to mtcars, the local data.frame corresponding to carsDF. In fact, for generalized linear models we specifically expose glm for SparkDataFrame as well, so that the above is equivalent to model <- glm(mpg ~ wt + cyl, data = carsDF).

summary(model)

The model can be saved by write.ml and loaded back using read.ml.

write.ml(model, path = "/HOME/tmp/mlModel/glmModel")
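
The saved model can then be reloaded with read.ml and used for prediction; a minimal sketch, reusing the path above and predicting on the training data purely for illustration:

savedModel <- read.ml("/HOME/tmp/mlModel/glmModel")  # load the fitted model back
predictions <- predict(savedModel, carsDF)           # returns a SparkDataFrame with a prediction column
head(predictions)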

Finally, we can stop the Spark session by running

sparkR.session.stop()

Setup

Installation

Unlike many other R packages, SparkR requires an additional installation of Apache Spark. The Spark installation is used to run a backend process that compiles and executes SparkR programs.

After installing the SparkR package, you can call sparkR.session as explained in the previous section to start, and it will check for the Spark installation. If you are working with SparkR from an interactive shell (e.g., R, RStudio), Spark is downloaded and cached automatically if it is not found. Alternatively, we provide the easy-to-use function install.spark for running this manually. If you don't have Spark installed on the computer, you may download it from the Apache Spark website.

install.spark()

If you already have Spark installed, you don't have to install it again; instead, pass the sparkHome argument to sparkR.session to let SparkR know where the existing Spark installation is.

sparkR.session(sparkHome = "/HOME/spark")

Spark Session {#SetupSparkSession}

In addition to sparkHome, many other options can be specified in sparkR.session. For a complete list, see Starting up: SparkSession and SparkR API doc.

In particular, the following Spark driver properties can be set in sparkConfig.

| Property Name                 | Property group         | spark-submit equivalent  |
| ----------------------------- | ---------------------- | ------------------------ |
| spark.driver.memory           | Application Properties | --driver-memory          |
| spark.driver.extraClassPath   | Runtime Environment    | --driver-class-path      |
| spark.driver.extraJavaOptions | Runtime Environment    | --driver-java-options    |
| spark.driver.extraLibraryPath | Runtime Environment    | --driver-library-path    |
| spark.yarn.keytab             | Application Properties | --keytab                 |
| spark.yarn.principal          | Application Properties | --principal              |
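
These properties can be passed through the sparkConfig argument as a named list; a minimal sketch that sets the driver memory (the 2g value is only an example):

sparkR.session(sparkConfig = list(spark.driver.memory = "2g"))  # 2g is an illustrative value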

For Windows users: due to differences in file path prefixes across operating systems, a current workaround to avoid a potentially wrong prefix is to specify spark.sql.warehouse.dir when starting the SparkSession.

spark_warehouse_path <- file.path(path.expand('~'), "spark-warehouse")
sparkR.session(spark.sql.warehouse.dir = spark_warehouse_path)

Cluster Mode

SparkR can connect to remote Spark clusters. Cluster Mode Overview is a good introduction to different Spark cluster modes.

When connecting SparkR to a remote Spark cluster, make sure that the Spark version and Hadoop version on the machine match the corresponding versions on the cluster. The current SparkR package is compatible with

paste("Spark", packageVersion("SparkR"))

The same version should be used on both the local computer and the remote cluster.

To connect, pass the URL of the master node to sparkR.session. A complete list can be seen in Spark Master URLs. For example, to connect to a local standalone Spark master, we can call

sparkR.session(master = "spark://local:7077")

For a YARN cluster, SparkR supports client mode with the master set to "yarn".

sparkR.session(master = "yarn")

YARN cluster mode is not supported in the current version.

Data Import

Local Data Frame

The simplest way is to convert a local R data frame into a SparkDataFrame. Specifically, we can use as.DataFrame or createDataFrame and pass in the local R data frame to create a SparkDataFrame. As an example, the following creates a SparkDataFrame using the faithful dataset from R.

df <- as.DataFrame(faithful)
head(df)

Data Sources

SparkR supports operating on a variety of data sources through the SparkDataFrame interface. You can check the Spark SQL programming guide for more specific options that are available for the built-in data sources.
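
As a minimal sketch, assuming a JSON file at a hypothetical local path, a data source can be loaded with read.df by specifying the path and the source type:

peopleDF <- read.df("path/to/people.json", source = "json")  # hypothetical path, JSON source
head(peopleDF)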