diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
similarity index 55%
rename from examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala
rename to examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
index 377bc0c98ec1f1185a97e8e559e7151e5aef7e85..fdb3a4c73c6cc6cfeca3e309c341e6e0df75f05c 100644
--- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
@@ -1,19 +1,19 @@
-package spark.streaming.examples.twitter
+package spark.streaming.examples
 
-import spark.streaming.StreamingContext._
 import spark.streaming.{Seconds, StreamingContext}
+import StreamingContext._
 import spark.SparkContext._
-import spark.storage.StorageLevel
 
 /**
  * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
  * stream. The stream is instantiated with credentials and optionally filters supplied by the
  * command line arguments.
+ *
  */
-object TwitterBasic {
+object TwitterPopularTags {
   def main(args: Array[String]) {
     if (args.length < 3) {
-      System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
+      System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" +
         " [filter1] [filter2] ... [filter n]")
       System.exit(1)
     }
@@ -21,10 +21,8 @@ object TwitterBasic {
     val Array(master, username, password) = args.slice(0, 3)
     val filters = args.slice(3, args.length)
 
-    val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
-    val stream = new TwitterInputDStream(ssc, username, password, filters,
-      StorageLevel.MEMORY_ONLY_SER)
-    ssc.registerInputStream(stream)
+    val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2))
+    val stream = ssc.twitterStream(username, password, filters)
 
     val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
 
@@ -39,22 +37,17 @@ object TwitterBasic {
 
     // Print popular hashtags
     topCounts60.foreach(rdd => {
-      if (rdd.count() != 0) {
-        val topList = rdd.take(5)
-        println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
-        topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
-      }
+      val topList = rdd.take(5)
+      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
+      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
     })
 
     topCounts10.foreach(rdd => {
-      if (rdd.count() != 0) {
-        val topList = rdd.take(5)
-        println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
-        topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
-      }
+      val topList = rdd.take(5)
+      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
+      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
     })
 
     ssc.start()
   }
-
 }
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index af8b5ba01745b59f2c8c82f3c6c6bdb58a01fa45..c6d3cc8b1540f51effb85b9fccbe423d27bedca0 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -154,10 +154,7 @@ object SparkBuild extends Build {
   )
 
   def examplesSettings = sharedSettings ++ Seq(
-    name := "spark-examples",
-    libraryDependencies ++= Seq(
-      "org.twitter4j" % "twitter4j-stream" % "3.0.3"
-    )
+    name := "spark-examples"
   )
 
   def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
@@ -166,7 +163,8 @@ object SparkBuild extends Build {
     name := "spark-streaming",
     libraryDependencies ++= Seq(
       "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
-      "com.github.sgroschupf" % "zkclient" % "0.1"
+      "com.github.sgroschupf" % "zkclient" % "0.1",
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3"
     )
   ) ++ assemblySettings ++ extraAssemblySettings
 
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 8cfbec51d26ac7a346b16f0c31b3664e16bba708..9be9d884be66c88c034f22f8a34e3f1455a1b12e 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
 import java.util.UUID
+import twitter4j.Status
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -30,14 +31,14 @@ class StreamingContext private (
   ) extends Logging {
 
   /**
-   * Creates a StreamingContext using an existing SparkContext.
+   * Create a StreamingContext using an existing SparkContext.
    * @param sparkContext Existing SparkContext
    * @param batchDuration The time interval at which streaming data will be divided into batches
    */
   def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)
 
   /**
-   * Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
+   * Create a StreamingContext by providing the details necessary for creating a new SparkContext.
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
    * @param frameworkName A name for your job, to display on the cluster web UI
    * @param batchDuration The time interval at which streaming data will be divided into batches
@@ -46,7 +47,7 @@ class StreamingContext private (
     this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
 
   /**
-   * Re-creates a StreamingContext from a checkpoint file.
+   * Re-create a StreamingContext from a checkpoint file.
    * @param path Path either to the directory that was specified as the checkpoint directory, or
    *             to the checkpoint file 'graph' or 'graph.bk'.
    */
