diff --git a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
index 078ad45ce8533e2b60798c853f2d62ab0f250ac0..84673470dbac919d00b36cedc35067ea8b4871bd 100644
--- a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
@@ -67,6 +67,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
         val workerEnv = pb.environment()
         workerEnv.putAll(envVars)
+        val pythonPath = sparkHome + "/python/:" + workerEnv.get("PYTHONPATH")
+        workerEnv.put("PYTHONPATH", pythonPath)
         daemon = pb.start()
 
         // Redirect the stderr to ours
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8734cacb0baf425975f2100a74fe8c568277367d..51c2cb980673c66e7af98774d083bae1ed676b4b 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -756,9 +756,8 @@ class PipelinedRDD(RDD):
                 self.ctx._gateway._gateway_client)
         self.ctx._pickled_broadcast_vars.clear()
         class_manifest = self._prev_jrdd.classManifest()
-        env = copy.copy(self.ctx.environment)
-        env['PYTHONPATH'] = os.environ.get("PYTHONPATH", "")
-        env = MapConverter().convert(env, self.ctx._gateway._gateway_client)
+        env = MapConverter().convert(self.ctx.environment,
+                                     self.ctx._gateway._gateway_client)
         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), pipe_command, env,
             self.preservesPartitioning, self.ctx.pythonExec, broadcast_vars,
             self.ctx._javaAccumulator, class_manifest)
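
Note (not part of the patch): before this change, each PipelinedRDD copied the
driver's PYTHONPATH into the per-RDD environment map shipped to the JVM, and
PythonWorkerFactory merely passed that map through to the daemon via
workerEnv.putAll(envVars). With the patch, the factory itself prepends the
worker-local $SPARK_HOME/python/ to the daemon's PYTHONPATH once at daemon
startup, so every worker forked by daemon.py inherits it and can import
pyspark without depending on the driver machine's PYTHONPATH; rdd.py can then
convert self.ctx.environment directly. A minimal sketch of what a
daemon-forked worker now observes (illustrative only; check_pythonpath is a
hypothetical helper, not part of the patch):

    # hypothetical check run inside a worker forked by daemon.py
    import os

    def check_pythonpath():
        spark_home = os.environ["SPARK_HOME"]
        # PythonWorkerFactory prepended $SPARK_HOME/python/ to PYTHONPATH
        # before starting the daemon, so the forked worker inherits it.
        assert os.environ["PYTHONPATH"].startswith(spark_home + "/python/")
        import pyspark  # resolvable through the inherited PYTHONPATH

One caveat worth flagging for review: workerEnv.get("PYTHONPATH") is a
java.util.Map lookup that returns null when the variable is unset, so the
Scala string concatenation can produce a trailing ":null" entry. Python skips
sys.path entries that do not exist, so this appears benign, but it does leave
a stray entry in the worker's path.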