diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
index 2eec777c54e8d21da0a686c3768202ba1c0a1b93..66e709b7a333836fd1950ac3da50d8b87194fea1 100644
--- a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
@@ -37,7 +37,7 @@ object RawNetworkGrep {
     RawTextHelper.warmUp(ssc.sc)
 
     val rawStreams = (1 to numStreams).map(_ =>
-      ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
+      ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
     val union = ssc.union(rawStreams)
     union.filter(_.contains("the")).count().foreach(r =>
       println("Grep count: " + r.collect().mkString))
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index d0430b3f3eef8867c33ea9862df694958224dce8..25c67b279b7d2feb38a5b5d520f3ed0ab73d5f5c 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -170,7 +170,8 @@ class StreamingContext private (
    *       should be same.
    */
   def actorStream[T: ClassManifest](
-    props: Props, name: String,
+    props: Props,
+    name: String,
     storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
     supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
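
For reference, a sketch of a user-defined actor feeding this API. The Receiver trait and its pushBlock helper are assumptions about the utilities in spark.streaming.receivers; treat these names as illustrative rather than definitive:

    import akka.actor.{Actor, Props}
    import spark.streaming.receivers.Receiver  // assumed helper trait

    // Hypothetical actor that forwards every String it receives into the stream.
    // Per the @note above, type safety is the caller's responsibility.
    class EchoReceiver extends Actor with Receiver {
      def receive = {
        case s: String => pushBlock(s)  // assumed method on the Receiver trait
      }
    }

    val stream = ssc.actorStream[String](Props(new EchoReceiver), "EchoReceiver")
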
@@ -179,19 +180,20 @@ class StreamingContext private (
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
    * @param publisherUrl Url of remote zeromq publisher
-   * @param zeroMQ topic to subscribe to
+   * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
    *                       of byte thus it needs the converter(which might be deserializer of bytes)
    *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
    *                       and sub sequence refer to its payload.
    * @param storageLevel RDD storage level. Defaults to memory-only.
    */
-  def zeroMQStream[T: ClassManifest](publisherUrl:String,
+  def zeroMQStream[T: ClassManifest](
+      publisherUrl: String,
       subscribe: Subscribe,
       bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
       storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
-      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
-
+      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+    ): DStream[T] = {
     actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
         "ZeroMQReceiver", storageLevel, supervisorStrategy)
   }
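
On the Scala side the frame converter is passed directly as a function. A minimal sketch, assuming ssc is a StreamingContext; the endpoint and topic are hypothetical:

    import akka.zeromq.Subscribe
    import spark.storage.StorageLevel

    // Decode each frame's payload as a UTF-8 string.
    def framesToStrings(frames: Seq[Seq[Byte]]): Iterator[String] =
      frames.map(f => new String(f.toArray, "UTF-8")).iterator

    val events = ssc.zeroMQStream[String](
      "tcp://127.0.0.1:5553",  // hypothetical publisher URL
      Subscribe("events"),     // hypothetical topic
      framesToStrings _,
      StorageLevel.MEMORY_ONLY_SER_2)
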
@@ -283,7 +285,7 @@ class StreamingContext private (
    * @param storageLevel  Storage level to use for storing the received objects
    * @tparam T            Type of the objects in the received blocks
    */
-  def rawNetworkStream[T: ClassManifest](
+  def rawSocketStream[T: ClassManifest](
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
@@ -352,7 +354,7 @@ class StreamingContext private (
   def twitterStream(
       username: String,
       password: String,
-      filters: Seq[String],
+      filters: Seq[String] = Nil,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): DStream[Status] = {
     val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
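
Defaulting filters to Nil means Scala callers no longer have to pass an empty sequence to sample the unfiltered stream. A quick sketch with hypothetical credentials:

    // Before this change the filters argument was required; now both forms compile.
    val allTweets   = ssc.twitterStream("user", "secret")
    val sparkTweets = ssc.twitterStream("user", "secret", Seq("spark"))
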
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index d2a0ba725fdbfd8d0e485b2b9c49342c2a07a129..f3b40b5b88ef9dd80f2f717ccdd2bf151ddd6f00 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -1,16 +1,26 @@
 package spark.streaming.api.java
 
-import scala.collection.JavaConversions._
-import java.lang.{Long => JLong, Integer => JInt}
-
 import spark.streaming._
-import dstream._
+import receivers.{ActorReceiver, ReceiverSupervisorStrategy}
+import spark.streaming.dstream._
 import spark.storage.StorageLevel
+
 import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import spark.api.java.{JavaSparkContext, JavaRDD}
+
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+
+import twitter4j.Status
+
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
+import akka.zeromq.Subscribe
+
+import scala.collection.JavaConversions._
+
+import java.lang.{Long => JLong, Integer => JInt}
 import java.io.InputStream
 import java.util.{Map => JMap}
-import spark.api.java.{JavaSparkContext, JavaRDD}
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -128,7 +138,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param storageLevel  Storage level to use for storing the received objects
    *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
-  def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
+  def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
   : JavaDStream[String] = {
     ssc.socketTextStream(hostname, port, storageLevel)
   }
@@ -186,13 +196,13 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param storageLevel  Storage level to use for storing the received objects
    * @tparam T            Type of the objects in the received blocks
    */
-  def rawNetworkStream[T](
+  def rawSocketStream[T](
       hostname: String,
       port: Int,
       storageLevel: StorageLevel): JavaDStream[T] = {
     implicit val cmt: ClassManifest[T] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel))
+    JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel))
   }
 
   /**
@@ -204,10 +214,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param port          Port to connect to for receiving data
    * @tparam T            Type of the objects in the received blocks
    */
-  def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = {
+  def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = {
     implicit val cmt: ClassManifest[T] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port))
+    JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port))
   }
 
   /**
@@ -246,11 +256,178 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param hostname Hostname of the slave machine to which the flume data will be sent
    * @param port     Port of the slave machine to which the flume data will be sent
    */
-  def flumeStream(hostname: String, port: Int):
-  JavaDStream[SparkFlumeEvent] = {
+  def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
     ssc.flumeStream(hostname, port)
   }
 
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param username Twitter username
+   * @param password Twitter password
+   * @param filters Set of filter strings to get only those tweets that match them
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def twitterStream(
+      username: String,
+      password: String,
+      filters: Array[String],
+      storageLevel: StorageLevel
+    ): JavaDStream[Status] = {
+    ssc.twitterStream(username, password, filters, storageLevel)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param username Twitter username
+   * @param password Twitter password
+   * @param filters Set of filter strings to get only those tweets that match them
+   */
+  def twitterStream(
+      username: String,
+      password: String,
+      filters: Array[String]
+    ): JavaDStream[Status] = {
+    ssc.twitterStream(username, password, filters)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param username Twitter username
+   * @param password Twitter password
+   */
+  def twitterStream(
+      username: String,
+      password: String
+    ): JavaDStream[Status] = {
+    ssc.twitterStream(username, password)
+  }
+
+  /**
+   * Create an input stream with any arbitrary user implemented actor receiver.
+   * @param props Props object defining creation of the actor
+   * @param name Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @note An important point:
+   *       since the actor may exist outside the Spark framework, it is the user's
+   *       responsibility to ensure type safety, i.e. that the parametrized type of the
+   *       data received and the type parameter of actorStream are the same.
+   */
+  def actorStream[T](
+      props: Props,
+      name: String,
+      storageLevel: StorageLevel,
+      supervisorStrategy: SupervisorStrategy
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream with any arbitrary user implemented actor receiver.
+   * @param props Props object defining creation of the actor
+   * @param name Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @note An important point:
+   *       since the actor may exist outside the Spark framework, it is the user's
+   *       responsibility to ensure type safety, i.e. that the parametrized type of the
+   *       data received and the type parameter of actorStream are the same.
+   */
+  def actorStream[T](
+      props: Props,
+      name: String,
+      storageLevel: StorageLevel
+  ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.actorStream[T](props, name, storageLevel)
+  }
+
+  /**
+   * Create an input stream with any arbitrary user implemented actor receiver.
+   * @param props Props object defining creation of the actor
+   * @param name Name of the actor
+   *
+   * @note An important point:
+   *       since the actor may exist outside the Spark framework, it is the user's
+   *       responsibility to ensure type safety, i.e. that the parametrized type of the
+   *       data received and the type parameter of actorStream are the same.
+   */
+  def actorStream[T](
+      props: Props,
+      name: String
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.actorStream[T](props, name)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
+   *                       each frame is a sequence of bytes. This converter (typically a
+   *                       deserializer) translates a sequence of byte sequences, where each
+   *                       inner sequence is one frame's payload, into an iterator of objects.
+   * @param storageLevel  Storage level to use for storing the received objects
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+      storageLevel: StorageLevel,
+      supervisorStrategy: SupervisorStrategy
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
+   *                       each frame is a sequence of bytes. This converter (typically a
+   *                       deserializer) translates an array of byte arrays, where each inner
+   *                       array is one frame's payload, into objects of type T.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
+   *                       each frame is a sequence of bytes. This converter (typically a
+   *                       deserializer) translates an array of byte arrays, where each inner
+   *                       array is one frame's payload, into objects of type T.
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+  }
+
   /**
    * Registers an output stream that will be computed every interval
    */
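
The JFunction-based zeroMQStream overloads bridge the Java-shaped converter, Array[Array[Byte]] to java.lang.Iterable[T], into the Seq[Seq[Byte]] ⇒ Iterator[T] shape the Scala API expects; that is what the local fn does. A standalone sketch of the same bridging with made-up input, relying on the JavaConversions import already in this file:

    import scala.collection.JavaConversions._

    // Stand-in for the converter a Java caller would supply.
    val javaConverter: Array[Array[Byte]] => java.lang.Iterable[String] =
      frames => frames.map(new String(_, "UTF-8")).toIterable

    // The wrapper's bridging: Scala seqs in, Java arrays across, iterator out.
    def fn(x: Seq[Seq[Byte]]): Iterator[String] =
      javaConverter(x.map(_.toArray).toArray).toIterator

    fn(Seq("a".getBytes.toSeq, "b".getBytes.toSeq)).toList  // List("a", "b")
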
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 4530af5f6af5037b035b27bf3b305fdc7767b5d1..3bed500f73e84548a2f470a414ea68cfbb3df1ac 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -24,10 +24,14 @@ import spark.streaming.api.java.JavaStreamingContext;
 import spark.streaming.JavaTestUtils;
 import spark.streaming.JavaCheckpointTestUtils;
 import spark.streaming.dstream.KafkaPartitionKey;
+import spark.streaming.InputStreamsSuite;
 
 import java.io.*;
 import java.util.*;
 
+import akka.actor.Props;
+import akka.zeromq.Subscribe;
+
 // The test suite itself is Serializable so that anonymous Function implementations can be
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
 // see http://stackoverflow.com/questions/758570/.
@@ -1205,12 +1211,12 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
-  public void testNetworkTextStream() {
+  public void testSocketTextStream() {
     JavaDStream test = ssc.socketTextStream("localhost", 12345);
   }
 
   @Test
-  public void testNetworkString() {
+  public void testSocketString() {
     class Converter extends Function<InputStream, Iterable<String>> {
       public Iterable<String> call(InputStream in) {
         BufferedReader reader = new BufferedReader(new InputStreamReader(in));
@@ -1239,13 +1245,13 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
-  public void testRawNetworkStream() {
-    JavaDStream test = ssc.rawNetworkStream("localhost", 12345);
+  public void testRawSocketStream() {
+    JavaDStream test = ssc.rawSocketStream("localhost", 12345);
   }
 
   @Test
   public void testFlumeStream() {
-    JavaDStream test = ssc.flumeStream("localhost", 12345);
+    JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
   }
 
   @Test
@@ -1253,4 +1259,25 @@ public class JavaAPISuite implements Serializable {
     JavaPairDStream<String, String> foo =
       ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
   }
+
+  @Test
+  public void testTwitterStream() {
+    String[] filters = new String[] { "good", "bad", "ugly" };
+    JavaDStream test = ssc.twitterStream("username", "password", filters, StorageLevel.MEMORY_ONLY());
+  }
+
+  @Test
+  public void testActorStream() {
+    JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
+  }
+
+  @Test
+  public void testZeroMQStream() {
+    JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
+      @Override
+      public Iterable<String> call(byte[][] b) throws Exception {
+        return null;
+      }
+    });
+  }
 }
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index c9f941c5b8b646ab474534ba060fd45a75a18678..1024d3ac9790e801a060d90a9faab1d64237515b 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -42,7 +42,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
 
-  test("network input stream") {
+  test("socket input stream") {
     // Start the server
     val testServer = new TestServer(testPort)
     testServer.start()