diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index df23710d469f233ec48b44083dfa5d3781659e5e..7503b1a5ea30ec89f85433c18c18b18ed68d23b6 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -64,7 +64,7 @@ class SparkContext(
     val appName: String,
     val sparkHome: String = null,
     val jars: Seq[String] = Nil,
-    environment: Map[String, String] = Map())
+    val environment: Map[String, String] = Map())
   extends Logging {
 
   // Ensure logging is initialized before we spawn any threads
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index f75fc27c7b2f63d024aea676bae8df9de55ded7e..5f18b1e15bd69c866e0e52b19f7b8b1e2303c13d 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -31,8 +31,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
    * @param appName A name for your application, to display on the cluster web UI
    * @param sparkHome The SPARK_HOME directory on the slave nodes
-   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
-   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
+   * @param jarFile JAR file to send to the cluster. This can be a path on the local file system
+   *                or an HDFS, HTTP, HTTPS, or FTP URL.
    */
   def this(master: String, appName: String, sparkHome: String, jarFile: String) =
     this(new SparkContext(master, appName, sparkHome, Seq(jarFile)))
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index b98718a5532e0d49e19437be39bc7482f7d6d328..2315aadbdf17ce5dcc70d17518b275c496cee001 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -38,10 +38,16 @@ The first thing a Spark program must do is to create a `SparkContext` object, wh
 This is done through the following constructor:
 
 {% highlight scala %}
-new SparkContext(master, jobName, [sparkHome], [jars])
+new SparkContext(master, appName, [sparkHome], [jars])
 {% endhighlight %}
 
-The `master` parameter is a string specifying a [Mesos](running-on-mesos.html) cluster to connect to, or a special "local" string to run in local mode, as described below. `jobName` is a name for your job, which will be shown in the Mesos web UI when running on a cluster. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later.
+The `master` parameter is a string specifying a [Spark or Mesos cluster URL](#master-urls) to connect to, or a special "local" string to run in local mode, as described below. `appName` is a name for your application, which will be shown in the cluster web UI. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later.
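+
+For example, a minimal sketch of creating a context to run locally on four cores (the application name here is illustrative):
+
+{% highlight scala %}
+val sc = new SparkContext("local[4]", "My App")
+{% endhighlight %}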
 
 In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable. For example, to run on four cores, use
 
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 7e913b783d89e68654831dd7b2d2f78dc9929811..42a4a5619de7cb3369023102ad6a7fc6a8beb1a5 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -17,16 +17,18 @@ This guide shows how to start programming with DStreams.
 The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created by using
 
 {% highlight scala %}
-new StreamingContext(master, jobName, batchDuration)
+new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
 {% endhighlight %}
 
-The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing mode. The `jobName` is the name of the streaming job, which is the same as the jobName used in SparkContext. It is used to identify this job in the cluster UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such the cluster can keep up with the processing of the data streams. Starting with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion.
-
-This constructor creates a SparkContext object using the given `master` and `jobName` parameters. However, if you already have a SparkContext or you need to create a custom SparkContext by specifying list of JARs, then a StreamingContext can be created from the existing SparkContext, by using
-{% highlight scala %}
-new StreamingContext(sparkContext, batchDuration)
-{% endhighlight %}
+The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is a name for your application, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are necessary when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
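+
+For example, a minimal sketch of creating a streaming context to run locally with 5-second batches (the application name here is illustrative):
+
+{% highlight scala %}
+val ssc = new StreamingContext("local[4]", "MyStreamingApp", Seconds(5))
+{% endhighlight %}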
 
+This constructor also creates a SparkContext for your application, which can be accessed with `streamingContext.sparkContext`.
 
 
 # Attaching Input Sources - InputDStreams
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index e7a392fbbf346fe2c55b49acc7ebeb5413aa520c..e303e33e5e4014e7b252b491edc9d5090dc5e88f 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,6 +17,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val framework = ssc.sc.appName
   val sparkHome = ssc.sc.sparkHome
   val jars = ssc.sc.jars
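+  // Saved so that a SparkContext recreated from this checkpoint inherits the same environment variables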
+  val environment = ssc.sc.environment
   val graph = ssc.graph
   val checkpointDir = ssc.checkpointDir
   val checkpointDuration = ssc.checkpointDuration
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 25c67b279b7d2feb38a5b5d520f3ed0ab73d5f5c..31b5d2c8bcff1100c1df1096be94460f643d2ec8 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -6,7 +6,7 @@ import akka.zeromq.Subscribe
 
 import spark.streaming.dstream._
 
-import spark.{RDD, Logging, SparkEnv, SparkContext}
+import spark._
 import spark.streaming.receivers.ActorReceiver
 import spark.streaming.receivers.ReceiverSupervisorStrategy
 import spark.streaming.receivers.ZeroMQReceiver
@@ -14,18 +14,18 @@ import spark.storage.StorageLevel
 import spark.util.MetadataCleaner
 import spark.streaming.receivers.ActorReceiver
 
-
 import scala.collection.mutable.Queue
+import scala.collection.Map
 
 import java.io.InputStream
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.UUID
 
 import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
-import java.util.UUID
 import twitter4j.Status
 
 /**
@@ -44,7 +44,9 @@ class StreamingContext private (
    * @param sparkContext Existing SparkContext
    * @param batchDuration The time interval at which streaming data will be divided into batches
    */
-  def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)
+  def this(sparkContext: SparkContext, batchDuration: Duration) = {
+    this(sparkContext, null, batchDuration)
+  }
 
   /**
    * Create a StreamingContext by providing the details necessary for creating a new SparkContext.
@@ -52,8 +54,20 @@
    * @param appName A name for your job, to display on the cluster web UI
    * @param batchDuration The time interval at which streaming data will be divided into batches
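+   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
+   * @param environment Environment variables to set on worker nodes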
    */
-  def this(master: String, appName: String, batchDuration: Duration) =
-    this(StreamingContext.createNewSparkContext(master, appName), null, batchDuration)
+  def this(
+      master: String,
+      appName: String,
+      batchDuration: Duration,
+      sparkHome: String = null,
+      jars: Seq[String] = Nil,
+      environment: Map[String, String] = Map()) = {
+    this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
+         null, batchDuration)
+  }
 
   /**
    * Re-create a StreamingContext from a checkpoint file.
@@ -65,15 +79,21 @@
   initLogging()
 
   if (sc_ == null && cp_ == null) {
-    throw new Exception("Streaming Context cannot be initilalized with " +
+    throw new Exception("Spark Streaming cannot be initialized with " +
       "both SparkContext and checkpoint as null")
   }
 
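+  // Streaming jobs run indefinitely, so metadata must be cleaned up periodically; fail fast if spark.cleaner.ttl is unset.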
+  if (MetadataCleaner.getDelaySeconds < 0) {
+    throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
+      + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
+  }
+
   protected[streaming] val isCheckpointPresent = (cp_ != null)
 
   protected[streaming] val sc: SparkContext = {
     if (isCheckpointPresent) {
-      new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars)
+      new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment)
     } else {
       sc_
     }
@@ -478,8 +498,12 @@
     new PairDStreamFunctions[K, V](stream)
   }
 
-  protected[streaming] def createNewSparkContext(master: String, appName: String): SparkContext = {
-
+  protected[streaming] def createNewSparkContext(
+      master: String,
+      appName: String,
+      sparkHome: String,
+      jars: Seq[String],
+      environment: Map[String, String]): SparkContext = {
     // Set the default cleaner delay to an hour if not already set.
     // This should be sufficient for even 1 second interval.
     if (MetadataCleaner.getDelaySeconds < 0) {
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index f3b40b5b88ef9dd80f2f717ccdd2bf151ddd6f00..b528ebbc1992af8dfe4b2f7ea2cd53f860c6fc5e 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -45,6 +45,42 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 
   /**
    * Creates a StreamingContext.
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4])
+   * @param appName A name for your application, to display on the cluster web UI
+   * @param batchDuration The time interval at which streaming data will be divided into batches
+   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
+   */
+  def this(
+      master: String,
+      appName: String,
+      batchDuration: Duration,
+      sparkHome: String,
+      jars: Array[String]) =
+    this(new StreamingContext(master, appName, batchDuration, sparkHome, jars))
+
+  /**
+   * Creates a StreamingContext.
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4])
+   * @param appName A name for your application, to display on the cluster web UI
+   * @param batchDuration The time interval at which streaming data will be divided into batches
+   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
+   * @param environment Environment variables to set on worker nodes
+   */
+  def this(
+      master: String,
+      appName: String,
+      batchDuration: Duration,
+      sparkHome: String,
+      jars: Array[String],
+      environment: JMap[String, String]) =
+    this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment))
+
+  /**
+   * Creates a StreamingContext using an existing SparkContext.
    * @param sparkContext The underlying JavaSparkContext to use
    * @param batchDuration The time interval at which streaming data will be divided into batches
    */
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index dc7139cc273cf75be9dd85e1caaa8579dc052d7c..ddd9becf325e02e649d54d20278e2db37ed795af 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -13,6 +13,7 @@ import kafka.serializer.StringDecoder
 import kafka.utils.{Utils, ZKGroupTopicDirs}
 import kafka.utils.ZkUtils._
 
+import scala.collection.Map
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._