diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 2dfa02bd26f1360b82d5c9c850c40b47165905cc..0d6751f3fa6d2c452fa5188ffc575bdb05b955c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.deploy
 
+import java.net.URI
+
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 
 import org.apache.spark.api.python.{PythonUtils, RedirectThread}
+import org.apache.spark.util.Utils
 
 /**
  * A main class used by spark-submit to launch Python applications. It executes python as a
@@ -28,12 +31,15 @@ import org.apache.spark.api.python.{PythonUtils, RedirectThread}
  */
 object PythonRunner {
   def main(args: Array[String]) {
-    val primaryResource = args(0)
+    val pythonFile = args(0)
     val pyFiles = args(1)
     val otherArgs = args.slice(2, args.length)
-
     val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
 
+    // Format python file paths before adding them to the PYTHONPATH
+    val formattedPythonFile = formatPath(pythonFile)
+    val formattedPyFiles = formatPaths(pyFiles)
+
     // Launch a Py4J gateway server for the process to connect to; this will let it see our
     // Java system properties and such
     val gatewayServer = new py4j.GatewayServer(null, 0)
@@ -42,13 +48,13 @@ object PythonRunner {
     // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
     // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
     val pathElements = new ArrayBuffer[String]
-    pathElements ++= Option(pyFiles).getOrElse("").split(",")
+    pathElements ++= formattedPyFiles
     pathElements += PythonUtils.sparkPythonPath
     pathElements += sys.env.getOrElse("PYTHONPATH", "")
     val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
 
     // Launch Python process
-    val builder = new ProcessBuilder(Seq(pythonExec, "-u", primaryResource) ++ otherArgs)
+    val builder = new ProcessBuilder(Seq(pythonExec, "-u", formattedPythonFile) ++ otherArgs)
     val env = builder.environment()
     env.put("PYTHONPATH", pythonPath)
     env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
@@ -59,4 +65,50 @@ object PythonRunner {
 
     System.exit(process.waitFor())
   }
+
+  /**
+   * Format the python file path so that it can be added to the PYTHONPATH correctly.
+   *
+   * Python does not understand URI schemes in paths. Before adding python files to the
+   * PYTHONPATH, we need to extract the path from the URI. This is safe to do because we
+   * currently only support local python files.
+   */
+  def formatPath(path: String, testWindows: Boolean = false): String = {
+    if (Utils.nonLocalPaths(path, testWindows).nonEmpty) {
+      throw new IllegalArgumentException("Launching Python applications through " +
+        s"spark-submit is currently only supported for local files: $path")
+    }
+    val windows = Utils.isWindows || testWindows
+    var formattedPath = if (windows) Utils.formatWindowsPath(path) else path
+
+    // Strip the URI scheme from the path
+    formattedPath =
+      new URI(formattedPath).getScheme match {
+        case Utils.windowsDrive(d) if windows => formattedPath
+        case null => formattedPath
+        case _ => new URI(formattedPath).getPath
+      }
+
+    // Guard against malformed paths potentially throwing NPE
+    if (formattedPath == null) {
+      throw new IllegalArgumentException(s"Python file path is malformed: $path")
+    }
+
+    // In Windows, the drive should not be prefixed with "/"
+    // For instance, python does not understand "/C:/path/to/sheep.py"
+    formattedPath = if (windows) formattedPath.stripPrefix("/") else formattedPath
+    formattedPath
+  }
+
+  /**
+   * Format each python file path in the comma-delimited list of paths, so it can be
+   * added to the PYTHONPATH correctly.
+   */
+  def formatPaths(paths: String, testWindows: Boolean = false): Array[String] = {
+    Option(paths).getOrElse("")
+      .split(",")
+      .filter(_.nonEmpty)
+      .map { p => formatPath(p, testWindows) }
+  }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c54331c00fab89a9e932bac75204672c46939760..7e9a9344e61f9186b77deee4775fd39815424465 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -136,9 +136,9 @@ object SparkSubmit {
         args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
         args.files = mergeFileLists(args.files, args.primaryResource)
       }
-      val pyFiles = Option(args.pyFiles).getOrElse("")
-      args.files = mergeFileLists(args.files, pyFiles)
-      sysProps("spark.submit.pyFiles") = pyFiles
+      args.files = mergeFileLists(args.files, args.pyFiles)
+      // Format python file paths properly before adding them to the PYTHONPATH
+      sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
     }
 
     // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
@@ -299,13 +299,18 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(localJar)
-    if (!localJarFile.exists()) {
-      printWarning(s"Jar $localJar does not exist, skipping.")
+    val uri = Utils.resolveURI(localJar)
+    uri.getScheme match {
+      case "file" | "local" =>
+        val file = new File(uri.getPath)
+        if (file.exists()) {
+          loader.addURL(file.toURI.toURL)
+        } else {
+          printWarning(s"Local jar $file does not exist, skipping.")
+        }
+      case _ =>
+        printWarning(s"Skip remote jar $uri.")
     }
-
-    val url = localJarFile.getAbsoluteFile.toURI.toURL
-    loader.addURL(url)
   }
 
   /**
@@ -318,7 +323,7 @@ object SparkSubmit {
   /**
    * Return whether the given primary resource represents a shell.
    */
-  private def isShell(primaryResource: String): Boolean = {
+  private[spark] def isShell(primaryResource: String): Boolean = {
     primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 0cc05fb95aef04a7d25a87c5a22f50b261a65337..bf449afae695fbead432956090d9fcf6f5340b25 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -118,7 +118,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
       } catch {
         case e: Exception =>
-          SparkSubmit.printErrorAndExit("Failed to read JAR: " + primaryResource)
+          SparkSubmit.printErrorAndExit("Cannot load main class from JAR: " + primaryResource)
           return
       }
     }
@@ -148,6 +148,18 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
     }
 
+    // Require all python files to be local, so we can add them to the PYTHONPATH
+    if (isPython) {
+      if (Utils.nonLocalPaths(primaryResource).nonEmpty) {
+        SparkSubmit.printErrorAndExit(s"Only local python files are supported: $primaryResource")
+      }
+      val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",")
+      if (nonLocalPyFiles.nonEmpty) {
+        SparkSubmit.printErrorAndExit(
+          s"Only local additional python files are supported: $nonLocalPyFiles")
+      }
+    }
+
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
       if (!hasHadoopEnv && !Utils.isTesting) {
@@ -263,19 +275,19 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         parse(tail)
 
       case ("--files") :: value :: tail =>
-        files = value
+        files = Utils.resolveURIs(value)
         parse(tail)
 
       case ("--py-files") :: value :: tail =>
-        pyFiles = value
+        pyFiles = Utils.resolveURIs(value)
         parse(tail)
 
       case ("--archives") :: value :: tail =>
-        archives = value
+        archives = Utils.resolveURIs(value)
         parse(tail)
 
       case ("--jars") :: value :: tail =>
-        jars = value
+        jars = Utils.resolveURIs(value)
         parse(tail)
 
       case ("--help" | "-h") :: tail =>
@@ -296,7 +308,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
               val errMessage = s"Unrecognized option '$value'."
               SparkSubmit.printErrorAndExit(errMessage)
             case v =>
-              primaryResource = v
+              primaryResource =
+                if (!SparkSubmit.isShell(v)) {
+                  Utils.resolveURI(v).toString
+                } else {
+                  v
+                }
               inSparkOpts = false
               isPython = SparkSubmit.isPython(v)
               parse(tail)
@@ -327,8 +344,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         |  --name NAME                 A name of your application.
         |  --jars JARS                 Comma-separated list of local jars to include on the driver
         |                              and executor classpaths.
-        |  --py-files PY_FILES         Comma-separated list of .zip or .egg files to place on the
-        |                              PYTHONPATH for Python apps.
+        |  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
+        |                              on the PYTHONPATH for Python apps.
         |  --files FILES               Comma-separated list of files to be placed in the working
         |                              directory of each executor.
         |  --properties-file FILE      Path to a file from which to load extra properties. If not
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0c7cff019fce132bd51b29e8b6c06e92502545f6..3b1b6df089b8efe5807503c2fb4f573f8fe3f2f9 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1086,9 +1086,19 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Return true if this is Windows.
+   * Whether the underlying operating system is Windows.
    */
-  def isWindows = SystemUtils.IS_OS_WINDOWS
+  val isWindows = SystemUtils.IS_OS_WINDOWS
+
+  /**
+   * Pattern for matching a Windows drive, which contains only a single alphabet character.
+   */
+  val windowsDrive = "([a-zA-Z])".r
+
+  /**
+   * Format a Windows path such that it can be safely passed to a URI.
+   */
+  def formatWindowsPath(path: String): String = path.replace("\\", "/")
 
   /**
    * Indicates whether Spark is currently running unit tests.
@@ -1166,4 +1176,61 @@ private[spark] object Utils extends Logging {
         true
     }
   }
+
+  /**
+   * Return a well-formed URI for the file described by a user input string.
+   *
+   * If the supplied path does not contain a scheme, or is a relative path, it will be
+   * converted into an absolute path with a file:// scheme.
+   */
+  def resolveURI(path: String, testWindows: Boolean = false): URI = {
+
+    // In Windows, the file separator is a backslash, but this is inconsistent with the URI format
+    val windows = isWindows || testWindows
+    val formattedPath = if (windows) formatWindowsPath(path) else path
+
+    val uri = new URI(formattedPath)
+    if (uri.getPath == null) {
+      throw new IllegalArgumentException(s"Given path is malformed: $uri")
+    }
+    uri.getScheme match {
+      case windowsDrive(d) if windows =>
+        new URI("file:/" + uri.toString.stripPrefix("/"))
+      case null =>
+        // Preserve fragments for HDFS file name substitution (denoted by "#")
+        // For instance, in "abc.py#xyz.py", "xyz.py" is the name observed by the application
+        val fragment = uri.getFragment
+        val part = new File(uri.getPath).toURI
+        new URI(part.getScheme, part.getPath, fragment)
+      case _ =>
+        uri
+    }
+  }
+
+  /** Resolve a comma-separated list of paths. */
+  def resolveURIs(paths: String, testWindows: Boolean = false): String = {
+    if (paths == null || paths.trim.isEmpty) {
+      ""
+    } else {
+      paths.split(",").map { p => Utils.resolveURI(p, testWindows) }.mkString(",")
+    }
+  }
+
+  /** Return all non-local paths from a comma-separated list of paths. */
+  def nonLocalPaths(paths: String, testWindows: Boolean = false): Array[String] = {
+    val windows = isWindows || testWindows
+    if (paths == null || paths.trim.isEmpty) {
+      Array.empty
+    } else {
+      paths.split(",").filter { p =>
+        val formattedPath = if (windows) formatWindowsPath(p) else p
+        new URI(formattedPath).getScheme match {
+          case windowsDrive(d) if windows => false
+          case "local" | "file" | null => false
+          case _ => true
+        }
+      }
+    }
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..bb6251fb4bfbef5d3bf7e335e322acb5dbaac1be
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.scalatest.FunSuite
+
+class PythonRunnerSuite extends FunSuite {
+
+  // Test formatting a single path to be added to the PYTHONPATH
+  test("format path") {
+    assert(PythonRunner.formatPath("spark.py") === "spark.py")
+    assert(PythonRunner.formatPath("file:/spark.py") === "/spark.py")
+    assert(PythonRunner.formatPath("file:///spark.py") === "/spark.py")
+    assert(PythonRunner.formatPath("local:/spark.py") === "/spark.py")
+    assert(PythonRunner.formatPath("local:///spark.py") === "/spark.py")
+    assert(PythonRunner.formatPath("C:/a/b/spark.py", testWindows = true) === "C:/a/b/spark.py")
+    assert(PythonRunner.formatPath("/C:/a/b/spark.py", testWindows = true) === "C:/a/b/spark.py")
+    assert(PythonRunner.formatPath("file:/C:/a/b/spark.py", testWindows = true) ===
+      "C:/a/b/spark.py")
+    intercept[IllegalArgumentException] { PythonRunner.formatPath("one:two") }
+    intercept[IllegalArgumentException] { PythonRunner.formatPath("hdfs:s3:xtremeFS") }
+    intercept[IllegalArgumentException] { PythonRunner.formatPath("hdfs:/path/to/some.py") }
+  }
+
+  // Test formatting multiple comma-separated paths to be added to the PYTHONPATH
+  test("format paths") {
+    assert(PythonRunner.formatPaths("spark.py") === Array("spark.py"))
+    assert(PythonRunner.formatPaths("file:/spark.py") === Array("/spark.py"))
+    assert(PythonRunner.formatPaths("file:/app.py,local:/spark.py") ===
+      Array("/app.py", "/spark.py"))
+    assert(PythonRunner.formatPaths("me.py,file:/you.py,local:/we.py") ===
+      Array("me.py", "/you.py", "/we.py"))
+    assert(PythonRunner.formatPaths("C:/a/b/spark.py", testWindows = true) ===
+      Array("C:/a/b/spark.py"))
+    assert(PythonRunner.formatPaths("/C:/a/b/spark.py", testWindows = true) ===
+      Array("C:/a/b/spark.py"))
+    assert(PythonRunner.formatPaths("C:/free.py,pie.py", testWindows = true) ===
+      Array("C:/free.py", "pie.py"))
+    assert(PythonRunner.formatPaths("lovely.py,C:/free.py,file:/d:/fry.py", testWindows = true) ===
+      Array("lovely.py", "C:/free.py", "d:/fry.py"))
+    intercept[IllegalArgumentException] { PythonRunner.formatPaths("one:two,three") }
+    intercept[IllegalArgumentException] { PythonRunner.formatPaths("two,three,four:five:six") }
+    intercept[IllegalArgumentException] { PythonRunner.formatPaths("hdfs:/some.py,foo.py") }
+    intercept[IllegalArgumentException] { PythonRunner.formatPaths("foo.py,hdfs:/some.py") }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 6c0deede53784d91725a5402b97800cec11aa07e..02427a4a835069853cc2454f32188ded3896483c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -91,7 +91,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
       "--jars=one.jar,two.jar,three.jar",
       "--name=myApp")
     val appArgs = new SparkSubmitArguments(clArgs)
-    appArgs.jars should be ("one.jar,two.jar,three.jar")
+    appArgs.jars should include regex (".*one.jar,.*two.jar,.*three.jar")
     appArgs.name should be ("myApp")
   }
 
@@ -125,17 +125,17 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
-    childArgsStr should include ("--jar thejar.jar")
     childArgsStr should include ("--class org.SomeClass")
-    childArgsStr should include ("--addJars one.jar,two.jar,three.jar")
     childArgsStr should include ("--executor-memory 5g")
     childArgsStr should include ("--driver-memory 4g")
     childArgsStr should include ("--executor-cores 5")
     childArgsStr should include ("--arg arg1 --arg arg2")
     childArgsStr should include ("--queue thequeue")
-    childArgsStr should include ("--files file1.txt,file2.txt")
-    childArgsStr should include ("--archives archive1.txt,archive2.txt")
     childArgsStr should include ("--num-executors 6")
+    childArgsStr should include regex ("--jar .*thejar.jar")
+    childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
+    childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")
+    childArgsStr should include regex ("--archives .*archive1.txt,.*archive2.txt")
     mainClass should be ("org.apache.spark.deploy.yarn.Client")
     classpath should have length (0)
     sysProps("spark.app.name") should be ("beauty")
@@ -162,18 +162,19 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
-    classpath should contain ("thejar.jar")
-    classpath should contain ("one.jar")
-    classpath should contain ("two.jar")
-    classpath should contain ("three.jar")
+    classpath should have length (4)
+    classpath(0) should endWith ("thejar.jar")
+    classpath(1) should endWith ("one.jar")
+    classpath(2) should endWith ("two.jar")
+    classpath(3) should endWith ("three.jar")
     sysProps("spark.app.name") should be ("trill")
-    sysProps("spark.jars") should be ("one.jar,two.jar,three.jar,thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.executor.cores") should be ("5")
     sysProps("spark.yarn.queue") should be ("thequeue")
-    sysProps("spark.yarn.dist.files") should be ("file1.txt,file2.txt")
-    sysProps("spark.yarn.dist.archives") should be ("archive1.txt,archive2.txt")
     sysProps("spark.executor.instances") should be ("6")
+    sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
+    sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
+    sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
     sysProps("SPARK_SUBMIT") should be ("true")
   }
 
@@ -190,11 +191,13 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
-    childArgsStr.startsWith("--memory 4g --cores 5 --supervise") should be (true)
-    childArgsStr should include ("launch spark://h:p thejar.jar org.SomeClass arg1 arg2")
+    childArgsStr should startWith ("--memory 4g --cores 5 --supervise")
+    childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2")
     mainClass should be ("org.apache.spark.deploy.Client")
-    classpath should have length (0)
-    sysProps should have size (2) // contains --jar entry and SPARK_SUBMIT
+    classpath should have size (0)
+    sysProps should have size (2)
+    sysProps.keys should contain ("spark.jars")
+    sysProps.keys should contain ("SPARK_SUBMIT")
   }
 
   test("handles standalone client mode") {
@@ -211,7 +214,8 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
-    classpath should contain ("thejar.jar")
+    classpath should have length (1)
+    classpath(0) should endWith ("thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.cores.max") should be ("5")
   }
