diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 08e3f670f57f6ea63feeb59b2b2a8a397acda394..67d45723badd8b4327a558664ffa37abdd5cc061 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.api.python
 
-import java.io.{File, DataInputStream, IOException}
-import java.net.{Socket, SocketException, InetAddress}
+import java.io.{OutputStreamWriter, File, DataInputStream, IOException}
+import java.net.{ServerSocket, Socket, SocketException, InetAddress}
 
 import scala.collection.JavaConversions._
 
@@ -26,11 +26,30 @@ import org.apache.spark._
 
 private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
     extends Logging {
+
+  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon
+  // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
+  // only works on UNIX-based systems now because it uses signals for child management, so we can
+  // also fall back to launching workers (pyspark/worker.py) directly.
+  val useDaemon = !System.getProperty("os.name").startsWith("Windows")
+
   var daemon: Process = null
   val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
   var daemonPort: Int = 0
 
   def create(): Socket = {
+    if (useDaemon) {
+      createThroughDaemon()
+    } else {
+      createSimpleWorker()
+    }
+  }
+
+  /**
+   * Connect to a worker launched through pyspark/daemon.py, which forks python processes itself
+   * to avoid the high cost of forking from Java. This currently only works on UNIX-based systems.
+   */
+  private def createThroughDaemon(): Socket = {
     synchronized {
       // Start the daemon if it hasn't been started
       startDaemon()
@@ -50,6 +69,78 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     }
   }
 
+  /**
+   * Launch a worker by executing worker.py directly and telling it to connect to us.
+   */
+  private def createSimpleWorker(): Socket = {
+    var serverSocket: ServerSocket = null
+    try {
+      serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+
+      // Create and start the worker
+      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
+      val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
+      val workerEnv = pb.environment()
+      workerEnv.putAll(envVars)
+      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
+      workerEnv.put("PYTHONPATH", pythonPath)
+      val worker = pb.start()
+
+      // Redirect the worker's stderr to ours
+      new Thread("stderr reader for " + pythonExec) {
+        setDaemon(true)
+        override def run() {
+          scala.util.control.Exception.ignoring(classOf[IOException]) {
+            // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+            val in = worker.getErrorStream
+            val buf = new Array[Byte](1024)
+            var len = in.read(buf)
+            while (len != -1) {
+              System.err.write(buf, 0, len)
+              len = in.read(buf)
+            }
+          }
+        }
+      }.start()
+
+      // Redirect worker's stdout to our stderr
+      new Thread("stdout reader for " + pythonExec) {
+        setDaemon(true)
+        override def run() {
+          scala.util.control.Exception.ignoring(classOf[IOException]) {
+            // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+            val in = worker.getInputStream
+            val buf = new Array[Byte](1024)
+            var len = in.read(buf)
+            while (len != -1) {
+              System.err.write(buf, 0, len)
+              len = in.read(buf)
+            }
+          }
+        }
+      }.start()
+
+      // Tell the worker our port
+      val out = new OutputStreamWriter(worker.getOutputStream)
+      out.write(serverSocket.getLocalPort + "\n")
+      out.flush()
+
+      // Wait for it to connect to our socket
+      serverSocket.setSoTimeout(10000)
+      try {
+        return serverSocket.accept()
+      } catch {
+        case e: Exception =>
+          throw new SparkException("Python worker did not connect back in time", e)
+      }
+    } finally {
+      if (serverSocket != null) {
+        serverSocket.close()
+      }
+    }
+    null
+  }
+
   def stop() {
     stopDaemon()
   }
@@ -73,12 +164,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
         // Redirect the stderr to ours
         new Thread("stderr reader for " + pythonExec) {
+          setDaemon(true)
           override def run() {
             scala.util.control.Exception.ignoring(classOf[IOException]) {
-              // FIXME HACK: We copy the stream on the level of bytes to
-              // attempt to dodge encoding problems.
+              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
               val in = daemon.getErrorStream
-              var buf = new Array[Byte](1024)
+              val buf = new Array[Byte](1024)
               var len = in.read(buf)
               while (len != -1) {
                 System.err.write(buf, 0, len)
@@ -93,11 +184,11 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
         // Redirect further stdout output to our stderr
         new Thread("stdout reader for " + pythonExec) {
+          setDaemon(true)
           override def run() {
             scala.util.control.Exception.ignoring(classOf[IOException]) {
-              // FIXME HACK: We copy the stream on the level of bytes to
-              // attempt to dodge encoding problems.
-              var buf = new Array[Byte](1024)
+              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+              val buf = new Array[Byte](1024)
               var len = in.read(buf)
               while (len != -1) {
                 System.err.write(buf, 0, len)
diff --git a/docs/configuration.md b/docs/configuration.md
index 58e9434bdc086c783ef550d2959bec90eed51016..aaf85ed4f4c155d855e4519010b9b99757aec2a4 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -315,7 +315,7 @@ Apart from these, the following properties are also available, and may be useful
 # Environment Variables
 
 Certain Spark settings can also be configured through environment variables, which are read from the `conf/spark-env.sh`
-script in the directory where Spark is installed. These variables are meant to be for machine-specific settings, such
+script in the directory where Spark is installed (or `conf/spark-env.cmd` on Windows). These variables are meant to be for machine-specific settings, such
 as library search paths. While Java system properties can also be set here, for application settings, we recommend setting
 these properties within the application instead of in `spark-env.sh` so that different applications can use different
 settings.
@@ -325,6 +325,8 @@ Note that `conf/spark-env.sh` does not exist by default when Spark is installed.
 
 The following variables can be set in `spark-env.sh`:
 
+* `JAVA_HOME`, the location where Java is installed (if it's not on your default `PATH`)
+* `PYSPARK_PYTHON`, the Python binary to use for PySpark
 * `SPARK_LOCAL_IP`, to configure which IP address of the machine to bind to.
 * `SPARK_LIBRARY_PATH`, to add search directories for native libraries.
 * `SPARK_CLASSPATH`, to add elements to Spark's classpath that you want to be present for _all_ applications.
diff --git a/docs/index.md b/docs/index.md
index 3cf9cc1c6412f662f281ad3c47ba82756702a25e..c7018d884683d8cd37f57042fc4dc9a4b3c179ee 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -11,6 +11,8 @@ Spark can run on the Apache Mesos cluster manager, Hadoop YARN, Amazon EC2, or w
 
 Get Spark by visiting the [downloads page](http://spark.incubator.apache.org/downloads.html) of the Apache Spark site. This documentation is for Spark version {{site.SPARK_VERSION}}.
 
+Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). All you need to run it is to have `java` to installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation.
+
 # Building
 
 Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with it. To compile the code, go into the top-level Spark directory and run
@@ -50,6 +52,8 @@ In addition, if you wish to run Spark on [YARN](running-on-yarn.md), set
 
     SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly
 
+(Note that on Windows, you need to set the environment variables on separate lines, e.g., `set SPARK_HADOOP_VERSION=1.2.1`.)
+
 # Where to Go from Here
 
 **Programming guides:**
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 8a539fe774f56bf3ab381d4ed71b6a9faa2c42cd..8c33a953a44baa93af4cd672b4eb2ac82dc28bde 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -4,7 +4,7 @@ title: Python Programming Guide
 ---
 
 
-The Spark Python API (PySpark) exposes most of the Spark features available in the Scala version to Python.
+The Spark Python API (PySpark) exposes the Spark programming model to Python.
 To learn the basics of Spark, we recommend reading through the
 [Scala programming guide](scala-programming-guide.html) first; it should be
 easy to follow even if you don't know Scala.
@@ -15,12 +15,8 @@ This guide will show how to use the Spark features described there in Python.
 
 There are a few key differences between the Python and Scala APIs:
 
-* Python is dynamically typed, so RDDs can hold objects of different types.
-* PySpark does not currently support the following Spark features:
-    - `lookup`
-    - `sort`
-    - `persist` at storage levels other than `MEMORY_ONLY`
-    - Execution on Windows -- this is slated for a future release
+* Python is dynamically typed, so RDDs can hold objects of multiple types.
+* PySpark does not yet support a few API calls, such as `lookup`, `sort`, and `persist` at custom storage levels. See the [API docs](api/pyspark/index.html) for details.
 
 In PySpark, RDDs support the same methods as their Scala counterparts but take Python functions and return Python collection types.
 Short functions can be passed to RDD methods using Python's [`lambda`](http://www.diveintopython.net/power_of_introspection/lambda_functions.html) syntax:
@@ -30,7 +26,7 @@ logData = sc.textFile(logFile).cache()
 errors = logData.filter(lambda line: "ERROR" in line)
 {% endhighlight %}
 
-You can also pass functions that are defined using the `def` keyword; this is useful for more complicated functions that cannot be expressed using `lambda`:
+You can also pass functions that are defined with the `def` keyword; this is useful for longer functions that can't be expressed using `lambda`:
 
 {% highlight python %}
 def is_error(line):
@@ -38,7 +34,7 @@ def is_error(line):
 errors = logData.filter(is_error)
 {% endhighlight %}
 
-Functions can access objects in enclosing scopes, although modifications to those objects within RDD methods will not be propagated to other tasks:
+Functions can access objects in enclosing scopes, although modifications to those objects within RDD methods will not be propagated back:
 
 {% highlight python %}
 error_keywords = ["Exception", "Error"]
@@ -51,17 +47,20 @@ PySpark will automatically ship these functions to workers, along with any objec
 Instances of classes will be serialized and shipped to workers by PySpark, but classes themselves cannot be automatically distributed to workers.
 The [Standalone Use](#standalone-use) section describes how to ship code dependencies to workers.
 
+In addition, PySpark fully supports interactive use---simply run `./pyspark` to launch an interactive shell.
+
 
 # Installing and Configuring PySpark
 
 PySpark requires Python 2.6 or higher.
-PySpark jobs are executed using a standard cPython interpreter in order to support Python modules that use C extensions.
+PySpark jobs are executed using a standard CPython interpreter in order to support Python modules that use C extensions.
 We have not tested PySpark with Python 3 or with alternative Python interpreters, such as [PyPy](http://pypy.org/) or [Jython](http://www.jython.org/).
-By default, PySpark's scripts will run programs using `python`; an alternate Python executable may be specified by setting the `PYSPARK_PYTHON` environment variable in `conf/spark-env.sh`.
+
+By default, PySpark requires `python` to be available on the system `PATH` and use it to run programs; an alternate Python executable may be specified by setting the `PYSPARK_PYTHON` environment variable in `conf/spark-env.sh` (or `.cmd` on Windows).
 
 All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported.
 
-Standalone PySpark jobs should be run using the `pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh`.
+Standalone PySpark jobs should be run using the `pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
 The script automatically adds the `pyspark` package to the `PYTHONPATH`.
 
 
@@ -101,7 +100,7 @@ $ MASTER=local[4] ./pyspark
 ## IPython
 
 It is also possible to launch PySpark in [IPython](http://ipython.org), the enhanced Python interpreter.
-To do this, simply set the `IPYTHON` variable to `1` when running `pyspark`:
+To do this, set the `IPYTHON` variable to `1` when running `pyspark`:
 
 {% highlight bash %}
 $ IPYTHON=1 ./pyspark
@@ -132,15 +131,16 @@ sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg
 Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines.
 Code dependencies can be added to an existing SparkContext using its `addPyFile()` method.
 
+# API Docs
+
+[API documentation](api/pyspark/index.html) for PySpark is available as Epydoc.
+Many of the methods also contain [doctests](http://docs.python.org/2/library/doctest.html) that provide additional usage examples.
 
 # Where to Go from Here
 
-PySpark includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples).
-You can run them by passing the files to the `pyspark` script; e.g.:
+PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples).
+You can run them by passing the files to `pyspark`; e.g.:
 
     ./pyspark python/examples/wordcount.py
 
 Each program prints usage help when run without arguments.
-
-We currently provide [API documentation](api/pyspark/index.html) for the Python API as Epydoc.
-Many of the RDD method descriptions contain [doctests](http://docs.python.org/2/library/doctest.html) that provide additional usage examples.
diff --git a/pyspark.cmd b/pyspark.cmd
new file mode 100644
index 0000000000000000000000000000000000000000..7c26fbbac28b8f0a17815a5ef1e3f60e8218d9bf
--- /dev/null
+++ b/pyspark.cmd
@@ -0,0 +1,23 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem    http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem This is the entry point for running PySpark. To avoid polluting the
+rem environment, it just launches a new cmd to do the real work.
+
+cmd /V /E /C %~dp0pyspark2.cmd %*
diff --git a/pyspark2.cmd b/pyspark2.cmd
new file mode 100644
index 0000000000000000000000000000000000000000..f58e34964393b359614cdf71e45665a8ed36ba42
--- /dev/null
+++ b/pyspark2.cmd
@@ -0,0 +1,55 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem    http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+set SCALA_VERSION=2.9.3
+
+rem Figure out where the Spark framework is installed
+set FWDIR=%~dp0
+
+rem Export this as SPARK_HOME
+set SPARK_HOME=%FWDIR%
+
+rem Test whether the user has built Spark
+if exist "%FWDIR%RELEASE" goto skip_build_test
+set FOUND_JAR=0
+for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
+  set FOUND_JAR=1
+)
+if "%FOUND_JAR%"=="0" (
+  echo Failed to find Spark assembly JAR.
+  echo You need to build Spark with sbt\sbt assembly before running this program.
+  goto exit
+)
+:skip_build_test
+
+rem Load environment variables from conf\spark-env.cmd, if it exists
+if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+
+rem Figure out which Python to use.
+if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
+
+set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
+
+set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
+set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
+
+echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH%
+
+"%PYSPARK_PYTHON%" %*
+:exit
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 26fbe0f08045d30e0f07028949f3a783e7551035..e615c1e9b63a46a857aa575561522410c2a6ce7e 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -18,6 +18,7 @@
 import os
 import sys
 import signal
+import platform
 from subprocess import Popen, PIPE
 from threading import Thread
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
@@ -29,12 +30,18 @@ SPARK_HOME = os.environ["SPARK_HOME"]
 def launch_gateway():
     # Launch the Py4j gateway using Spark's run command so that we pick up the
     # proper classpath and SPARK_MEM settings from spark-env.sh
-    command = [os.path.join(SPARK_HOME, "spark-class"), "py4j.GatewayServer",
+    on_windows = platform.system() == "Windows"
+    script = "spark-class.cmd" if on_windows else "spark-class"
+    command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
                "--die-on-broken-pipe", "0"]
-    # Don't send ctrl-c / SIGINT to the Java gateway:
-    def preexec_function():
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
-    proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_function)
+    if not on_windows:
+        # Don't send ctrl-c / SIGINT to the Java gateway:
+        def preexec_func():
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
+        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+    else:
+        # preexec_fn not supported on Windows
+        proc = Popen(command, stdout=PIPE, stdin=PIPE)
     # Determine which ephemeral port the server started on:
     port = int(proc.stdout.readline())
     # Create a thread to echo output from the GatewayServer, which is required
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 695f6dfb8444c1ef72f1649438854d402c5e0011..d63c2aaef772de62eef3bf913ad4a4859cf30512 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -21,6 +21,7 @@ Worker that receives input from Piped RDD.
 import os
 import sys
 import time
+import socket
 import traceback
 from base64 import standard_b64decode
 # CloudPickler needs to be imported so that depicklers are registered using the
@@ -94,7 +95,9 @@ def main(infile, outfile):
 
 
 if __name__ == '__main__':
-    # Redirect stdout to stderr so that users must return values from functions.
-    old_stdout = os.fdopen(os.dup(1), 'w')
-    os.dup2(2, 1)
-    main(sys.stdin, old_stdout)
+    # Read a local port to connect to from stdin
+    java_port = int(sys.stdin.readline())
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.connect(("127.0.0.1", java_port))
+    sock_file = sock.makefile("a+", 65536)
+    main(sock_file, sock_file)