From c23bf1aff4b9a1faf9d32c7b64acad2213f9515c Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@eecs.berkeley.edu>
Date: Sat, 20 Oct 2012 00:16:41 +0000
Subject: [PATCH] Add PySpark README and run scripts.

---
 core/src/main/scala/spark/SparkContext.scala |  2 +-
 pyspark/README                               | 58 ++++++++++++++++++++
 pyspark/pyspark-shell                        |  3 +
 pyspark/pyspark/context.py                   |  5 +-
 pyspark/pyspark/examples/wordcount.py        | 17 ++++++
 pyspark/pyspark/shell.py                     | 21 +++++++
 pyspark/run-pyspark                          | 23 ++++++++
 7 files changed, 125 insertions(+), 4 deletions(-)
 create mode 100644 pyspark/README
 create mode 100755 pyspark/pyspark-shell
 create mode 100644 pyspark/pyspark/examples/wordcount.py
 create mode 100644 pyspark/pyspark/shell.py
 create mode 100755 pyspark/run-pyspark

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index becf737597..acb38ae33d 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -113,7 +113,7 @@ class SparkContext(
   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
   for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
-    "SPARK_TESTING")) {
+    "SPARK_TESTING", "PYTHONPATH")) {
     val value = System.getenv(key)
     if (value != null) {
       executorEnvs(key) = value
diff --git a/pyspark/README b/pyspark/README
new file mode 100644
index 0000000000..63a1def141
--- /dev/null
+++ b/pyspark/README
@@ -0,0 +1,58 @@
+# PySpark
+
+PySpark is a Python API for Spark.
+
+PySpark jobs are written in Python and executed using a standard Python
+interpreter; this supports modules that use Python C extensions. The
+API is based on the Spark Scala API and uses regular Python functions
+and lambdas to support user-defined functions. PySpark supports
+interactive use through a standard Python interpreter; it can
+automatically serialize closures and ship them to worker processes.
+
+PySpark is built on top of the Spark Java API. Data is uniformly
+represented as serialized Python objects and stored in Spark Java
+processes, which communicate with PySpark worker processes over pipes.
+
+## Features
+
+PySpark supports most of the Spark API, including broadcast variables.
+RDDs are dynamically typed and can hold any Python object.
+
+PySpark does not support:
+
+- Special functions on RDDs of doubles
+- Accumulators
+
+## Examples and Documentation
+
+The PySpark source contains docstrings and doctests that document its
+API. The public classes are in `context.py` and `rdd.py`.
+
+The `pyspark/pyspark/examples` directory contains a few complete
+examples.
+
+## Installing PySpark
+
+PySpark requires a development version of Py4J, a Python library for
+interacting with Java processes. It can be installed from
+https://github.com/bartdag/py4j; make sure to install a version that
+contains at least the commits through 3dbf380d3d.
+
+PySpark uses the `PYTHONPATH` environment variable to search for Python
+classes; Py4J should be on this path, along with any libraries used by
+PySpark programs. `PYTHONPATH` will be automatically shipped to worker
+machines, but the files that it points to must be present on each
+machine.
+
+PySpark requires the Spark assembly JAR, which can be created by running
+`sbt/sbt assembly` in the Spark directory.
+
+Additionally, `SPARK_HOME` should be set to the location of the Spark
+package.
+
+## Running PySpark
+
+The easiest way to run PySpark is to use the `run-pyspark` and
+`pyspark-shell` scripts, which are included in the `pyspark` directory.
+These scripts automatically load the `conf/spark-env.sh` file, set
+`SPARK_HOME`, and add the `pyspark` package to the `PYTHONPATH`.
diff --git a/pyspark/pyspark-shell b/pyspark/pyspark-shell
new file mode 100755
index 0000000000..4ed3e6010c
--- /dev/null
+++ b/pyspark/pyspark-shell
@@ -0,0 +1,3 @@
+#!/bin/sh
+FWDIR="`dirname $0`"
+exec $FWDIR/run-pyspark $FWDIR/pyspark/shell.py "$@"
diff --git a/pyspark/pyspark/context.py b/pyspark/pyspark/context.py
index 3f4db26644..50d57e5317 100644
--- a/pyspark/pyspark/context.py
+++ b/pyspark/pyspark/context.py
@@ -18,14 +18,13 @@ class SparkContext(object):
     asPickle = jvm.spark.api.python.PythonRDD.asPickle
     arrayAsPickle = jvm.spark.api.python.PythonRDD.arrayAsPickle
 
-    def __init__(self, master, name, defaultParallelism=None,
-        pythonExec='python'):
+    def __init__(self, master, name, defaultParallelism=None):
         self.master = master
         self.name = name
         self._jsc = self.jvm.JavaSparkContext(master, name)
         self.defaultParallelism = \
             defaultParallelism or self._jsc.sc().defaultParallelism()
-        self.pythonExec = pythonExec
+        self.pythonExec = os.environ.get("PYSPARK_PYTHON_EXEC", 'python')
         # Broadcast's __reduce__ method stores Broadcast instances here.
         # This allows other code to determine which Broadcast instances have
         # been pickled, so it can determine which Java broadcast objects to
diff --git a/pyspark/pyspark/examples/wordcount.py b/pyspark/pyspark/examples/wordcount.py
new file mode 100644
index 0000000000..8365c070e8
--- /dev/null
+++ b/pyspark/pyspark/examples/wordcount.py
@@ -0,0 +1,17 @@
+import sys
+from operator import add
+from pyspark.context import SparkContext
+
+if __name__ == "__main__":
+    if len(sys.argv) < 3:
+        print >> sys.stderr, \
+            "Usage: PythonWordCount <master> <file>"
+        exit(-1)
+    sc = SparkContext(sys.argv[1], "PythonWordCount")
+    lines = sc.textFile(sys.argv[2], 1)
+    counts = lines.flatMap(lambda x: x.split(' ')) \
+                  .map(lambda x: (x, 1)) \
+                  .reduceByKey(add)
+    output = counts.collect()
+    for (word, count) in output:
+        print "%s : %i" % (word, count)
diff --git a/pyspark/pyspark/shell.py b/pyspark/pyspark/shell.py
new file mode 100644
index 0000000000..7ef30894cb
--- /dev/null
+++ b/pyspark/pyspark/shell.py
@@ -0,0 +1,21 @@
+"""
+An interactive shell.
+"""
+import code
+import sys
+
+from pyspark.context import SparkContext
+
+
+def main(master='local'):
+    sc = SparkContext(master, 'PySparkShell')
+    print "Spark context available as sc."
+    code.interact(local={'sc': sc})
+
+
+if __name__ == '__main__':
+    if len(sys.argv) > 1:
+        master = sys.argv[1]
+    else:
+        master = 'local'
+    main(master)
diff --git a/pyspark/run-pyspark b/pyspark/run-pyspark
new file mode 100755
index 0000000000..9c5e027962
--- /dev/null
+++ b/pyspark/run-pyspark
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+# Figure out where the Scala framework is installed
+FWDIR="$(cd `dirname $0`; cd ../; pwd)"
+
+# Export this as SPARK_HOME
+export SPARK_HOME="$FWDIR"
+
+# Load environment variables from conf/spark-env.sh, if it exists
+if [ -e $FWDIR/conf/spark-env.sh ] ; then
+  . $FWDIR/conf/spark-env.sh
+fi
+
+# Figure out which Python executable to use
+if [ -z "$PYSPARK_PYTHON" ] ; then
+  PYSPARK_PYTHON="python"
+fi
+export PYSPARK_PYTHON
+
+# Add the PySpark classes to the Python path:
+export PYTHONPATH=$SPARK_HOME/pyspark/:$PYTHONPATH
+
+exec "$PYSPARK_PYTHON" "$@"
-- 
GitLab
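
For reference, a minimal interactive session with the API added by this patch
might look like the sketch below. It assumes the shell was started with
`pyspark/pyspark-shell` (which, per `pyspark/pyspark/shell.py`, exposes a
SparkContext named `sc`); `/path/to/some/textfile` is a hypothetical
placeholder, and only operations that already appear in
`pyspark/pyspark/examples/wordcount.py` (`textFile`, `flatMap`, `map`,
`reduceByKey`, `collect`) are used:

    # `sc` is the SparkContext created by pyspark/pyspark/shell.py
    lines = sc.textFile("/path/to/some/textfile", 1)   # hypothetical path; second arg as in wordcount.py
    words = lines.flatMap(lambda line: line.split())   # split each line into words
    pairs = words.map(lambda word: (word, 1))          # pair each word with a count of 1
    counts = pairs.reduceByKey(lambda a, b: a + b)     # sum the counts for each word
    for (word, count) in counts.collect():             # collect() returns the results to the driver
        print "%s : %i" % (word, count)                # Python 2 print, as in wordcount.py

This is the same pipeline that `pyspark/pyspark/examples/wordcount.py` runs as
a standalone job, taking the master URL and input file from the command line.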