@@ -230,7 +234,8 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
     mainClass should be ("org.SomeClass")
-    classpath should contain ("thejar.jar")
+    classpath should have length (1)
+    classpath(0) should endWith ("thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.cores.max") should be ("5")
   }
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index cf9e20d347ddd1b47c81ca1fc85a872350fe49e8..0aad882ed76a8df09487ecc54946c7a1d64ae0f3 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import scala.util.Random
 
 import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
+import java.net.URI
 import java.nio.{ByteBuffer, ByteOrder}
 
 import com.google.common.base.Charsets
@@ -168,5 +169,68 @@ class UtilsSuite extends FunSuite {
     assert(result.size.equals(1))
     assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
   }
-}
 
+  test("resolveURI") {
+    def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
+      assume(before.split(",").length == 1)
+      assert(Utils.resolveURI(before, testWindows) === new URI(after))
+      assert(Utils.resolveURI(after, testWindows) === new URI(after))
+      assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
+      assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
+    }
+    val cwd = System.getProperty("user.dir")
+    assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar")
+    assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:/root/spark.jar#app.jar")
+    assertResolves("spark.jar", s"file:$cwd/spark.jar")
+    assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar#app.jar")
+    assertResolves("C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows = true)
+    assertResolves("C:\\path\\to\\file.txt", "file:/C:/path/to/file.txt", testWindows = true)
+    assertResolves("file:/C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows = true)
+    assertResolves("file:///C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows = true)
+    assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true)
+    intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
+    intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
+
+    // Test resolving comma-delimited paths
+    assert(Utils.resolveURIs("jar1,jar2") === s"file:$cwd/jar1,file:$cwd/jar2")
+    assert(Utils.resolveURIs("file:/jar1,file:/jar2") === "file:/jar1,file:/jar2")
+    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3") ===
+      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
+    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,jar4#jar5") ===
+      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
+    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", testWindows = true) ===
+      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi")
+  }
+
+  test("nonLocalPaths") {
+    assert(Utils.nonLocalPaths("spark.jar") === Array.empty)
+    assert(Utils.nonLocalPaths("file:/spark.jar") === Array.empty)
+    assert(Utils.nonLocalPaths("file:///spark.jar") === Array.empty)
+    assert(Utils.nonLocalPaths("local:/spark.jar") === Array.empty)
+    assert(Utils.nonLocalPaths("local:///spark.jar") === Array.empty)
+    assert(Utils.nonLocalPaths("hdfs:/spark.jar") === Array("hdfs:/spark.jar"))
+    assert(Utils.nonLocalPaths("hdfs:///spark.jar") === Array("hdfs:///spark.jar"))
+    assert(Utils.nonLocalPaths("file:/spark.jar,local:/smart.jar,family.py") === Array.empty)
+    assert(Utils.nonLocalPaths("local:/spark.jar,file:/smart.jar,family.py") === Array.empty)
+    assert(Utils.nonLocalPaths("hdfs:/spark.jar,s3:/smart.jar") ===
+      Array("hdfs:/spark.jar", "s3:/smart.jar"))
+    assert(Utils.nonLocalPaths("hdfs:/spark.jar,s3:/smart.jar,local.py,file:/hello/pi.py") ===
+      Array("hdfs:/spark.jar", "s3:/smart.jar"))
+    assert(Utils.nonLocalPaths("local.py,hdfs:/spark.jar,file:/hello/pi.py,s3:/smart.jar") ===
+      Array("hdfs:/spark.jar", "s3:/smart.jar"))
+
+    // Test Windows paths
+    assert(Utils.nonLocalPaths("C:/some/path.jar", testWindows = true) === Array.empty)
+    assert(Utils.nonLocalPaths("file:/C:/some/path.jar", testWindows = true) === Array.empty)
+    assert(Utils.nonLocalPaths("file:///C:/some/path.jar", testWindows = true) === Array.empty)
+    assert(Utils.nonLocalPaths("local:/C:/some/path.jar", testWindows = true) === Array.empty)
+    assert(Utils.nonLocalPaths("local:///C:/some/path.jar", testWindows = true) === Array.empty)
+    assert(Utils.nonLocalPaths("hdfs:/a.jar,C:/my.jar,s3:/another.jar", testWindows = true) ===
+      Array("hdfs:/a.jar", "s3:/another.jar"))
+    assert(Utils.nonLocalPaths("D:/your.jar,hdfs:/a.jar,s3:/another.jar", testWindows = true) ===
+      Array("hdfs:/a.jar", "s3:/another.jar"))
+    assert(Utils.nonLocalPaths("hdfs:/a.jar,s3:/another.jar,e:/our.jar", testWindows = true) ===
+      Array("hdfs:/a.jar", "s3:/another.jar"))
+  }
+
+}
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c9ff82d23b3cfb0eeee177c2ece5a583903801d3..27b440d73bdc38501e84070b6d6000c39d864713 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -159,10 +159,14 @@ class SparkContext(object):
             self.addPyFile(path)
 
         # Deploy code dependencies set by spark-submit; these will already have been added
-        # with SparkContext.addFile, so we just need to add them
+        # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
         for path in self._conf.get("spark.submit.pyFiles", "").split(","):
             if path != "":
-                self._python_includes.append(os.path.basename(path))
+                (dirname, filename) = os.path.split(path)
+                self._python_includes.append(filename)
+                sys.path.append(path)
+                if not dirname in sys.path:
+                    sys.path.append(dirname)
 
         # Create a temporary directory inside spark.local.dir:
         local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 5f34362ccd973c814588d71ee74786483a80c5b4..e1db4d5395ab9e6393a9240c7ad06c05d5b3b537 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -942,7 +942,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
 
   def createSparkContext(): SparkContext = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
+    val jars = SparkILoop.getAddedJars
     val conf = new SparkConf()
       .setMaster(getMaster())
       .setAppName("Spark shell")
@@ -997,7 +997,8 @@ object SparkILoop {
     val propJars = sys.props.get("spark.jars").flatMap { p =>
       if (p == "") None else Some(p)
     }
-    propJars.orElse(envJars).map(_.split(",")).getOrElse(Array.empty)
+    val jars = propJars.orElse(envJars).getOrElse("")
+    Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
   }
 
   // Designed primarily for use by test code: take a String with a