diff --git a/assembly/pom.xml b/assembly/pom.xml
index bdb38806492a6dc47b790b2837a2ab6af4c81964..7d123fb1d7f02557f99a511e1101996cdc6a04d9 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -40,14 +40,6 @@
     <deb.user>root</deb.user>
   </properties>
 
-  <repositories>
-    <!-- A repository in the local filesystem for the Py4J JAR, which is not in Maven central -->
-    <repository>
-      <id>lib</id>
-      <url>file://${project.basedir}/lib</url>
-    </repository>
-  </repositories>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -84,11 +76,6 @@
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>net.sf.py4j</groupId>
-      <artifactId>py4j</artifactId>
-      <version>0.8.1</version>
-    </dependency>
   </dependencies>
 
   <build>
diff --git a/core/pom.xml b/core/pom.xml
index c24c7be204087d1daa4eb28a153e7a8918d8a528..8fe215ab242896c0c2e88816e90738fb8e9a0129 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -247,6 +247,11 @@
       <artifactId>pyrolite</artifactId>
       <version>2.0.1</version>
     </dependency>
+    <dependency>
+      <groupId>net.sf.py4j</groupId>
+      <artifactId>py4j</artifactId>
+      <version>0.8.1</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index d40ed27da5392f33c73f5d5a9988a2f943e012b1..806e77d98fc5f4033cfe1dfbe784b2ab6819a0a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import java.io.File
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.concurrent.Await
@@ -304,7 +306,7 @@ object SparkEnv extends Logging {
       k == "java.class.path"
     }.getOrElse(("", ""))
     val classPathEntries = classPathProperty._2
-      .split(conf.get("path.separator", ":"))
+      .split(File.pathSeparator)
       .filterNot(e => e.isEmpty)
       .map(e => (e, "System Classpath"))
     val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
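
The change above replaces a lookup of the `path.separator` system property (with a hard-coded `":"` fallback) by `java.io.File.pathSeparator`, which is `":"` on Unix-like systems and `";"` on Windows. A minimal standalone sketch, using hypothetical classpath entries, of why splitting on `File.pathSeparator` is the portable choice:

```scala
import java.io.File

object PathSeparatorExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical classpath entries, joined with the platform-specific separator
    // (":" on Unix-like systems, ";" on Windows).
    val classPath = Seq("/opt/spark/conf", "/opt/spark/lib/spark-assembly.jar")
      .mkString(File.pathSeparator)

    // Splitting on File.pathSeparator works on every platform, whereas a hard-coded ":"
    // would mangle Windows entries such as "C:\spark\conf".
    val entries = classPath.split(File.pathSeparator).filterNot(_.isEmpty)
    entries.foreach(println)
  }
}
```
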
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
new file mode 100644
index 0000000000000000000000000000000000000000..cf69fa1d53fde436c5f309dc01469201f58d844a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkContext
+
+private[spark] object PythonUtils {
+  /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
+  def sparkPythonPath: String = {
+    val pythonPath = new ArrayBuffer[String]
+    for (sparkHome <- sys.env.get("SPARK_HOME")) {
+      pythonPath += Seq(sparkHome, "python").mkString(File.separator)
+      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator)
+    }
+    pythonPath ++= SparkContext.jarOfObject(this)
+    pythonPath.mkString(File.pathSeparator)
+  }
+
+  /** Merge PYTHONPATH entries with the appropriate separator. Ignores blank strings. */
+  def mergePythonPaths(paths: String*): String = {
+    paths.filter(_ != "").mkString(File.pathSeparator)
+  }
+}
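
A short sketch of how the two helpers above compose, assuming a hypothetical `SPARK_HOME` of `/opt/spark`: `sparkPythonPath` contributes the `python/` directory and the bundled Py4J zip, and `mergePythonPaths` joins the pieces while dropping blank entries:

```scala
import java.io.File

object PythonPathExample {
  // Same joining rule as PythonUtils.mergePythonPaths: drop blank entries and join
  // the rest with the platform path separator.
  def merge(paths: String*): String = paths.filter(_ != "").mkString(File.pathSeparator)

  def main(args: Array[String]): Unit = {
    val sparkHome = "/opt/spark" // hypothetical SPARK_HOME
    val sparkPython = merge(
      Seq(sparkHome, "python").mkString(File.separator),
      Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator))

    // A missing PYTHONPATH contributes an empty string, which merge() drops instead of
    // leaving a dangling separator.
    println(merge(sparkPython, sys.env.getOrElse("PYTHONPATH", "")))
  }
}
```
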
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 02799ce0091b0876a006d5542ad8d9b2ba84727f..b0bf4e052b3e901337aaebeee87869d222547e44 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -37,6 +37,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
   var daemonPort: Int = 0
 
+  val pythonPath = PythonUtils.mergePythonPaths(
+    PythonUtils.sparkPythonPath, envVars.getOrElse("PYTHONPATH", ""))
+
   def create(): Socket = {
     if (useDaemon) {
       createThroughDaemon()
@@ -78,9 +81,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
 
       // Create and start the worker
-      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
+      workerEnv.put("PYTHONPATH", pythonPath)
       val worker = pb.start()
 
       // Redirect the worker's stderr to ours
@@ -151,9 +155,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
       try {
         // Create and start the daemon
-        val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
+        val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
         val workerEnv = pb.environment()
         workerEnv.putAll(envVars)
+        workerEnv.put("PYTHONPATH", pythonPath)
         daemon = pb.start()
 
         // Redirect the stderr to ours
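
A minimal sketch of the worker launch performed above, with hypothetical interpreter and path values: the `-u` flag keeps the Python process unbuffered so its output reaches the JVM promptly, and the merged PYTHONPATH is exported only into the child's environment:

```scala
import scala.collection.JavaConversions._

object WorkerLaunchExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; PythonWorkerFactory derives them from envVars and
    // PythonUtils.sparkPythonPath.
    val pythonExec = "python"
    val pythonPath = "/opt/spark/python:/opt/spark/python/lib/py4j-0.8.1-src.zip"

    // "-u" keeps the interpreter's stdout/stderr unbuffered so worker output reaches
    // the JVM promptly instead of sitting in Python's buffers.
    val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
    pb.environment().put("PYTHONPATH", pythonPath) // only affects the child process
    // val worker = pb.start()                     // not started in this sketch
  }
}
```
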
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f2e7c7a508b3f9363e4763462860732cd406f5d2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.{IOException, File, InputStream, OutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.python.PythonUtils
+
+/**
+ * A main class used by spark-submit to launch Python applications. It executes python as a
+ * subprocess and then has it connect back to the JVM to access system properties, etc.
+ */
+object PythonRunner {
+  def main(args: Array[String]) {
+    val primaryResource = args(0)
+    val pyFiles = args(1)
+    val otherArgs = args.slice(2, args.length)
+
+    val pythonExec = sys.env.getOrElse("PYSPARK_PYTHON", "python") // TODO: get this from conf
+
+    // Launch a Py4J gateway server for the process to connect to; this will let it see our
+    // Java system properties and such
+    val gatewayServer = new py4j.GatewayServer(null, 0)
+    gatewayServer.start()
+
+    // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
+    // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
+    val pathElements = new ArrayBuffer[String]
+    pathElements ++= pyFiles.split(",")
+    pathElements += PythonUtils.sparkPythonPath
+    pathElements += sys.env.getOrElse("PYTHONPATH", "")
+    val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
+
+    // Launch Python process
+    val builder = new ProcessBuilder(Seq(pythonExec, "-u", primaryResource) ++ otherArgs)
+    val env = builder.environment()
+    env.put("PYTHONPATH", pythonPath)
+    env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
+    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+    val process = builder.start()
+
+    new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+
+    System.exit(process.waitFor())
+  }
+
+  /**
+   * A utility class to redirect the child process's stdout or stderr
+   */
+  class RedirectThread(in: InputStream, out: OutputStream, name: String) extends Thread(name) {
+    setDaemon(true)
+    override def run() {
+      scala.util.control.Exception.ignoring(classOf[IOException]) {
+        // FIXME: We copy the stream at the byte level to avoid encoding problems.
+        val buf = new Array[Byte](1024)
+        var len = in.read(buf)
+        while (len != -1) {
+          out.write(buf, 0, len)
+          out.flush()
+          len = in.read(buf)
+        }
+      }
+    }
+  }
+}
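
PythonRunner expects a fixed argument layout from SparkSubmit: the primary `.py` file, then the single comma-separated `--py-files` value (possibly empty), then the application's own arguments. A hedged sketch, with hypothetical file names, of how that argv is interpreted:

```scala
object PythonRunnerArgsExample {
  def main(args: Array[String]): Unit = {
    // Roughly what SparkSubmit's childArgs look like for:
    //   spark-submit --py-files lib1.zip,lib2.zip my_script.py 10
    val childArgs = Array("my_script.py", "lib1.zip,lib2.zip", "10")

    val primaryResource = childArgs(0)            // script run by the Python interpreter
    val pyFiles = childArgs(1).split(",").toSeq   // prepended to the child's PYTHONPATH
    val appArgs = childArgs.slice(2, childArgs.length).toSeq // passed through to the script

    println(s"$primaryResource, pyFiles=$pyFiles, appArgs=$appArgs")
  }
}
```
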
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index fb30e8a70f682edad0839a028cfee6a154adf758..e39723f38347cced75ff466ca4781dc5b3ab8333 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -60,11 +60,11 @@ object SparkSubmit {
   private[spark] var exitFn: () => Unit = () => System.exit(-1)
 
   private[spark] def printErrorAndExit(str: String) = {
-    printStream.println("error: " + str)
-    printStream.println("run with --help for more information or --verbose for debugging output")
+    printStream.println("Error: " + str)
+    printStream.println("Run with --help for usage help or --verbose for debug output")
     exitFn()
   }
-  private[spark] def printWarning(str: String) = printStream.println("warning: " + str)
+  private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
 
   /**
    * @return
@@ -72,43 +72,43 @@ object SparkSubmit {
    *         entries for the child, a list of system propertes, a list of env vars
    *         and the main class for the child
    */
-  private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
+  private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
       ArrayBuffer[String], Map[String, String], String) = {
-    if (appArgs.master.startsWith("local")) {
+    if (args.master.startsWith("local")) {
       clusterManager = LOCAL
-    } else if (appArgs.master.startsWith("yarn")) {
+    } else if (args.master.startsWith("yarn")) {
       clusterManager = YARN
-    } else if (appArgs.master.startsWith("spark")) {
+    } else if (args.master.startsWith("spark")) {
       clusterManager = STANDALONE
-    } else if (appArgs.master.startsWith("mesos")) {
+    } else if (args.master.startsWith("mesos")) {
       clusterManager = MESOS
     } else {
-      printErrorAndExit("master must start with yarn, mesos, spark, or local")
+      printErrorAndExit("Master must start with yarn, mesos, spark, or local")
     }
 
     // Because "yarn-cluster" and "yarn-client" encapsulate both the master
     // and deploy mode, we have some logic to infer the master and deploy mode
     // from each other if only one is specified, or exit early if they are at odds.
-    if (appArgs.deployMode == null &&
-        (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
-      appArgs.deployMode = "cluster"
+    if (args.deployMode == null &&
+        (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
+      args.deployMode = "cluster"
     }
-    if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
+    if (args.deployMode == "cluster" && args.master == "yarn-client") {
       printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
     }
-    if (appArgs.deployMode == "client" &&
-        (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
-      printErrorAndExit("Deploy mode \"client\" and master \"" + appArgs.master
+    if (args.deployMode == "client" &&
+        (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
+      printErrorAndExit("Deploy mode \"client\" and master \"" + args.master
         + "\" are not compatible")
     }
-    if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
-      appArgs.master = "yarn-cluster"
+    if (args.deployMode == "cluster" && args.master.startsWith("yarn")) {
+      args.master = "yarn-cluster"
     }
-    if (appArgs.deployMode != "cluster" && appArgs.master.startsWith("yarn")) {
-      appArgs.master = "yarn-client"
+    if (args.deployMode != "cluster" && args.master.startsWith("yarn")) {
+      args.master = "yarn-client"
     }
 
-    val deployOnCluster = Option(appArgs.deployMode).getOrElse("client") == "cluster"
+    val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster"
 
     val childClasspath = new ArrayBuffer[String]()
     val childArgs = new ArrayBuffer[String]()
@@ -116,76 +116,93 @@ object SparkSubmit {
     var childMainClass = ""
 
     if (clusterManager == MESOS && deployOnCluster) {
-      printErrorAndExit("Mesos does not support running the driver on the cluster")
+      printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
     }
 
+    // If we're running a Python app, set the main class to our PythonRunner, add the Python
+    // files to the list of files to deploy, and pass the primary resource and Python files
+    // on to PythonRunner as its arguments
+    if (args.isPython) {
+      if (deployOnCluster) {
+        printErrorAndExit("Cannot currently run Python driver programs on cluster")
+      }
+      args.mainClass = "org.apache.spark.deploy.PythonRunner"
+      args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
+      val pyFiles = Option(args.pyFiles).getOrElse("")
+      args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
+      args.primaryResource = RESERVED_JAR_NAME
+      sysProps("spark.submit.pyFiles") = pyFiles
+    }
+
+    // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
     if (!deployOnCluster) {
-      childMainClass = appArgs.mainClass
-      if (appArgs.primaryResource != RESERVED_JAR_NAME) {
-        childClasspath += appArgs.primaryResource
+      childMainClass = args.mainClass
+      if (args.primaryResource != RESERVED_JAR_NAME) {
+        childClasspath += args.primaryResource
       }
     } else if (clusterManager == YARN) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
-      childArgs += ("--jar", appArgs.primaryResource)
-      childArgs += ("--class", appArgs.mainClass)
+      childArgs += ("--jar", args.primaryResource)
+      childArgs += ("--class", args.mainClass)
     }
 
+    // Make sure YARN is included in our build if we're trying to use it
     if (clusterManager == YARN) {
-      // The choice of class is arbitrary, could use any spark-yarn class
       if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
-        val msg = "Could not load YARN classes. This copy of Spark may not have been compiled " +
-          "with YARN support."
-        throw new Exception(msg)
+        printErrorAndExit("Could not load YARN classes. " +
+          "This copy of Spark may not have been compiled with YARN support.")
       }
     }
 
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"
 
+    // A list of rules to map each argument to system properties or command-line options in
+    // each deploy mode; we iterate through these below
     val options = List[OptionAssigner](
-      new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
-      new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
+      OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
         sysProp = "spark.driver.extraClassPath"),
-      new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
+      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
         sysProp = "spark.driver.extraJavaOptions"),
-      new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
+      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
         sysProp = "spark.driver.extraLibraryPath"),
-      new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
-      new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
-      new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
-      new OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"),
-      new OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"),
-      new OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
-      new OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"),
-      new OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false,
+      OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
+      OptionAssigner(args.name, YARN, true, clOption = "--name"),
+      OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
+      OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
+      OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
+      OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
+      OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
+      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
         sysProp = "spark.executor.memory"),
-      new OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"),
-      new OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"),
-      new OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"),
-      new OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"),
-      new OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false,
+      OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
+      OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
+      OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
+      OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
+      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
         sysProp = "spark.cores.max"),
-      new OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"),
-      new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
-      new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
-      new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
-      new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"),
-      new OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
-      new OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
-      new OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false,
-        sysProp = "spark.app.name")
+      OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
+      OptionAssigner(args.files, YARN, true, clOption = "--files"),
+      OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
+      OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
+      OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
+      OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
+      OptionAssigner(args.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name")
     )
 
     // For client mode make any added jars immediately visible on the classpath
-    if (appArgs.jars != null && !deployOnCluster) {
-      for (jar <- appArgs.jars.split(",")) {
+    if (args.jars != null && !deployOnCluster) {
+      for (jar <- args.jars.split(",")) {
         childClasspath += jar
       }
     }
 
+    // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
       if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
-        (clusterManager & opt.clusterManager) != 0) {
+          (clusterManager & opt.clusterManager) != 0) {
         if (opt.clOption != null) {
           childArgs += (opt.clOption, opt.value)
         } else if (opt.sysProp != null) {
@@ -197,32 +214,35 @@ object SparkSubmit {
     // For standalone mode, add the application jar automatically so the user doesn't have to
     // call sc.addJar. TODO: Standalone mode in the cluster
     if (clusterManager == STANDALONE) {
-      val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
-      sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
+      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
+      if (args.primaryResource != RESERVED_JAR_NAME) {
+        jars = jars ++ Seq(args.primaryResource)
+      }
+      sysProps.put("spark.jars", jars.mkString(","))
     }
 
     if (deployOnCluster && clusterManager == STANDALONE) {
-      if (appArgs.supervise) {
+      if (args.supervise) {
         childArgs += "--supervise"
       }
 
       childMainClass = "org.apache.spark.deploy.Client"
       childArgs += "launch"
-      childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass)
+      childArgs += (args.master, args.primaryResource, args.mainClass)
     }
 
     // Arguments to be passed to user program
-    if (appArgs.childArgs != null) {
+    if (args.childArgs != null) {
       if (!deployOnCluster || clusterManager == STANDALONE) {
-        childArgs ++= appArgs.childArgs
+        childArgs ++= args.childArgs
       } else if (clusterManager == YARN) {
-        for (arg <- appArgs.childArgs) {
+        for (arg <- args.childArgs) {
           childArgs += ("--arg", arg)
         }
       }
     }
 
-    for ((k, v) <- appArgs.getDefaultSparkProperties) {
+    for ((k, v) <- args.getDefaultSparkProperties) {
       if (!sysProps.contains(k)) sysProps(k) = v
     }
 
@@ -230,8 +250,8 @@ object SparkSubmit {
   }
 
   private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
-      sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
-
+      sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false)
+  {
     if (verbose) {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
@@ -273,15 +293,26 @@ object SparkSubmit {
     val url = localJarFile.getAbsoluteFile.toURI.toURL
     loader.addURL(url)
   }
+
+  /**
+   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
+   * no files, into a single comma-separated string.
+   */
+  private[spark] def mergeFileLists(lists: String*): String = {
+    val merged = lists.filter(_ != null)
+                      .flatMap(_.split(","))
+                      .mkString(",")
+    if (merged == "") null else merged
+  }
 }
 
 /**
  * Provides an indirection layer for passing arguments as system properties or flags to
  * the user's driver program or to downstream launcher tools.
  */
-private[spark] class OptionAssigner(val value: String,
-  val clusterManager: Int,
-  val deployOnCluster: Boolean,
-  val clOption: String = null,
-  val sysProp: String = null
-) { }
+private[spark] case class OptionAssigner(
+    value: String,
+    clusterManager: Int,
+    deployOnCluster: Boolean,
+    clOption: String = null,
+    sysProp: String = null)
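
The new `mergeFileLists` helper collapses several comma-separated lists, any of which may be null, into a single list, or null when nothing remains. A small standalone sketch that reproduces the same logic with made-up file names:

```scala
object MergeFileListsExample {
  // Mirrors SparkSubmit.mergeFileLists: drop nulls, flatten on commas, and return
  // null when the result is empty so callers can keep treating "no files" as null.
  def mergeFileLists(lists: String*): String = {
    val merged = lists.filter(_ != null).flatMap(_.split(",")).mkString(",")
    if (merged == "") null else merged
  }

  def main(args: Array[String]): Unit = {
    println(mergeFileLists("data.txt,extra.txt", null, "app.py")) // data.txt,extra.txt,app.py
    println(mergeFileLists(null, null))                           // null
  }
}
```
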
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 7031cdd9b4ae0c4cc02246baeaab372345aff7e4..2d327aa3fb27fe3774c3c59254394f4008f7b931 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy
 
 import java.io.{File, FileInputStream, IOException}
 import java.util.Properties
+import java.util.jar.JarFile
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -52,6 +53,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
   var jars: String = null
   var verbose: Boolean = false
+  var isPython: Boolean = false
+  var pyFiles: String = null
 
   parseOpts(args.toList)
   loadDefaults()
@@ -76,7 +79,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   }
 
   /** Fill in any undefined values based on the current properties file or built-in defaults. */
-  private def loadDefaults() = {
+  private def loadDefaults(): Unit = {
 
     // Use common defaults file, if not specified by user
     if (propertiesFile == null) {
@@ -107,15 +110,43 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     master = Option(master).getOrElse(System.getenv("MASTER"))
     deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
 
+    // Try to set main class from JAR if no --class argument is given
+    if (mainClass == null && !isPython && primaryResource != null) {
+      try {
+        val jar = new JarFile(primaryResource)
+        // Note that this might still return null if no main-class is set; we catch that later
+        mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
+      } catch {
+        case e: Exception =>
+          SparkSubmit.printErrorAndExit("Failed to read JAR: " + primaryResource)
+          return
+      }
+    }
+
     // Global defaults. These should be kept to a minimum to avoid confusing behavior.
     master = Option(master).getOrElse("local[*]")
+
+    // Set name from main class if not given
+    name = Option(name).orElse(Option(mainClass)).orNull
+    if (name == null && primaryResource != null) {
+      name = Utils.stripDirectory(primaryResource)
+    }
   }
 
   /** Ensure that required fields exist. Call this only once all defaults are loaded. */
   private def checkRequiredArguments() = {
-    if (args.length == 0) printUsageAndExit(-1)
-    if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
-    if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+    if (args.length == 0) {
+      printUsageAndExit(-1)
+    }
+    if (primaryResource == null) {
+      SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python file)")
+    }
+    if (mainClass == null && !isPython) {
+      SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
+    }
+    if (pyFiles != null && !isPython) {
+      SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
+    }
 
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
@@ -143,6 +174,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     |  queue                   $queue
     |  numExecutors            $numExecutors
     |  files                   $files
+    |  pyFiles                 $pyFiles
     |  archives                $archives
     |  mainClass               $mainClass
     |  primaryResource         $primaryResource
@@ -234,6 +266,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         files = value
         parse(tail)
 
+      case ("--py-files") :: value :: tail =>
+        pyFiles = value
+        parse(tail)
+
       case ("--archives") :: value :: tail =>
         archives = value
         parse(tail)
@@ -260,9 +296,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
               val errMessage = s"Unrecognized option '$value'."
               SparkSubmit.printErrorAndExit(errMessage)
             case v =>
-             primaryResource = v
-             inSparkOpts = false
-             parse(tail)
+              primaryResource = v
+              inSparkOpts = false
+              isPython = v.endsWith(".py")
+              parse(tail)
           }
         } else {
           childArgs += value
@@ -270,7 +307,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         }
 
       case Nil =>
-      }
+    }
   }
 
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
@@ -279,23 +316,26 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       outStream.println("Unknown/unsupported param " + unknownParam)
     }
     outStream.println(
-      """Usage: spark-submit [options] <app jar> [app options]
+      """Usage: spark-submit [options] <app jar | python file> [app options]
         |Options:
         |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
-        |  --deploy-mode DEPLOY_MODE   Mode to deploy the app in, either 'client' or 'cluster'.
-        |  --class CLASS_NAME          Name of your app's main class (required for Java apps).
-        |  --name NAME                 The name of your application (Default: 'Spark').
-        |  --jars JARS                 A comma-separated list of local jars to include on the
-        |                              driver classpath and that SparkContext.addJar will work
-        |                              with. Doesn't work on standalone with 'cluster' deploy mode.
-        |  --files FILES               Comma separated list of files to be placed in the working dir
-        |                              of each executor.
+        |  --deploy-mode DEPLOY_MODE   Where to run the driver program: either "client" to run
+        |                              on the local machine, or "cluster" to run inside the cluster.
+        |  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
+        |  --name NAME                 The name of your application.
+        |  --jars JARS                 Comma-separated list of local jars to include on the driver
+        |                              and executor classpaths. Doesn't work for drivers in
+        |                              standalone mode with "cluster" deploy mode.
+        |  --py-files PY_FILES         Comma-separated list of .zip or .egg files to place on the
+        |                              PYTHONPATH for Python apps.
+        |  --files FILES               Comma-separated list of files to be placed in the working
+        |                              directory of each executor.
         |  --properties-file FILE      Path to a file from which to load extra properties. If not
         |                              specified, this will look for conf/spark-defaults.conf.
         |
         |  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 512M).
-        |  --driver-java-options       Extra Java options to pass to the driver
-        |  --driver-library-path       Extra library path entries to pass to the driver
+        |  --driver-java-options       Extra Java options to pass to the driver.
+        |  --driver-library-path       Extra library path entries to pass to the driver.
         |  --driver-class-path         Extra class path entries to pass to the driver. Note that
         |                              jars added with --jars are automatically included in the
         |                              classpath.
@@ -311,10 +351,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         |
         | YARN-only:
         |  --executor-cores NUM        Number of cores per executor (Default: 1).
-        |  --queue QUEUE_NAME          The YARN queue to submit to (Default: 'default').
-        |  --num-executors NUM         Number of executors to (Default: 2).
+        |  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
+        |  --num-executors NUM         Number of executors to launch (Default: 2).
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
-        |                              working dir of each executor.""".stripMargin
+        |                              working directory of each executor.""".stripMargin
     )
     SparkSubmit.exitFn()
   }
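
When `--class` is omitted for a JAR, the defaults above fall back to the `Main-Class` attribute of the primary JAR's manifest. A minimal sketch of that lookup for a hypothetical JAR path; `getValue` returns null when the attribute is missing, which is why `checkRequiredArguments` still verifies `mainClass` afterwards:

```scala
import java.util.jar.JarFile

object MainClassFromJarExample {
  def main(args: Array[String]): Unit = {
    val jarPath = "userjar.jar" // hypothetical primary resource
    try {
      val jar = new JarFile(jarPath)
      // Null if the manifest has no Main-Class entry; that case is later reported as
      // "No main class set in JAR; please specify one with --class".
      val mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
      println(Option(mainClass).getOrElse("<no Main-Class in manifest>"))
    } catch {
      case e: Exception => println("Failed to read JAR: " + jarPath)
    }
  }
}
```
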
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bef4dab3d7cc1f1fd3fe30e5ea87b72a4f426a5d..202bd46956f87a82aebde084684eecdcfeac6a57 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -46,7 +46,6 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
  * Various utility methods used by Spark.
  */
 private[spark] object Utils extends Logging {
-
   val random = new Random()
 
   def sparkBin(sparkHome: String, which: String): File = {
@@ -1082,4 +1081,11 @@ private[spark] object Utils extends Logging {
   def isTesting = {
     sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
   }
+
+  /**
+   * Strip the directory from a path name
+   */
+  def stripDirectory(path: String): String = {
+    new File(path).getName
+  }
 }
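
A short sketch of how the helper above is used to derive a default application name from the primary resource when neither `--name` nor a main class is available (the path is hypothetical):

```scala
import java.io.File

object StripDirectoryExample {
  // Mirrors Utils.stripDirectory: keep only the final path component.
  def stripDirectory(path: String): String = new File(path).getName

  def main(args: Array[String]): Unit = {
    println(stripDirectory("/home/alice/jobs/wordcount.py")) // wordcount.py
  }
}
```
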
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b3541b4a40b7933d7a55d6c0399aa6ce81807686..d7e3b22ed476ed30eb9fb570ce51283f267ddbbb 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -83,7 +83,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handle binary specified but not class") {
-    testPrematureExit(Array("foo.jar"), "Must specify a main class")
+    testPrematureExit(Array("foo.jar"), "No main class")
   }
 
   test("handles arguments with --key=val") {
@@ -94,9 +94,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles arguments to user program") {
-    val clArgs = Seq("--name", "myApp", "userjar.jar", "some", "--random", "args", "here")
+    val clArgs = Seq("--name", "myApp", "--class", "Foo", "userjar.jar", "some", "--weird", "args")
     val appArgs = new SparkSubmitArguments(clArgs)
-    appArgs.childArgs should be (Seq("some", "--random", "args", "here"))
+    appArgs.childArgs should be (Seq("some", "--weird", "args"))
   }
 
   test("handles YARN cluster mode") {
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 98c456228af9f8252bda7fdde31c803524287e66..8ea22e15a4b69cd5b7249a6bfd028fcfaaaccd37 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -60,12 +60,9 @@ By default, PySpark requires `python` to be available on the system `PATH` and u
 
 All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported.
 
-Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
-The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.
+Standalone PySpark applications should be run using the `bin/spark-submit` script, which automatically
+configures the Java and Python environment for running Spark.
 
-# Running PySpark on YARN
-
-To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client".
 
 # Interactive Use
 
@@ -103,7 +100,7 @@ $ MASTER=local[4] ./bin/pyspark
 
 ## IPython
 
-It is also possible to launch PySpark in [IPython](http://ipython.org), the
+It is also possible to launch the PySpark shell in [IPython](http://ipython.org), the
 enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To
 use IPython, set the `IPYTHON` variable to `1` when running `bin/pyspark`:
 
@@ -123,18 +120,17 @@ IPython also works on a cluster or on multiple cores if you set the `MASTER` env
 
 # Standalone Programs
 
-PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/pyspark`.
+PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/spark-submit`.
 The Quick Start guide includes a [complete example](quick-start.html#a-standalone-app-in-python) of a standalone Python application.
 
-Code dependencies can be deployed by listing them in the `pyFiles` option in the SparkContext constructor:
+Code dependencies can be deployed by passing .zip or .egg files in the `--py-files` option of `spark-submit`:
 
-{% highlight python %}
-from pyspark import SparkContext
-sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
+{% highlight bash %}
+./bin/spark-submit --py-files lib1.zip,lib2.zip my_script.py
 {% endhighlight %}
 
 Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines.
-Code dependencies can be added to an existing SparkContext using its `addPyFile()` method.
+Code dependencies can also be added to an existing SparkContext at runtime using its `addPyFile()` method.
 
 You can set [configuration properties](configuration.html#spark-properties) by passing a
 [SparkConf](api/python/pyspark.conf.SparkConf-class.html) object to SparkContext:
@@ -142,12 +138,16 @@ You can set [configuration properties](configuration.html#spark-properties) by p
 {% highlight python %}
 from pyspark import SparkConf, SparkContext
 conf = (SparkConf()
-         .setMaster("local")
          .setAppName("My app")
          .set("spark.executor.memory", "1g"))
 sc = SparkContext(conf = conf)
 {% endhighlight %}
 
+`spark-submit` supports launching Python applications on standalone, Mesos, or YARN clusters through
+its `--master` argument. However, it currently requires the Python driver program to run on the local
+machine, not on the cluster (i.e. the `--deploy-mode` parameter cannot be `cluster`).
+
+
 # API Docs
 
 [API documentation](api/python/index.html) for PySpark is available as Epydoc.
@@ -164,6 +164,6 @@ some example applications.
 PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/master/python/examples).
-You can run them by passing the files to `pyspark`; e.g.:
+You can run them by passing the files to `spark-submit`; e.g.:
 
-    ./bin/pyspark python/examples/wordcount.py
+    ./bin/spark-submit python/examples/wordcount.py
 
 Each program prints usage help when run without arguments.
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 1ad05d9e46dd601ab216ddd5a9cec523052476fb..7f9746ec4acc08c7c2084c4d4b6c8d5bd201c745 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -356,7 +356,8 @@ object SparkBuild extends Build {
         "com.twitter"                % "chill-java"       % chillVersion excludeAll(excludeAsm),
         "org.tachyonproject"         % "tachyon"          % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
         "com.clearspring.analytics"  % "stream"           % "2.5.1" excludeAll(excludeFastutil),
-        "org.spark-project"          % "pyrolite"         % "2.0.1"
+        "org.spark-project"          % "pyrolite"         % "2.0.1",
+        "net.sf.py4j"                % "py4j"             % "0.8.1"
       ),
     libraryDependencies ++= maybeAvro
   )
@@ -569,7 +570,6 @@ object SparkBuild extends Build {
   )
 
   def assemblyProjSettings = sharedSettings ++ Seq(
-    libraryDependencies += "net.sf.py4j" % "py4j" % "0.8.1",
     name := "spark-assembly",
     assembleDeps in Compile <<= (packageProjects.map(packageBin in Compile in _) ++ Seq(packageDependency in Compile)).dependOn,
     jarName in assembly <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" },
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c74dc5fd4f8547fcdf6eb5ba30c45c3c795396dc..c7dc85ea03544c696ddfb620800d281d3b4bbf28 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -158,6 +158,12 @@ class SparkContext(object):
         for path in (pyFiles or []):
             self.addPyFile(path)
 
+        # Deploy code dependencies set by spark-submit; these will already have been added
+        # with SparkContext.addFile, so we just need to add their names to the Python includes
+        for path in self._conf.get("spark.submit.pyFiles", "").split(","):
+            if path != "":
+                self._python_includes.append(os.path.basename(path))
+
         # Create a temporary directory inside spark.local.dir:
         local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
         self._temp_dir = \
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 032d960e409987caabe92c6dd43993e18b1d8979..3d0936fdca911f0ba3903ccde8b810272c03c3cf 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -27,39 +27,43 @@ from py4j.java_gateway import java_import, JavaGateway, GatewayClient
 def launch_gateway():
     SPARK_HOME = os.environ["SPARK_HOME"]
 
-    set_env_vars_for_yarn()
-
-    # Launch the Py4j gateway using Spark's run command so that we pick up the
-    # proper classpath and settings from spark-env.sh
-    on_windows = platform.system() == "Windows"
-    script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
-    command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
-               "--die-on-broken-pipe", "0"]
-    if not on_windows:
-        # Don't send ctrl-c / SIGINT to the Java gateway:
-        def preexec_func():
-            signal.signal(signal.SIGINT, signal.SIG_IGN)
-        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+    gateway_port = -1
+    if "PYSPARK_GATEWAY_PORT" in os.environ:
+        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
     else:
-        # preexec_fn not supported on Windows
-        proc = Popen(command, stdout=PIPE, stdin=PIPE)
-    # Determine which ephemeral port the server started on:
-    port = int(proc.stdout.readline())
-    # Create a thread to echo output from the GatewayServer, which is required
-    # for Java log output to show up:
-    class EchoOutputThread(Thread):
-        def __init__(self, stream):
-            Thread.__init__(self)
-            self.daemon = True
-            self.stream = stream
+        # Launch the Py4j gateway using Spark's run command so that we pick up the
+        # proper classpath and settings from spark-env.sh
+        on_windows = platform.system() == "Windows"
+        script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
+        command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
+                   "--die-on-broken-pipe", "0"]
+        if not on_windows:
+            # Don't send ctrl-c / SIGINT to the Java gateway:
+            def preexec_func():
+                signal.signal(signal.SIGINT, signal.SIG_IGN)
+            proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+        else:
+            # preexec_fn not supported on Windows
+            proc = Popen(command, stdout=PIPE, stdin=PIPE)
+        # Determine which ephemeral port the server started on:
+        gateway_port = int(proc.stdout.readline())
+        # Create a thread to echo output from the GatewayServer, which is required
+        # for Java log output to show up:
+        class EchoOutputThread(Thread):
+            def __init__(self, stream):
+                Thread.__init__(self)
+                self.daemon = True
+                self.stream = stream
+
+            def run(self):
+                while True:
+                    line = self.stream.readline()
+                    sys.stderr.write(line)
+        EchoOutputThread(proc.stdout).start()
 
-        def run(self):
-            while True:
-                line = self.stream.readline()
-                sys.stderr.write(line)
-    EchoOutputThread(proc.stdout).start()
     # Connect to the gateway
-    gateway = JavaGateway(GatewayClient(port=port), auto_convert=False)
+    gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False)
+
     # Import the classes used by PySpark
     java_import(gateway.jvm, "org.apache.spark.SparkConf")
     java_import(gateway.jvm, "org.apache.spark.api.java.*")
@@ -70,28 +74,5 @@ def launch_gateway():
     java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
     java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
     java_import(gateway.jvm, "scala.Tuple2")
-    return gateway
 
-def set_env_vars_for_yarn():
-    # Add the spark jar, which includes the pyspark files, to the python path
-    env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
-    if "PYTHONPATH" in env_map:
-        env_map["PYTHONPATH"] += ":spark.jar"
-    else:
-        env_map["PYTHONPATH"] = "spark.jar"
-
-    os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())
-
-def parse_env(env_str):
-    # Turns a comma-separated of env settings into a dict that maps env vars to
-    # their values.
-    env = {}
-    for var_str in env_str.split(","):
-        parts = var_str.split("=")
-        if len(parts) == 2:
-            env[parts[0]] = parts[1]
-        elif len(var_str) > 0:
-            print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
-            sys.exit(1)
-    
-    return env
+    return gateway
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8cf9d9cf1bd66e8feabc639f328140c1e845265e..64f2eeb12b4fcbd152d4e6e2b10cf07e0ea0d0bb 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -22,11 +22,14 @@ individual modules.
 from fileinput import input
 from glob import glob
 import os
+import re
 import shutil
+import subprocess
 import sys
-from tempfile import NamedTemporaryFile
+import tempfile
 import time
 import unittest
+import zipfile
 
 from pyspark.context import SparkContext
 from pyspark.files import SparkFiles
@@ -55,7 +58,7 @@ class TestCheckpoint(PySparkTestCase):
 
     def setUp(self):
         PySparkTestCase.setUp(self)
-        self.checkpointDir = NamedTemporaryFile(delete=False)
+        self.checkpointDir = tempfile.NamedTemporaryFile(delete=False)
         os.unlink(self.checkpointDir.name)
         self.sc.setCheckpointDir(self.checkpointDir.name)
 
@@ -148,7 +151,7 @@ class TestRDDFunctions(PySparkTestCase):
         # Regression test for SPARK-970
         x = u"\u00A1Hola, mundo!"
         data = self.sc.parallelize([x])
-        tempFile = NamedTemporaryFile(delete=True)
+        tempFile = tempfile.NamedTemporaryFile(delete=True)
         tempFile.close()
         data.saveAsTextFile(tempFile.name)
         raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*")))
@@ -172,7 +175,7 @@ class TestRDDFunctions(PySparkTestCase):
 
     def test_deleting_input_files(self):
         # Regression test for SPARK-1025
-        tempFile = NamedTemporaryFile(delete=False)
+        tempFile = tempfile.NamedTemporaryFile(delete=False)
         tempFile.write("Hello World!")
         tempFile.close()
         data = self.sc.textFile(tempFile.name)
@@ -236,5 +239,125 @@ class TestDaemon(unittest.TestCase):
         from signal import SIGTERM
         self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
 
+
+class TestSparkSubmit(unittest.TestCase):
+    def setUp(self):
+        self.programDir = tempfile.mkdtemp()
+        self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit")
+
+    def tearDown(self):
+        shutil.rmtree(self.programDir)
+
+    def createTempFile(self, name, content):
+        """
+        Create a temp file with the given name and content and return its path.
+        Strips leading spaces from content up to the first '|' in each line.
+        """
+        pattern = re.compile(r'^ *\|', re.MULTILINE)
+        content = re.sub(pattern, '', content.strip())
+        path = os.path.join(self.programDir, name)
+        with open(path, "w") as f:
+            f.write(content)
+        return path
+
+    def createFileInZip(self, name, content):
+        """
+        Create a zip archive containing a file with the given content and return its path.
+        Strips leading spaces from content up to the first '|' in each line.
+        """
+        pattern = re.compile(r'^ *\|', re.MULTILINE)
+        content = re.sub(pattern, '', content.strip())
+        path = os.path.join(self.programDir, name + ".zip")
+        with zipfile.ZipFile(path, 'w') as zf:
+            zf.writestr(name, content)
+        return path
+
+    def test_single_script(self):
+        """Submit and test a single script file"""
+        script = self.createTempFile("test.py", """
+            |from pyspark import SparkContext
+            |
+            |sc = SparkContext()
+            |print sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect()
+            """)
+        proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+        out, err = proc.communicate()
+        self.assertEqual(0, proc.returncode)
+        self.assertIn("[2, 4, 6]", out)
+
+    def test_script_with_local_functions(self):
+        """Submit and test a single script file calling a global function"""
+        script = self.createTempFile("test.py", """
+            |from pyspark import SparkContext
+            |
+            |def foo(x):
+            |    return x * 3
+            |
+            |sc = SparkContext()
+            |print sc.parallelize([1, 2, 3]).map(foo).collect()
+            """)
+        proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+        out, err = proc.communicate()
+        self.assertEqual(0, proc.returncode)
+        self.assertIn("[3, 6, 9]", out)
+
+    def test_module_dependency(self):
+        """Submit and test a script with a dependency on another module"""
+        script = self.createTempFile("test.py", """
+            |from pyspark import SparkContext
+            |from mylib import myfunc
+            |
+            |sc = SparkContext()
+            |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+            """)
+        zip = self.createFileInZip("mylib.py", """
+            |def myfunc(x):
+            |    return x + 1
+            """)
+        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script],
+            stdout=subprocess.PIPE)
+        out, err = proc.communicate()
+        self.assertEqual(0, proc.returncode)
+        self.assertIn("[2, 3, 4]", out)
+
+    def test_module_dependency_on_cluster(self):
+        """Submit and test a script with a dependency on another module on a cluster"""
+        script = self.createTempFile("test.py", """
+            |from pyspark import SparkContext
+            |from mylib import myfunc
+            |
+            |sc = SparkContext()
+            |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+            """)
+        zip = self.createFileInZip("mylib.py", """
+            |def myfunc(x):
+            |    return x + 1
+            """)
+        proc = subprocess.Popen(
+            [self.sparkSubmit, "--py-files", zip, "--master", "local-cluster[1,1,512]", script],
+            stdout=subprocess.PIPE)
+        out, err = proc.communicate()
+        self.assertEqual(0, proc.returncode)
+        self.assertIn("[2, 3, 4]", out)
+
+    def test_single_script_on_cluster(self):
+        """Submit and test a single script on a cluster"""
+        script = self.createTempFile("test.py", """
+            |from pyspark import SparkContext
+            |
+            |def foo(x):
+            |    return x * 2
+            |
+            |sc = SparkContext()
+            |print sc.parallelize([1, 2, 3]).map(foo).collect()
+            """)
+        proc = subprocess.Popen(
+            [self.sparkSubmit, "--master", "local-cluster[1,1,512]", script],
+            stdout=subprocess.PIPE)
+        out, err = proc.communicate()
+        self.assertEqual(0, proc.returncode)
+        self.assertIn("[2, 4, 6]", out)
+
+
 if __name__ == "__main__":
     unittest.main()
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index e33f4f98030543a95a9fc478b074f9794b216b7e..566d96e16ed83f06a69cb0d3e4a8395fd6b6c6e9 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -45,8 +45,7 @@ class ReplSuite extends FunSuite {
     }
     val interp = new SparkILoop(in, new PrintWriter(out), master)
     org.apache.spark.repl.Main.interp = interp
-    val separator = System.getProperty("path.separator")
-    interp.process(Array("-classpath", paths.mkString(separator)))
+    interp.process(Array("-classpath", paths.mkString(File.pathSeparator)))
     org.apache.spark.repl.Main.interp = null
     if (interp.sparkContext != null) {
       interp.sparkContext.stop()