@@ -101,12 +102,12 @@ class StreamingContext private (
   protected[streaming] var scheduler: Scheduler = null
 
   /**
-   * Returns the associated Spark context
+   * Return the associated Spark context
    */
   def sparkContext = sc
 
   /**
-   * Sets each DStreams in this context to remember RDDs it generated in the last given duration.
+   * Set each DStreams in this context to remember RDDs it generated in the last given duration.
    * DStreams remember RDDs only for a limited duration of time and releases them for garbage
    * collection. This method allows the developer to specify how to long to remember the RDDs (
    * if the developer wishes to query old data outside the DStream computation).
@@ -117,7 +118,7 @@ class StreamingContext private (
   }
 
   /**
-   * Sets the context to periodically checkpoint the DStream operations for master
+   * Set the context to periodically checkpoint the DStream operations for master
    * fault-tolerance. By default, the graph will be checkpointed every batch interval.
    * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
    * @param interval checkpoint interval
@@ -200,7 +201,7 @@ class StreamingContext private (
   }
 
   /**
-   * Creates a input stream from a Flume source.
+   * Create a input stream from a Flume source.
    * @param hostname Hostname of the slave machine to which the flume data will be sent
    * @param port     Port of the slave machine to which the flume data will be sent
    * @param storageLevel  Storage level to use for storing the received objects
@@ -236,7 +237,7 @@ class StreamingContext private (
   }
 
   /**
-   * Creates a input stream that monitors a Hadoop-compatible filesystem
+   * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
    * File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
@@ -255,7 +256,7 @@ class StreamingContext private (
   }
 
   /**
-   * Creates a input stream that monitors a Hadoop-compatible filesystem
+   * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
    * @param directory HDFS directory to monitor for new file
    * @param filter Function to filter paths to process
@@ -274,9 +275,8 @@ class StreamingContext private (
     inputStream
   }
 
-
   /**
-   * Creates a input stream that monitors a Hadoop-compatible filesystem
+   * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them as text files (using key as LongWritable, value
    * as Text and input format as TextInputFormat). File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
@@ -286,7 +286,25 @@ class StreamingContext private (
   }
 
   /**
-   * Creates an input stream from a queue of RDDs. In each batch,
+   * Create a input stream that returns tweets received from Twitter.
+   * @param username Twitter username
+   * @param password Twitter password
+   * @param filters Set of filter strings to get only those tweets that match them
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def twitterStream(
+      username: String,
+      password: String,
+      filters: Seq[String],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): DStream[Status] = {
+    val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
+    registerInputStream(inputStream)
+    inputStream
+  }
+
+  /**
+   * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    * @param queue      Queue of RDDs
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
@@ -300,7 +318,7 @@ class StreamingContext private (
   }
 
   /**
-   * Creates an input stream from a queue of RDDs. In each batch,
+   * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    * @param queue      Queue of RDDs
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
@@ -325,7 +343,7 @@ class StreamingContext private (
   }
 
   /**
-   * Registers an input stream that will be started (InputDStream.start() called) to get the
+   * Register an input stream that will be started (InputDStream.start() called) to get the
    * input data.
    */
   def registerInputStream(inputStream: InputDStream[_]) {
@@ -333,7 +351,7 @@ class StreamingContext private (
   }
 
   /**
-   * Registers an output stream that will be computed every interval
+   * Register an output stream that will be computed every interval
    */
   def registerOutputStream(outputStream: DStream[_]) {
     graph.addOutputStream(outputStream)
@@ -351,7 +369,7 @@ class StreamingContext private (
   }
 
   /**
-   * Starts the execution of the streams.
+   * Start the execution of the streams.
    */
   def start() {
     if (checkpointDir != null && checkpointDuration == null && graph != null) {
@@ -379,7 +397,7 @@ class StreamingContext private (
   }
 
   /**
-   * Stops the execution of the streams.
+   * Stop the execution of the streams.
    */
   def stop() {
     try {
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
similarity index 94%
rename from examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
rename to streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
index 99ed4cdc1c12d13ef8367f2f5323fb7927b51ca2..d733254ddbb9158e669273f050dac5bda9de4ffb 100644
--- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -1,4 +1,4 @@
-package spark.streaming.examples.twitter
+package spark.streaming.dstream
 
 import spark._
 import spark.streaming._
@@ -6,7 +6,6 @@ import dstream.{NetworkReceiver, NetworkInputDStream}
 import storage.StorageLevel
 import twitter4j._
 import twitter4j.auth.BasicAuthorization
-import collection.JavaConversions._
 
 /* A stream of Twitter statuses, potentially filtered by one or more keywords.
 *
@@ -50,7 +49,7 @@ class TwitterReceiver(
       def onTrackLimitationNotice(i: Int) {}
       def onScrubGeo(l: Long, l1: Long) {}
       def onStallWarning(stallWarning: StallWarning) {}
-      def onException(e: Exception) {}
+      def onException(e: Exception) { stopOnError(e) }
     })
 
     val query: FilterQuery = new FilterQuery