---
layout: global
title: Spark Programming Guide
description: Spark SPARK_VERSION_SHORT programming guide in Java, Scala and Python
---

* This will become a table of contents (this text will be scraped).
{:toc}
# Overview
At a high level, every Spark application consists of a driver program that runs the user's main
function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.
A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only "added" to, such as counters and sums.
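To give a concrete flavor of both kinds of shared variables, here is a minimal Scala sketch (it assumes a `SparkContext` named `sc`, created as shown later in this guide; the data and names are placeholders):

{% highlight scala %}
// Broadcast variable: a read-only lookup table cached on every node.
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

// Accumulator: tasks may only add to it; the driver reads its value.
val missing = sc.longAccumulator("missing keys")

val resolved = sc.parallelize(Seq("a", "b", "c")).map { key =>
  if (!lookup.value.contains(key)) missing.add(1)
  lookup.value.getOrElse(key, 0)
}
resolved.collect()      // Array(1, 2, 0)
println(missing.value)  // 1 -- "c" was not in the lookup table
{% endhighlight %}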
This guide shows each of these features in each of Spark's supported languages. It is easiest to follow along with if you launch Spark's interactive shell -- either `bin/spark-shell` for the Scala shell or `bin/pyspark` for the Python one.
# Linking with Spark
Spark {{site.SPARK_VERSION}} is built and distributed to work with Scala {{site.SCALA_BINARY_VERSION}} by default. (Spark can be built to work with other versions of Scala, too.) To write applications in Scala, you will need to use a compatible Scala version (e.g. {{site.SCALA_BINARY_VERSION}}.X).
To write a Spark application, you need to add a Maven dependency on Spark. Spark is available through Maven Central at:
groupId = org.apache.spark
artifactId = spark-core_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION}}
In addition, if you wish to access an HDFS cluster, you need to add a dependency on `hadoop-client` for your version of HDFS.
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
Finally, you need to import some Spark classes into your program. Add the following lines:
{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
{% endhighlight %}
(Before Spark 1.3.0, you need to explicitly `import org.apache.spark.SparkContext._` to enable essential implicit conversions.)
Spark {{site.SPARK_VERSION}} supports lambda expressions for concisely writing functions; otherwise you can use the classes in the `org.apache.spark.api.java.function` package.
Note that support for Java 7 was removed in Spark 2.2.0.
To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at:
groupId = org.apache.spark
artifactId = spark-core_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION}}
In addition, if you wish to access an HDFS cluster, you need to add a dependency on `hadoop-client` for your version of HDFS.
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
Finally, you need to import some Spark classes into your program. Add the following lines:
{% highlight java %}
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
{% endhighlight %}
Spark {{site.SPARK_VERSION}} works with Python 2.6+ or Python 3.4+. It can use the standard CPython interpreter, so C libraries like NumPy can be used. It also works with PyPy 2.3+.
Note that support for Python 2.6 is deprecated as of Spark 2.0.0, and may be removed in Spark 2.2.0.
To run Spark applications in Python, use the `bin/spark-submit` script located in the Spark directory. This script will load Spark's Java/Scala libraries and allow you to submit applications to a cluster. You can also use `bin/pyspark` to launch an interactive Python shell.
If you wish to access HDFS data, you need to use a build of PySpark linking to your version of HDFS. Prebuilt packages are also available on the Spark homepage for common HDFS versions.
Finally, you need to import some Spark classes into your program. Add the following line:
{% highlight python %}
from pyspark import SparkContext, SparkConf
{% endhighlight %}
PySpark requires the same minor version of Python in both driver and workers. It uses the default python version in PATH; you can specify which version of Python you want to use with `PYSPARK_PYTHON`, for example:
{% highlight bash %}
$ PYSPARK_PYTHON=python3.4 bin/pyspark
$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py
{% endhighlight %}
# Initializing Spark
The first thing a Spark program must do is to create a `SparkContext` object, which tells Spark how to access a cluster. To create a `SparkContext` you first need to build a `SparkConf` object that contains information about your application.

Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before creating a new one.
{% highlight scala %}
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
{% endhighlight %}
The first thing a Spark program must do is to create a `JavaSparkContext` object, which tells Spark how to access a cluster. To create a `SparkContext` you first need to build a `SparkConf` object that contains information about your application.
{% highlight java %}
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
{% endhighlight %}
The first thing a Spark program must do is to create a `SparkContext` object, which tells Spark how to access a cluster. To create a `SparkContext` you first need to build a `SparkConf` object that contains information about your application.
{% highlight python %}
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
{% endhighlight %}
The `appName` parameter is a name for your application to show on the cluster UI. `master` is a Spark, Mesos or YARN cluster URL, or a special "local" string to run in local mode. In practice, when running on a cluster, you will not want to hardcode `master` in the program, but rather launch the application with `spark-submit` and receive it there. However, for local testing and unit tests, you can pass "local" to run Spark in-process.
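As a minimal sketch of the cluster-friendly pattern (the application name, main class and jar names below are placeholders):

{% highlight scala %}
// No setMaster() here; the master is supplied at launch time, e.g.
//   ./bin/spark-submit --master yarn --class com.example.MyApp my-app.jar
val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
{% endhighlight %}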
## Using the Shell
In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `--master` argument, and you can add JARs to the classpath by passing a comma-separated list to the `--jars` argument. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the `--repositories` argument. For example, to run `bin/spark-shell` on exactly four cores, use:
{% highlight bash %}
$ ./bin/spark-shell --master local[4]
{% endhighlight %}
Or, to also add `code.jar` to its classpath, use:
{% highlight bash %}
$ ./bin/spark-shell --master local[4] --jars code.jar
{% endhighlight %}
To include a dependency using Maven coordinates:
{% highlight bash %}
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
{% endhighlight %}
For a complete list of options, run `spark-shell --help`. Behind the scenes, `spark-shell` invokes the more general `spark-submit` script.
In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the `--repositories` argument. Any Python dependencies a Spark package has (listed in the requirements.txt of that package) must be manually installed using pip when necessary. For example, to run `bin/pyspark` on exactly four cores, use:
{% highlight bash %}
$ ./bin/pyspark --master local[4]
{% endhighlight %}
Or, to also add `code.py` to the search path (in order to later be able to `import code`), use:
{% highlight bash %}
$ ./bin/pyspark --master local[4] --py-files code.py
{% endhighlight %}
For a complete list of options, run `pyspark --help`. Behind the scenes, `pyspark` invokes the more general `spark-submit` script.
It is also possible to launch the PySpark shell in IPython, the enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To use IPython, set the `PYSPARK_DRIVER_PYTHON` variable to `ipython` when running `bin/pyspark`:
{% highlight bash %}
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
{% endhighlight %}
To use the Jupyter notebook (previously known as the IPython notebook),
{% highlight bash %}
$ PYSPARK_DRIVER_PYTHON=jupyter ./bin/pyspark
{% endhighlight %}
You can customize the `ipython` or `jupyter` commands by setting `PYSPARK_DRIVER_PYTHON_OPTS`.
After the Jupyter Notebook server is launched, you can create a new "Python 2" notebook from the "Files" tab. Inside the notebook, you can input the command `%pylab inline` as part of your notebook before you start to try Spark from the Jupyter notebook.
# Resilient Distributed Datasets (RDDs)
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
## Parallelized Collections
Parallelized collections are created by calling `SparkContext`'s `parallelize` method on an existing collection in your driver program (a Scala `Seq`). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:
{% highlight scala %}
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
{% endhighlight %}
Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we might call `distData.reduce((a, b) => a + b)` to add up the elements of the array. We describe operations on distributed datasets later on.
Parallelized collections are created by calling `JavaSparkContext`'s `parallelize` method on an existing `Collection` in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:
{% highlight java %}
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
{% endhighlight %}
Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we might call `distData.reduce((a, b) -> a + b)` to add up the elements of the list. We describe operations on distributed datasets later on.
Parallelized collections are created by calling `SparkContext`'s `parallelize` method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:
{% highlight python %}
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
{% endhighlight %}
Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we can call `distData.reduce(lambda a, b: a + b)` to add up the elements of the list. We describe operations on distributed datasets later on.
One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to `parallelize` (e.g. `sc.parallelize(data, 10)`). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
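As a quick illustration (a sketch only; the value 10 is arbitrary):

{% highlight scala %}
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data, 10)  // ask for 10 partitions explicitly
println(distData.partitions.length)      // 10
{% endhighlight %}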
## External Datasets
Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes a URI for the file (either a local path on the machine, or a `hdfs://`, `s3n://`, etc URI) and reads it as a collection of lines. Here is an example invocation:
{% highlight scala %}
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
{% endhighlight %}
Once created, `distFile` can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(s => s.length).reduce((a, b) => a + b)`.
Some notes on reading files with Spark:

* If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

* All of Spark's file-based input methods, including `textFile`, support running on directories, compressed files, and wildcards as well. For example, you can use `textFile("/my/directory")`, `textFile("/my/directory/*.txt")`, and `textFile("/my/directory/*.gz")`.

* The `textFile` method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks. (A short sketch follows this list.)
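As a rough sketch of the last two points (the path and the partition count are placeholders):

{% highlight scala %}
// Read every .txt file under a directory, asking for at least 100 partitions.
val lines = sc.textFile("/my/directory/*.txt", 100)
println(lines.partitions.length)  // typically >= 100 for splittable input
{% endhighlight %}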
Apart from text files, Spark's Scala API also supports several other data formats:

* `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file. Partitioning is determined by data locality which, in some cases, may result in too few partitions. For those cases, `wholeTextFiles` provides an optional second argument for controlling the minimal number of partitions.

* For SequenceFiles, use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types of key and values in the file. These should be subclasses of Hadoop's Writable interface, like IntWritable and Text. In addition, Spark allows you to specify native types for a few common Writables; for example, `sequenceFile[Int, String]` will automatically read IntWritables and Texts.

* For other Hadoop InputFormats, you can use the `SparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `SparkContext.newAPIHadoopRDD` for InputFormats based on the "new" MapReduce API (`org.apache.hadoop.mapreduce`).

* `RDD.saveAsObjectFile` and `SparkContext.objectFile` support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD. (See the sketch after this list.)
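As an illustration of the last point, a minimal sketch of the object-file round trip (the output path is a placeholder):

{% highlight scala %}
val nums = sc.parallelize(1 to 5)
nums.saveAsObjectFile("/tmp/nums")                // writes serialized Java objects
val restored = sc.objectFile[Int]("/tmp/nums")    // reads them back as an RDD[Int]
println(restored.collect().sorted.mkString(","))  // 1,2,3,4,5
{% endhighlight %}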
Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes a URI for the file (either a local path on the machine, or a `hdfs://`, `s3n://`, etc URI) and reads it as a collection of lines. Here is an example invocation:
{% highlight java %}
JavaRDD<String> distFile = sc.textFile("data.txt");
{% endhighlight %}
Once created, `distFile` can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(s -> s.length()).reduce((a, b) -> a + b)`.
Some notes on reading files with Spark:

* If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

* All of Spark's file-based input methods, including `textFile`, support running on directories, compressed files, and wildcards as well. For example, you can use `textFile("/my/directory")`, `textFile("/my/directory/*.txt")`, and `textFile("/my/directory/*.gz")`.

* The `textFile` method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
Apart from text files, Spark's Java API also supports several other data formats:

* `JavaSparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.

* For SequenceFiles, use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types of key and values in the file. These should be subclasses of Hadoop's Writable interface, like IntWritable and Text.

* For other Hadoop InputFormats, you can use the `JavaSparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `JavaSparkContext.newAPIHadoopRDD` for InputFormats based on the "new" MapReduce API (`org.apache.hadoop.mapreduce`).

* `JavaRDD.saveAsObjectFile` and `JavaSparkContext.objectFile` support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes a URI for the file (either a local path on the machine, or a `hdfs://`, `s3n://`, etc URI) and reads it as a collection of lines. Here is an example invocation:
{% highlight python %}
distFile = sc.textFile("data.txt")
{% endhighlight %}
Once created, `distFile` can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)`.
Some notes on reading files with Spark:

* If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

* All of Spark's file-based input methods, including `textFile`, support running on directories, compressed files, and wildcards as well. For example, you can use `textFile("/my/directory")`, `textFile("/my/directory/*.txt")`, and `textFile("/my/directory/*.gz")`.

* The `textFile` method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
Apart from text files, Spark's Python API also supports several other data formats:

* `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.

* `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.

* SequenceFile and Hadoop Input/Output Formats

**Note** this feature is currently marked `Experimental` and is intended for advanced users. It may be replaced in future with read/write support based on Spark SQL, in which case Spark SQL is the preferred approach.
**Writable Support**
PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the resulting Java objects using Pyrolite. When saving an RDD of key-value pairs to SequenceFile, PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following Writables are automatically converted:
Writable Type | Python Type |
---|---|
Text | unicode str |
IntWritable | int |
FloatWritable | float |
DoubleWritable | float |
BooleanWritable | bool |
BytesWritable | bytearray |
NullWritable | None |
MapWritable | dict |
Arrays are not handled out-of-the-box. Users need to specify custom `ArrayWritable` subtypes when reading or writing. When writing, users also need to specify custom converters that convert arrays to custom `ArrayWritable` subtypes. When reading, the default converter will convert custom `ArrayWritable` subtypes to Java `Object[]`, which then get pickled to Python tuples. To get Python `array.array` for arrays of primitive types, users need to specify custom converters.
**Saving and Loading SequenceFiles**
Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value classes can be specified, but for standard Writables this is not required.
{% highlight python %}
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
{% endhighlight %}
**Saving and Loading Other Hadoop Input/Output Formats**
PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both 'new' and 'old' Hadoop MapReduce APIs. If required, a Hadoop configuration can be passed in as a Python dict. Here is an example using the Elasticsearch ESInputFormat:
{% highlight python %}
$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                             "org.apache.hadoop.io.NullWritable",
                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                             conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})
{% endhighlight %}
Note that, if the InputFormat simply depends on a Hadoop configuration and/or input path, and the key and value classes can easily be converted according to the above table, then this approach should work well for such cases.
If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to transform that data on the Scala/Java side to something which can be handled by Pyrolite's pickler. A Converter trait is provided for this. Simply extend this trait and implement your transformation code in the `convert` method. Remember to ensure that this class, along with any dependencies required to access your `InputFormat`, are packaged into your Spark job jar and included on the PySpark classpath.
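As a rough illustration of the mechanics only (a sketch, not taken from the Spark examples; it assumes the `Converter` trait in `org.apache.spark.api.python`, and uses `MapWritable` purely as a stand-in for whatever type your InputFormat actually produces):

{% highlight scala %}
import scala.collection.JavaConverters._

import org.apache.hadoop.io.MapWritable
import org.apache.spark.api.python.Converter

// Turn a Hadoop MapWritable into a java.util.Map of plain Strings,
// which Pyrolite can pickle into a Python dict.
class MapWritableToStringMapConverter extends Converter[Any, java.util.Map[String, String]] {
  override def convert(obj: Any): java.util.Map[String, String] = obj match {
    case mw: MapWritable =>
      mw.entrySet().asScala
        .map(entry => entry.getKey.toString -> entry.getValue.toString)
        .toMap.asJava
    case other =>
      throw new IllegalArgumentException(s"Unexpected type: ${other.getClass}")
  }
}
{% endhighlight %}

A class like this would then typically be referenced by its fully-qualified name from the PySpark read methods (for example via their `keyConverter` / `valueConverter` arguments).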
See the Python examples and the Converter examples for examples of using Cassandra / HBase `InputFormat` and `OutputFormat` with custom converters.