diff --git a/core/pom.xml b/core/pom.xml
index 043f6cf68d15e3114c2640be66df2e60e393ebf5..aac0a9d11e12dafcb8bb053dc1f97a40be0ab4e2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -126,6 +126,10 @@
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.codahale.metrics</groupId>
       <artifactId>metrics-core</artifactId>
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index edd0fc56f861abbc0603975d0cd12e9944c9d447..46d61503bc5625e6448e60ae7c266aa6e652b2fe 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -20,19 +20,24 @@ package org.apache.spark.network.netty;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.oio.OioEventLoopGroup;
 import io.netty.channel.socket.oio.OioSocketChannel;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
+
 class FileClient {
 
   private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
-  private FileClientHandler handler = null;
+  private final FileClientHandler handler;
   private Channel channel = null;
   private Bootstrap bootstrap = null;
-  private int connectTimeout = 60*1000; // 1 min
+  private EventLoopGroup group = null;
+  private final int connectTimeout;
+  private final int sendTimeout = 60; // 1 min
 
   public FileClient(FileClientHandler handler, int connectTimeout) {
     this.handler = handler;
@@ -40,8 +45,9 @@ class FileClient {
   }
 
   public void init() {
+    group = new OioEventLoopGroup();
     bootstrap = new Bootstrap();
-    bootstrap.group(new OioEventLoopGroup())
+    bootstrap.group(group)
       .channel(OioSocketChannel.class)
       .option(ChannelOption.SO_KEEPALIVE, true)
       .option(ChannelOption.TCP_NODELAY, true)
@@ -56,6 +62,7 @@ class FileClient {
       // ChannelFuture cf = channel.closeFuture();
       //cf.addListener(new ChannelCloseListener(this));
     } catch (InterruptedException e) {
+      LOG.warn("FileClient interrupted while trying to connect", e);
       close();
     }
   }
@@ -71,16 +78,21 @@ class FileClient {
   public void sendRequest(String file) {
     //assert(file == null);
     //assert(channel == null);
-    channel.write(file + "\r\n");
+    try {
+      // The request line should be flushed onto the channel within sendTimeout seconds.
+      boolean sent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS);
+      if (!sent) {
+        throw new RuntimeException("Failed to send");
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while sending file request", e);
+    }
   }
 
   public void close() {
-    if(channel != null) {
-      channel.close();
-      channel = null;
-    }
-    if ( bootstrap!=null) {
-      bootstrap.shutdown();
+    if (group != null) {
+      group.shutdownGracefully();
+      group = null;
       bootstrap = null;
     }
   }
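
The FileClient changes above are the standard Netty 3 to 4 migration: the EventLoopGroup is kept in a field so close() can call shutdownGracefully() (Bootstrap.shutdown() no longer exists), and the bare channel.write() becomes writeAndFlush() with a bounded await. A minimal Scala sketch of the same client lifecycle, with illustrative names and timeouts rather than code from the patch:

```scala
import java.util.concurrent.TimeUnit

import io.netty.bootstrap.Bootstrap
import io.netty.channel.{Channel, ChannelHandler, ChannelOption, EventLoopGroup}
import io.netty.channel.oio.OioEventLoopGroup
import io.netty.channel.socket.oio.OioSocketChannel

class SimpleFileClient(handler: ChannelHandler, connectTimeoutMs: Int) {
  private var group: EventLoopGroup = _
  private var channel: Channel = _
  private val sendTimeoutSecs = 60

  def connect(host: String, port: Int): Unit = {
    group = new OioEventLoopGroup() // blocking-I/O transport, as in the patch
    val bootstrap = new Bootstrap()
      .group(group)
      .channel(classOf[OioSocketChannel])
      .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
      .option[java.lang.Integer](ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
      .handler(handler)
    channel = bootstrap.connect(host, port).sync().channel()
  }

  def sendRequest(file: String): Unit = {
    // writeAndFlush replaces write(); wait a bounded time for the send to complete
    val sent = channel.writeAndFlush(file + "\r\n").await(sendTimeoutSecs, TimeUnit.SECONDS)
    if (!sent) throw new RuntimeException("Failed to send within " + sendTimeoutSecs + "s")
  }

  def close(): Unit = {
    // Shutting down the group also closes its channels; this replaces bootstrap.shutdown()
    if (group != null) {
      group.shutdownGracefully()
      group = null
    }
  }
}
```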
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
index 65ee15d63b8548b27275419a2c1507cf0e23847b..fb61be1c12d79df4a504a640537a1396b9b54541 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
@@ -17,15 +17,13 @@
 
 package org.apache.spark.network.netty;
 
-import io.netty.buffer.BufType;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.string.StringEncoder;
 
-
 class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
 
-  private FileClientHandler fhandler;
+  private final FileClientHandler fhandler;
 
   public FileClientChannelInitializer(FileClientHandler handler) {
     fhandler = handler;
@@ -35,7 +33,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
   public void initChannel(SocketChannel channel) {
     // file no more than 2G
     channel.pipeline()
-      .addLast("encoder", new StringEncoder(BufType.BYTE))
+      .addLast("encoder", new StringEncoder())
       .addLast("handler", fhandler);
   }
 }
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
index 8a09210245fff7eccfdd50fa551be2a9d53e9954..63d3d927255f9613f79a2d5894bb337ba83a1d54 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
@@ -19,11 +19,11 @@ package org.apache.spark.network.netty;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundByteHandlerAdapter;
+import io.netty.channel.SimpleChannelInboundHandler;
 
 import org.apache.spark.storage.BlockId;
 
-abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
+abstract class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
   private FileHeader currentHeader = null;
 
@@ -37,13 +37,7 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
   public abstract void handleError(BlockId blockId);
 
   @Override
-  public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
-    // Use direct buffer if possible.
-    return ctx.alloc().ioBuffer();
-  }
-
-  @Override
-  public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
+  public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
     // get header
     if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) {
       currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));
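
SimpleChannelInboundHandler is the Netty 4 replacement for ChannelInboundByteHandlerAdapter: the framework now owns inbound buffer allocation (so the newInboundBuffer override goes away) and delivers reads through channelRead0, releasing the message afterwards. A hedged Scala sketch of the new handler shape; the header parsing below stands in for FileHeader and the handle/handleError callbacks of the real class:

```scala
import io.netty.buffer.ByteBuf
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}

class HeaderFirstHandler(headerSize: Int) extends SimpleChannelInboundHandler[ByteBuf] {
  private var headerRead = false

  override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf): Unit = {
    if (!headerRead && in.readableBytes() >= headerSize) {
      val header = in.readBytes(headerSize) // FileHeader.create(...) in the patch
      try {
        headerRead = true
        // ... interpret the header here ...
      } finally {
        header.release() // readBytes returns a new buffer that we now own
      }
    }
    // Any bytes left in `in` are body; the base class releases `in` when we return.
  }
}
```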
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
index a99af348ce78207d156d51855038c1e63b5f3b24..aea75344594f5023bf912b49c74d4c39a181384a 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -22,13 +22,12 @@ import java.net.InetSocketAddress;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.oio.OioEventLoopGroup;
 import io.netty.channel.socket.oio.OioServerSocketChannel;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Server that accepts the path of a file and echoes back its content.
  */
@@ -36,7 +35,8 @@ class FileServer {
 
   private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
 
-  private ServerBootstrap bootstrap = null;
+  private EventLoopGroup bossGroup = null;
+  private EventLoopGroup workerGroup = null;
   private ChannelFuture channelFuture = null;
   private int port = 0;
   private Thread blockingThread = null;
@@ -45,8 +45,11 @@ class FileServer {
     InetSocketAddress addr = new InetSocketAddress(port);
 
     // Configure the server.
-    bootstrap = new ServerBootstrap();
-    bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
+    bossGroup = new OioEventLoopGroup();
+    workerGroup = new OioEventLoopGroup();
+
+    ServerBootstrap bootstrap = new ServerBootstrap();
+    bootstrap.group(bossGroup, workerGroup)
         .channel(OioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .option(ChannelOption.SO_RCVBUF, 1500)
@@ -89,13 +92,19 @@ class FileServer {
   public void stop() {
     // Close the bound channel.
     if (channelFuture != null) {
-      channelFuture.channel().close();
+      channelFuture.channel().close().awaitUninterruptibly();
       channelFuture = null;
     }
-    // Shutdown bootstrap.
-    if (bootstrap != null) {
-      bootstrap.shutdown();
-      bootstrap = null;
+
+    // Shutdown event groups
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully();
+      bossGroup = null;
+    }
+
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
+      workerGroup = null;
     }
     // TODO: Shutdown all accepted channels as well ?
   }
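
On the server side the same migration shows up twice over: the boss and worker EventLoopGroups become fields because Netty 4 shuts down through the groups rather than the ServerBootstrap, and the bound channel is closed with awaitUninterruptibly so stop() does not return while the port is still held. A minimal Scala sketch of that lifecycle (illustrative class, same OIO transport):

```scala
import java.net.InetSocketAddress

import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.{ChannelFuture, ChannelInitializer, EventLoopGroup}
import io.netty.channel.oio.OioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.oio.OioServerSocketChannel

class SimpleFileServer(initializer: ChannelInitializer[SocketChannel]) {
  private var bossGroup: EventLoopGroup = _
  private var workerGroup: EventLoopGroup = _
  private var channelFuture: ChannelFuture = _

  def start(port: Int): Unit = {
    bossGroup = new OioEventLoopGroup()   // accepts incoming connections
    workerGroup = new OioEventLoopGroup() // services the accepted channels
    val bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(classOf[OioServerSocketChannel])
      .childHandler(initializer)
    channelFuture = bootstrap.bind(new InetSocketAddress(port)).sync()
  }

  def stop(): Unit = {
    if (channelFuture != null) {
      channelFuture.channel().close().awaitUninterruptibly() // block until unbound
      channelFuture = null
    }
    if (bossGroup != null) { bossGroup.shutdownGracefully(); bossGroup = null }
    if (workerGroup != null) { workerGroup.shutdownGracefully(); workerGroup = null }
  }
}
```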
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
index 833af1632de9ca2b87d0b51f5732ae359c4e2d75..3f15ff898fc1e981e169e6c5ca5bb05dd21c7c83 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
@@ -23,7 +23,6 @@ import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import io.netty.handler.codec.Delimiters;
 import io.netty.handler.codec.string.StringDecoder;
 
-
 class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
 
   PathResolver pResolver;
@@ -36,7 +35,7 @@ class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
   public void initChannel(SocketChannel channel) {
     channel.pipeline()
       .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
-      .addLast("strDecoder", new StringDecoder())
+      .addLast("stringDecoder", new StringDecoder())
       .addLast("handler", new FileServerHandler(pResolver));
   }
 }
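
The pipeline built here determines what FileServerHandler sees: DelimiterBasedFrameDecoder splits the stream on line endings, StringDecoder turns each frame into a String, so channelRead0 receives exactly one block id per request. An equivalent initializer sketched in Scala (the 8192-byte frame limit is the patch's; the handler factory is illustrative):

```scala
import io.netty.channel.{ChannelHandler, ChannelInitializer}
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters}
import io.netty.handler.codec.string.StringDecoder

class LineFramedInitializer(newHandler: () => ChannelHandler)
  extends ChannelInitializer[SocketChannel] {

  override def initChannel(channel: SocketChannel): Unit = {
    channel.pipeline()
      .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter(): _*))
      .addLast("stringDecoder", new StringDecoder())
      .addLast("handler", newHandler()) // fresh handler per channel, since handlers keep state
  }
}
```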
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index 172c6e4b1cce198020a314b4f6fb3062b3dd5cff..e2d9391b4cc9321b8db676b8713bcddfe8c0b63f 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -21,22 +21,26 @@ import java.io.File;
 import java.io.FileInputStream;
 
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.DefaultFileRegion;
 
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.storage.FileSegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
+class FileServerHandler extends SimpleChannelInboundHandler<String> {
 
-  PathResolver pResolver;
+  private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+
+  private final PathResolver pResolver;
 
   public FileServerHandler(PathResolver pResolver){
     this.pResolver = pResolver;
   }
 
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
+  public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
     BlockId blockId = BlockId.apply(blockIdString);
     FileSegment fileSegment = pResolver.getBlockLocation(blockId);
     // if getBlockLocation returns null, close the channel
@@ -60,10 +64,10 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
       int len = new Long(length).intValue();
       ctx.write((new FileHeader(len, blockId)).buffer());
       try {
-        ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
+        ctx.write(new DefaultFileRegion(new FileInputStream(file)
           .getChannel(), fileSegment.offset(), fileSegment.length()));
       } catch (Exception e) {
-        e.printStackTrace();
+        LOG.error("Exception: ", e);
       }
     } else {
       ctx.write(new FileHeader(0, blockId).buffer());
@@ -73,7 +77,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    cause.printStackTrace();
+    LOG.error("Exception: ", cause);
     ctx.close();
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 70fd499d719c2583af975fa2ce808cbee688783a..d3903ce9e336dedde50700d00776205de82184c9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.io._
 import java.net.URI
-import java.util.Properties
+import java.util.{UUID, Properties}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.Map
@@ -852,22 +852,15 @@ class SparkContext(
 
   /**
    * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be a HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists and useExisting is set to true, then the
-   * exisiting directory will be used. Otherwise an exception will be thrown to
-   * prevent accidental overriding of checkpoint files in the existing directory.
+   * be an HDFS path if running on a cluster.
    */
-  def setCheckpointDir(dir: String, useExisting: Boolean = false) {
-    val path = new Path(dir)
-    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
-    if (!useExisting) {
-      if (fs.exists(path)) {
-        throw new Exception("Checkpoint directory '" + path + "' already exists.")
-      } else {
-        fs.mkdirs(path)
-      }
+  def setCheckpointDir(directory: String) {
+    checkpointDir = Option(directory).map { dir =>
+      val path = new Path(dir, UUID.randomUUID().toString)
+      val fs = path.getFileSystem(hadoopConfiguration)
+      fs.mkdirs(path)
+      fs.getFileStatus(path).getPath().toString
     }
-    checkpointDir = Some(dir)
   }
 
   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
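
The new setCheckpointDir always creates a fresh UUID-named subdirectory under the given path, so pointing several applications at the same base directory is safe and the old useExisting flag (and its already-exists exception) can go away. A small usage sketch under the new semantics, assuming a local context and an illustrative path:

```scala
import org.apache.spark.SparkContext

val sc = new SparkContext("local", "checkpoint-demo")
// Any writable directory works; each context checkpoints into its own
// UUID subdirectory, so reruns cannot clobber earlier checkpoint files.
sc.setCheckpointDir("/tmp/spark-checkpoints")

val rdd = sc.makeRDD(1 to 1000, 4)
rdd.checkpoint() // only marks the RDD; materialized by the next action
rdd.count()      // forces the checkpoint to be written
println(rdd.isCheckpointed)
```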
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 363667fa863534ee9ad26105595f30d41a364d02..55c87450ac65ae5256cba054cf4ce7684773c592 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -611,6 +611,42 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * Return an RDD with the values of each tuple.
    */
   def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
+   * Partitioner to partition the output RDD.
+   */
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
+    rdd.countApproxDistinctByKey(relativeSD, partitioner)
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. The default value of
+   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
+   * level.
+   */
+  def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
+    rdd.countApproxDistinctByKey(relativeSD)
+  }
+
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. HashPartitions the
+   * output RDD into numPartitions.
+   *
+   */
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
+    rdd.countApproxDistinctByKey(relativeSD, numPartitions)
+  }
 }
 
 object JavaPairRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index f344804b4cf6881b2091b72b07158bf04e4b4f2a..924d8af0602f4a7fa8e33b637ebd5af23059ad81 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -444,4 +444,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
     takeOrdered(num, comp)
   }
+
+  /**
+   * Return approximate number of distinct elements in the RDD.
+   *
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. The default value of
+   * relativeSD is 0.05.
+   */
+  def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
+
 }
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index acf328aa6a2b48d24bc293dca0153e3ba9e190ed..50f2021d0194da58de8fccdeb7a45df15a3be049 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -381,20 +381,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
 
   /**
    * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be a HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists and useExisting is set to true, then the
-   * exisiting directory will be used. Otherwise an exception will be thrown to
-   * prevent accidental overriding of checkpoint files in the existing directory.
-   */
-  def setCheckpointDir(dir: String, useExisting: Boolean) {
-    sc.setCheckpointDir(dir, useExisting)
-  }
-
-  /**
-   * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be a HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists, an exception will be thrown to prevent accidental
-   * overriding of checkpoint files.
+   * be an HDFS path if running on a cluster.
    */
   def setCheckpointDir(dir: String) {
     sc.setCheckpointDir(dir)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index a712ef1c27093b5c8e6b53fd0efa5083f0170886..293a7d1f6841552c1570ead689bcf5e22a65f6a3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -18,12 +18,12 @@
 package org.apache.spark.rdd
 
 import java.io.IOException
-
 import scala.reflect.ClassTag
-
-import org.apache.hadoop.fs.Path
 import org.apache.spark._
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 
 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
 
@@ -34,6 +34,8 @@ private[spark]
 class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
   extends RDD[T](sc, Nil) {
 
+  val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
+
   @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
 
   override def getPartitions: Array[Partition] = {
@@ -65,7 +67,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
     val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
-    CheckpointRDD.readFromFile(file, context)
+    CheckpointRDD.readFromFile(file, broadcastedConf, context)
   }
 
   override def checkpoint() {
@@ -79,10 +81,14 @@ private[spark] object CheckpointRDD extends Logging {
     "part-%05d".format(splitId)
   }
 
-  def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
+  def writeToFile[T](
+      path: String,
+      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      blockSize: Int = -1
+    )(ctx: TaskContext, iterator: Iterator[T]) {
     val env = SparkEnv.get
     val outputDir = new Path(path)
-    val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
+    val fs = outputDir.getFileSystem(broadcastedConf.value.value)
 
     val finalOutputName = splitIdToFile(ctx.partitionId)
     val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -119,9 +125,13 @@ private[spark] object CheckpointRDD extends Logging {
     }
   }
 
-  def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
+  def readFromFile[T](
+      path: Path,
+      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      context: TaskContext
+    ): Iterator[T] = {
     val env = SparkEnv.get
-    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
+    val fs = path.getFileSystem(broadcastedConf.value.value)
     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
@@ -144,8 +154,10 @@ private[spark] object CheckpointRDD extends Logging {
     val sc = new SparkContext(cluster, "CheckpointRDD Test")
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
-    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
-    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
+    val conf = SparkHadoopUtil.get.newConfiguration()
+    val fs = path.getFileSystem(conf)
+    val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
+    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
     assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
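
The reason writeToFile and readFromFile now take a Broadcast[SerializableWritable[Configuration]] is that Hadoop's Configuration is a Writable but not java.io.Serializable, and the old SparkHadoopUtil.get.newConfiguration() call built a fresh config per task, dropping anything set on sc.hadoopConfiguration. A sketch of the wrapping pattern under those assumptions:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.broadcast.Broadcast

def broadcastHadoopConf(
    sc: SparkContext): Broadcast[SerializableWritable[Configuration]] = {
  // SerializableWritable wraps a Writable so it can travel inside a
  // broadcast or closure; Configuration alone would fail serialization.
  sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
}

// On an executor, broadcastedConf.value.value recovers the Configuration:
// one shared copy per node instead of a freshly parsed one per task.
```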
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 48168e152e954ceb548caf3f04fb672c23755479..04a8d05988f168a2a294c939aa70e9c8ef78824c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -40,12 +40,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
 import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
 
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
+
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.Aggregator
 import org.apache.spark.Partitioner
 import org.apache.spark.Partitioner.defaultPartitioner
+import org.apache.spark.util.SerializableHyperLogLog
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -207,6 +210,45 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     self.map(_._1).countByValueApprox(timeout, confidence)
   }
 
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
+   * Partitioner to partition the output RDD.
+   */
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+    val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
+    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
+    val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+
+    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. HashPartitions the
+   * output RDD into numPartitions.
+   *
+   */
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+    countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. The default value of
+   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
+   * level.
+   */
+  def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+    countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
+  }
+
   /**
    * Merge the values for each key using an associative reduce function. This will also perform
    * the merging locally on each mapper before sending results to a reducer, similarly to a
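
All three overloads funnel into the Partitioner variant, which is a plain combineByKey over HyperLogLog sketches: createHLL seeds a sketch from the first value, mergeValueHLL offers further values into it, mergeHLL unions the per-partition sketches, and mapValues reads off the cardinality. A usage sketch, assuming an existing SparkContext sc; the error bound is statistical, so the assertion is deliberately loose:

```scala
import org.apache.spark.SparkContext._ // brings the PairRDDFunctions implicits into scope

// Key i appears with exactly i distinct values, so the exact answer for key i is i.
val pairs = sc.parallelize((1 to 100).flatMap(i => (1 to i).map(j => (i, j))))

// Smaller relativeSD means tighter estimates but a larger sketch per key.
val approx = pairs.countApproxDistinctByKey(0.01).collect()
approx.foreach { case (k, estimate) =>
  assert(math.abs(estimate - k) / k.toDouble < 0.05)
}
```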
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ea45566ad1da96610f13d3250e6c66a5630040c5..4960e6e82f3781ee14469e2c6c25a93b6ba471c5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
 
 import org.apache.spark.Partitioner._
 import org.apache.spark.api.java.JavaRDD
@@ -41,7 +42,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, BoundedPriorityQueue}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark._
@@ -788,6 +789,19 @@ abstract class RDD[T: ClassTag](
     sc.runApproximateJob(this, countPartition, evaluator, timeout)
   }
 
+  /**
+   * Return approximate number of distinct elements in the RDD.
+   *
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. The default value of
+   * relativeSD is 0.05.
+   */
+  def countApproxDistinct(relativeSD: Double = 0.05): Long = {
+    val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+    aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+  }
+
   /**
    * Take the first num elements of the RDD. It works by first scanning one partition, and use the
    * results from that partition to estimate the number of additional partitions needed to satisfy
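
countApproxDistinct itself is a one-liner over aggregate: a SerializableHyperLogLog is the zero value, add folds each element into the sketch, and merge unions the per-partition sketches before the driver reads the cardinality. A quick sketch of expected behavior, again assuming a SparkContext sc:

```scala
// 100000 elements, but only 100 distinct values.
val data = sc.parallelize((0 until 100000).map(_ % 100), 10)
val estimate = data.countApproxDistinct(0.01)
// Should land near 100; relativeSD is a statistical bound, not a guarantee.
assert(math.abs(estimate - 100) / 100.0 < 0.05)
```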
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 3b56e45aa9586cb0358c04504a40fd59aa59b82f..642dabaad560e1d2a2f68f243e721530d5882efb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.{Partition, SparkException, Logging}
+import org.apache.spark.{SerializableWritable, Partition, SparkException, Logging}
 import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
 
 /**
@@ -85,14 +85,21 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
 
     // Create the output path for the checkpoint
     val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
-    val fs = path.getFileSystem(new Configuration())
+    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
     if (!fs.mkdirs(path)) {
       throw new SparkException("Failed to create checkpoint path " + path)
     }
 
     // Save to file, and reload it as an RDD
-    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
+    val broadcastedConf = rdd.context.broadcast(
+      new SerializableWritable(rdd.context.hadoopConfiguration))
+    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
     val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
+    if (newRDD.partitions.size != rdd.partitions.size) {
+      throw new SparkException(
+        "Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " +
+          "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
+    }
 
     // Change the dependencies and partitions of the RDD
     RDDCheckpointData.synchronized {
@@ -101,8 +108,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
       rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
       cpState = Checkpointed
       RDDCheckpointData.clearTaskCaches()
-      logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
     }
+    logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
   }
 
   // Get preferred location of a split after checkpointing
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c48a3d64ef72096611550bf2f84983a50967c011..7603eb292f677f841f97498fd15ea39d21a16f87 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -827,7 +827,7 @@ class DAGScheduler(
       }
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
       stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
-      listenerBus.post(StageCompleted(stageToInfos(stage)))
+      listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
       running -= stage
     }
     event.reason match {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index be5c95e59e05bf6a9ab679caf8f2f77e03b90962..f8fa5a9f7a5903384c4514883701d37c82809827 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -297,7 +297,7 @@ class JobLogger(val user: String, val logDirName: String)
    * When stage is completed, record stage completion status
    * @param stageCompleted Stage completed event
    */
-  override def onStageCompleted(stageCompleted: StageCompleted) {
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
     stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
         stageCompleted.stage.stageId))
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index ee63b3c4a15a26aad6c061cd65c29779ae7ab00c..627995c826e2b844cdc6a9656ee2d45c51a204df 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -27,7 +27,7 @@ sealed trait SparkListenerEvents
 case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
      extends SparkListenerEvents
 
-case class StageCompleted(val stage: StageInfo) extends SparkListenerEvents
+case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
 
 case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
 
@@ -47,7 +47,7 @@ trait SparkListener {
   /**
    * Called when a stage is completed, with information on the completed stage
    */
-  def onStageCompleted(stageCompleted: StageCompleted) { }
+  def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }
 
   /**
    * Called when a stage is submitted
@@ -86,7 +86,7 @@ trait SparkListener {
  * Simple SparkListener that logs a few summary statistics when each stage completes
  */
 class StatsReportListener extends SparkListener with Logging {
-  override def onStageCompleted(stageCompleted: StageCompleted) {
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
     import org.apache.spark.scheduler.StatsReportListener._
     implicit val sc = stageCompleted
     this.logInfo("Finished stage: " + stageCompleted.stage)
@@ -119,13 +119,17 @@ object StatsReportListener extends Logging {
   val probabilities = percentiles.map{_ / 100.0}
   val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
 
-  def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = {
+  def extractDoubleDistribution(stage: SparkListenerStageCompleted,
+      getMetric: (TaskInfo,TaskMetrics) => Option[Double])
+    : Option[Distribution] = {
     Distribution(stage.stage.taskInfos.flatMap {
       case ((info,metric)) => getMetric(info, metric)})
   }
 
   //is there some way to setup the types that I can get rid of this completely?
-  def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = {
+  def extractLongDistribution(stage: SparkListenerStageCompleted,
+      getMetric: (TaskInfo,TaskMetrics) => Option[Long])
+    : Option[Distribution] = {
     extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
   }
 
@@ -147,12 +151,12 @@ object StatsReportListener extends Logging {
   }
 
   def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
-    (implicit stage: StageCompleted) {
+    (implicit stage: SparkListenerStageCompleted) {
     showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
   }
 
   def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
-    (implicit stage: StageCompleted) {
+    (implicit stage: SparkListenerStageCompleted) {
     showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
   }
 
@@ -169,7 +173,7 @@ object StatsReportListener extends Logging {
   }
 
   def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
-    (implicit stage: StageCompleted) {
+    (implicit stage: SparkListenerStageCompleted) {
     showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
   }
 
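
Renaming StageCompleted to SparkListenerStageCompleted aligns the event with the SparkListener* convention the other events already follow, but it is source-incompatible: any third-party listener has to update its override. A minimal listener against the new name (the logging body is illustrative):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

class StageTimingListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    // StageInfo carries the per-stage task data used by StatsReportListener
    println("Finished stage " + stageCompleted.stage.stageId)
  }
}

// Registered as usual, e.g. sc.addSparkListener(new StageTimingListener)
```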
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 85687ea330660533c2fb95c1f5016c3db0ffb152..e7defd768b2c3857e03b8747b6026dfcb9428dd6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -41,7 +41,7 @@ private[spark] class SparkListenerBus() extends Logging {
         event match {
           case stageSubmitted: SparkListenerStageSubmitted =>
             sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
-          case stageCompleted: StageCompleted =>
+          case stageCompleted: SparkListenerStageCompleted =>
             sparkListeners.foreach(_.onStageCompleted(stageCompleted))
           case jobStart: SparkListenerJobStart =>
             sparkListeners.foreach(_.onJobStart(jobStart))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2e51dd5a994448a3d61a18dfeeef6a64517e5146..058bc2a2e53a0ce19200192c59861739b55a39d7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -61,7 +61,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
 
   override def onJobStart(jobStart: SparkListenerJobStart) {}
 
-  override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stage
     poolToActiveStages(stageIdToPool(stage.stageId)) -= stage
     activeStages -= stage
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8b4e7c104cb19424f1868c4e48fc5baa3880c34b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{Externalizable, ObjectOutput, ObjectInput}
+import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
+
+/**
+ * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
+ */
+private[spark]
+class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
+
+  def this() = this(null)  // For deserialization
+
+  def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
+
+  def add[T](elem: T) = {
+    this.value.offer(elem)
+    this
+  }
+
+  def readExternal(in: ObjectInput) {
+    val byteLength = in.readInt()
+    val bytes = new Array[Byte](byteLength)
+    in.readFully(bytes)
+    value = HyperLogLog.Builder.build(bytes)
+  }
+
+  def writeExternal(out: ObjectOutput) {
+    val bytes = value.getBytes()
+    out.writeInt(bytes.length)
+    out.write(bytes)
+  }
+}
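
Using Externalizable rather than plain Serializable keeps the wire format down to a length-prefixed byte array from HyperLogLog.getBytes, which both the Java serializer and Kryo (via its Externalizable support) can handle. A round-trip sketch; note the class is private[spark], so outside Spark this would have to live in an org.apache.spark package:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark.util.SerializableHyperLogLog

val hll = new SerializableHyperLogLog(new HyperLogLog(0.01))
(1 to 100000).foreach(i => hll.add(i % 100)) // 100 distinct values

// Round-trip through Java serialization, exercising write/readExternal.
val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(hll)
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
val restored = in.readObject().asInstanceOf[SerializableHyperLogLog]

println(restored.value.cardinality()) // approximately 100
```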
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 79913dc71803c1f2feb648e45b1209e521db385c..23ec6c3b311f0a78e58ed7f2b0a1387418a53f0f 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -851,7 +851,7 @@ public class JavaAPISuite implements Serializable {
   public void checkpointAndComputation() {
     File tempDir = Files.createTempDir();
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+    sc.setCheckpointDir(tempDir.getAbsolutePath());
     Assert.assertEquals(false, rdd.isCheckpointed());
     rdd.checkpoint();
     rdd.count(); // Forces the DAG to cause a checkpoint
@@ -863,7 +863,7 @@ public class JavaAPISuite implements Serializable {
   public void checkpointAndRestore() {
     File tempDir = Files.createTempDir();
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+    sc.setCheckpointDir(tempDir.getAbsolutePath());
     Assert.assertEquals(false, rdd.isCheckpointed());
     rdd.checkpoint();
     rdd.count(); // Forces the DAG to cause a checkpoint
@@ -930,4 +930,36 @@ public class JavaAPISuite implements Serializable {
                         parts[1]);
   }
 
+  @Test
+  public void countApproxDistinct() {
+    List<Integer> arrayData = new ArrayList<Integer>();
+    int size = 100;
+    for (int i = 0; i < 100000; i++) {
+      arrayData.add(i % size);
+    }
+    JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
+    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
+    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
+    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
+  }
+
+  @Test
+  public void countApproxDistinctByKey() {
+    double relativeSD = 0.001;
+
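+    // For each key i (10 <= i < 100) the values are 0..i-1, so the exact distinct count per key is i.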
+    List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
+    for (int i = 10; i < 100; i++)
+      for (int j = 0; j < i; j++)
+        arrayData.add(new Tuple2<Integer, Integer>(i, j));
+
+    JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
+    List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
+    for (Tuple2<Integer, Object> resItem : res) {
+      double count = (double)resItem._1();
+      Long resCount = (Long)resItem._2();
+      Double error = Math.abs((resCount - count) / count);
+      Assert.assertTrue(error < relativeSD);
+    }
+
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 57d3382ed0b3fc30c47335743ecd5cae367ea6f3..5da538a1ddfd5ee05063d6a3e555db1f5305fcb7 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashSet
+import scala.util.Random
 
 import org.scalatest.FunSuite
 
@@ -109,6 +110,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
   }
 
+  test("countApproxDistinctByKey") {
+    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+
+    /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
+     * only a statistical bound, the tests can fail for large values of relativeSD. We will be using
+     * relatively tight error bounds to check correctness of functionality rather than checking
+     * whether the approximation conforms with the requested bound.
+     */
+    val relativeSD = 0.001
+
+    // For each value i, there are i tuples with first element equal to i.
+    // Therefore, the expected count for key i would be i.
+    val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
+    val rdd1 = sc.parallelize(stacked)
+    val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
+    counted1.foreach{
+      case(k, count) => assert(error(count, k) < relativeSD)
+    }
+
+    val rnd = new Random()
+
+    // The expected count for key num would be num
+    val randStacked = (1 to 100).flatMap { i =>
+      val num = rnd.nextInt % 500
+      (1 to num).map(j => (num, j))
+    }
+    val rdd2 = sc.parallelize(randStacked)
+    val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
+    counted2.foreach{
+      case(k, count) => assert(error(count, k) < relativeSD)
+    }
+  }
+
   test("join") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index d8dcd6d14c23856e0d79cc519b45e3d14dc84072..1383359f85db0ce77663a8a646c374537e0f3c8a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  test("countApproxDistinct") {
+
+    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+
+    val size = 100
+    val uniformDistro = for (i <- 1 to 100000) yield i % size
+    val simpleRdd = sc.makeRDD(uniformDistro)
+    assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
+    assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
+    assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
+    assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
+  }
+
   test("SparkContext.union") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 002368ff554f7226cc4f7f6b1643f8c1f61df0c8..d0bd20fc83ea8cadb803ad06e51e707d068bf90a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -117,7 +117,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd)  = onTaskEndCount += 1
       override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
       override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
-      override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
+      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1
       override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
     }
     sc.addSparkListener(joblogger)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index d4320e5e14458afef262316bdf18d4e75257ac18..1a16e438c43d8e543c701829ae2d8ed2ec17d360 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -174,7 +174,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
   class SaveStageInfo extends SparkListener {
     val stageInfos = Buffer[StageInfo]()
-    override def onStageCompleted(stage: StageCompleted) {
+    override def onStageCompleted(stage: SparkListenerStageCompleted) {
       stageInfos += stage.stage
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index c016c5117149fabcf03e71f6fdd755c7bb60b3cd..636e3ab91310321d251786fadae143b385f13b72 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -172,6 +172,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
     assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11))
   }
 
+  test("kryo with SerializableHyperLogLog") {
+    assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
+  }
+
   test("kryo with reduce") {
     val control = 1 :: 2 :: Nil
     val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
diff --git a/pom.xml b/pom.xml
index 57e843596fe35450f86b471684c4be6e3d42e0f5..3a8eb882ccd21a1462d7a66c39c7796b2471d316 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,6 +200,11 @@
         <artifactId>asm</artifactId>
         <version>4.0</version>
       </dependency>
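+      <!-- stream-lib supplies the HyperLogLog sketch behind countApproxDistinct -->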
+      <dependency>
+        <groupId>com.clearspring.analytics</groupId>
+        <artifactId>stream</artifactId>
+        <version>2.5.1</version>
+      </dependency>
       <!-- In theory we need not directly depend on protobuf since Spark does not directly
            use it. However, when building with Hadoop/YARN 2.2 Maven doesn't correctly bump
            the protobuf version up from the one Mesos gives. For now we include this variable 
@@ -282,7 +287,7 @@
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
-        <version>4.0.0.CR1</version>
+        <version>4.0.13.Final</version>
       </dependency>
       <dependency>
         <groupId>org.apache.derby</groupId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 7bcbd90bd3f9e38f47443b1ab6985455b0365a9b..b3b5fc788f791628eee2e70a3efe6ed4e0e8f909 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -178,7 +178,7 @@ object SparkBuild extends Build {
 
 
     libraryDependencies ++= Seq(
-        "io.netty"          % "netty-all"       % "4.0.0.CR1",
+        "io.netty"          % "netty-all"       % "4.0.13.Final",
         "org.eclipse.jetty" % "jetty-server"    % "7.6.8.v20121106",
         /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
         "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
@@ -247,7 +247,8 @@ object SparkBuild extends Build {
         "com.codahale.metrics"     % "metrics-ganglia"  % "3.0.0",
         "com.codahale.metrics"     % "metrics-graphite" % "3.0.0",
         "com.twitter"             %% "chill"            % "0.3.1",
-        "com.twitter"              % "chill-java"       % "0.3.1"
+        "com.twitter"              % "chill-java"       % "0.3.1",
+        "com.clearspring.analytics" % "stream"          % "2.5.1"
       )
   )
 
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 0604f6836ca42b3b99cd44a19af0e55fc68415a2..108f36576a0fb39d2c4a503a2ceab8e02cd32187 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -320,17 +320,12 @@ class SparkContext(object):
             self._python_includes.append(filename)
             sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
 
-    def setCheckpointDir(self, dirName, useExisting=False):
+    def setCheckpointDir(self, dirName):
         """
         Set the directory under which RDDs are going to be checkpointed. The
         directory must be an HDFS path if running on a cluster.
-
-        If the directory does not exist, it will be created. If the directory
-        exists and C{useExisting} is set to true, then the exisiting directory
-        will be used.  Otherwise an exception will be thrown to prevent
-        accidental overriding of checkpoint files in the existing directory.
         """
-        self._jsc.sc().setCheckpointDir(dirName, useExisting)
+        self._jsc.sc().setCheckpointDir(dirName)
 
     def _getJavaStorageLevel(self, storageLevel):
         """
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3987642bf4d5c75fb74a3e1071ce7f1882ac905d..7acb6eaf10931f66ec47455283cbbd17f44db480 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -73,8 +73,8 @@ class TestCheckpoint(PySparkTestCase):
         time.sleep(1)  # 1 second
         self.assertTrue(flatMappedRDD.isCheckpointed())
         self.assertEqual(flatMappedRDD.collect(), result)
-        self.assertEqual(self.checkpointDir.name,
-                         os.path.dirname(flatMappedRDD.getCheckpointFile()))
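+        # setCheckpointDir now stores a fully qualified path (hence the "file:" scheme) and
+        # RDDs checkpoint into a generated subdirectory, hence the extra dirname().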
+        self.assertEqual("file:" + self.checkpointDir.name,
+                         os.path.dirname(os.path.dirname(flatMappedRDD.getCheckpointFile())))
 
     def test_checkpoint_and_restore(self):
         parCollection = self.sc.parallelize([1, 2, 3, 4])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 7b343d2376addc7171546ebf9c6d9e428bcd9d71..4960a85b972fec8edcd30ef0501da9382c4c9cb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -21,12 +21,13 @@ import java.io._
 import java.util.concurrent.Executors
 import java.util.concurrent.RejectedExecutionException
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.deploy.SparkHadoopUtil
 
 
 private[streaming]
@@ -54,36 +55,34 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
 
 
 /**
- * Convenience class to speed up the writing of graph checkpoint to file
+ * Convenience class to handle writing the graph checkpoint to a file
  */
 private[streaming]
-class CheckpointWriter(checkpointDir: String) extends Logging {
+class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging {
   val file = new Path(checkpointDir, "graph")
-  // The file to which we actually write - and then "move" to file.
-  private val writeFile = new Path(file.getParent, file.getName + ".next")
-  private val bakFile = new Path(file.getParent, file.getName + ".bk")
-
-  private var stopped = false
-
-  val conf = new Configuration()
-  var fs = file.getFileSystem(conf)
-  val maxAttempts = 3
+  val MAX_ATTEMPTS = 3
   val executor = Executors.newFixedThreadPool(1)
+  val compressionCodec = CompressionCodec.createCodec()
+  // The file to which we actually write - and then "move" to file
+  val writeFile = new Path(file.getParent, file.getName + ".next")
+  // The file to which existing checkpoint is backed up (i.e. "moved")
+  val bakFile = new Path(file.getParent, file.getName + ".bk")
 
-  private val compressionCodec = CompressionCodec.createCodec()
+  private var stopped = false
+  private var fs_ : FileSystem = _
 
-  // Removed code which validates whether there is only one CheckpointWriter per path 'file' since 
+  // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
   // I did not notice any errors - reintroduce it ?
-
   class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
     def run() {
       var attempts = 0
       val startTime = System.currentTimeMillis()
-      while (attempts < maxAttempts) {
+      while (attempts < MAX_ATTEMPTS && !stopped) {
         attempts += 1
         try {
           logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
-          // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast.
+          // Writing the final file directly is inherently thread unsafe, so we write to
+          // the temporary '.next' file first and then move it into place
           val fos = fs.create(writeFile)
           fos.write(bytes)
           fos.close()
@@ -101,6 +100,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
         } catch {
           case ioe: IOException =>
             logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
+            reset()
         }
       }
       logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
@@ -133,7 +133,17 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
     val startTime = System.currentTimeMillis()
     val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
     val endTime = System.currentTimeMillis()
-    logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.")
+    logInfo("CheckpointWriter executor terminated ? " + terminated +
+      ", waited for " + (endTime - startTime) + " ms.")
+  }
+
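+  // Lazily create and cache the FileSystem handle; reset() below drops the cached handle
+  // after a write failure so that the next attempt resolves a fresh one.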
+  private def fs = synchronized {
+    if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
+    fs_
+  }
+
+  private def reset() = synchronized {
+    fs_ = null
   }
 }
 
@@ -143,7 +153,8 @@ object CheckpointReader extends Logging {
 
   def read(path: String): Checkpoint = {
     val fs = new Path(path).getFileSystem(new Configuration())
-    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
+      new Path(path), new Path(path + ".bk"))
 
     val compressionCodec = CompressionCodec.createCodec()
 
@@ -158,7 +169,8 @@ object CheckpointReader extends Logging {
           // loader to find and load classes. This is a well know Java issue and has popped up
           // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
           val zis = compressionCodec.compressedInputStream(fis)
-          val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
+          val ois = new ObjectInputStreamWithLoader(zis,
+            Thread.currentThread().getContextClassLoader)
           val cp = ois.readObject.asInstanceOf[Checkpoint]
           ois.close()
           fs.close()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index c759b36f9432687f6b57b1ed2e368e22a730f777..659da2665ac6dfe4cd5676178b37cc8d64051686 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -20,17 +20,19 @@ package org.apache.spark.streaming
 import akka.actor.Props
 import akka.actor.SupervisorStrategy
 import akka.zeromq.Subscribe
+import akka.util.ByteString
 
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.StreamingListener
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.receivers.ActorReceiver
 import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
 import org.apache.spark.streaming.receivers.ZeroMQReceiver
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.MetadataCleaner
 import org.apache.spark.streaming.receivers.ActorReceiver
+import org.apache.spark.streaming.scheduler.{JobScheduler, NetworkInputTracker}
 
 import scala.collection.mutable.Queue
 import scala.collection.Map
@@ -38,17 +40,15 @@ import scala.reflect.ClassTag
 
 import java.io.InputStream
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.UUID
 
 import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
+
 import twitter4j.Status
 import twitter4j.auth.Authorization
-import org.apache.spark.streaming.scheduler._
-import akka.util.ByteString
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -87,7 +87,6 @@ class StreamingContext private (
          null, batchDuration)
   }
 
-
   /**
    * Re-create a StreamingContext from a checkpoint file.
    * @param path Path either to the directory that was specified as the checkpoint directory, or
@@ -139,7 +138,7 @@ class StreamingContext private (
 
   protected[streaming] var checkpointDir: String = {
     if (isCheckpointPresent) {
-      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
+      sc.setCheckpointDir(cp_.checkpointDir)
       cp_.checkpointDir
     } else {
       null
@@ -174,8 +173,12 @@ class StreamingContext private (
    */
   def checkpoint(directory: String) {
     if (directory != null) {
-      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
-      checkpointDir = directory
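+      // Create the checkpoint directory and resolve it to a fully qualified path on its file system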
+      val path = new Path(directory)
+      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
+      fs.mkdirs(path)
+      val fullPath = fs.getFileStatus(path).getPath().toString
+      sc.setCheckpointDir(fullPath)
+      checkpointDir = fullPath
     } else {
       checkpointDir = null
     }
@@ -366,7 +369,8 @@ class StreamingContext private (
   /**
    * Create an input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
-   * File names starting with . are ignored.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
    * @tparam K Key type for reading HDFS file
    * @tparam V Value type for reading HDFS file
@@ -385,6 +389,8 @@ class StreamingContext private (
   /**
    * Create an input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system.
    * @param directory HDFS directory to monitor for new file
    * @param filter Function to filter paths to process
    * @param newFilesOnly Should process only new files and ignore existing files in the directory
@@ -405,7 +411,9 @@ class StreamingContext private (
   /**
    * Create an input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them as text files (using key as LongWritable, value
-   * as Text and input format as TextInputFormat). File names starting with . are ignored.
+   * as Text and input format as TextInputFormat). Files must be written to the
+   * monitored directory by "moving" them from another location within the same
+   * file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
    */
   def textFileStream(directory: String): DStream[String] = {
@@ -597,8 +605,4 @@ object StreamingContext {
       prefix + "-" + time.milliseconds + "." + suffix
     }
   }
-
-  protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
-    new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
-  }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 78d318cf27b1bdc7f4c2dfe342705e6e7e2a1278..aad0d931e7bdec605cab7beaf9c18cd7dec4aee7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -256,9 +256,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Creates a input stream that monitors a Hadoop-compatible filesystem
+   * Create an input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them as text files (using key as LongWritable, value
-   * as Text and input format as TextInputFormat). File names starting with . are ignored.
+   * as Text and input format as TextInputFormat). Files must be written to the
+   * monitored directory by "moving" them from another location within the same
+   * file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
    */
   def textFileStream(directory: String): JavaDStream[String] = {
@@ -300,9 +302,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Creates a input stream that monitors a Hadoop-compatible filesystem
+   * Create an input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
-   * File names starting with . are ignored.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
    * @tparam K Key type for reading HDFS file
    * @tparam V Value type for reading HDFS file
@@ -331,7 +334,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 
 
   /**
-   * Creates a input stream from a Flume source.
+   * Create an input stream from a Flume source.
    * @param hostname Hostname of the slave machine to which the flume data will be sent
    * @param port     Port of the slave machine to which the flume data will be sent
    */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 39e25239bf59cb0eacb13c681df269cb9e79d903..fb9eda899672094c98ca43f86caaea7f38de5a03 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,18 +17,17 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
-
+import java.io.{ObjectInputStream, IOException}
+import scala.collection.mutable.{HashSet, HashMap}
+import scala.reflect.ClassTag
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
 
-import scala.collection.mutable.{HashSet, HashMap}
-import scala.reflect.ClassTag
-
-import java.io.{ObjectInputStream, IOException}
 
 private[streaming]
 class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
@@ -41,8 +40,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
   // Latest file mod time seen so far
-  private val lastModTimeFiles = new HashSet[String]()
-  private var lastModTime = 0L
+  private val prevModTimeFiles = new HashSet[String]()
+  private var prevModTime = 0L
 
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
@@ -50,11 +49,11 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
 
   override def start() {
     if (newFilesOnly) {
-      lastModTime = graph.zeroTime.milliseconds
+      prevModTime = graph.zeroTime.milliseconds
     } else {
-      lastModTime = 0
+      prevModTime = 0
     }
-    logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
+    logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
   }
 
   override def stop() { }
@@ -69,55 +68,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    * the previous call.
    */
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-    assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
+    assert(validTime.milliseconds >= prevModTime,
+      "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
 
-    // Create the filter for selecting new files
-    val newFilter = new PathFilter() {
-      // Latest file mod time seen in this round of fetching files and its corresponding files
-      var latestModTime = 0L
-      val latestModTimeFiles = new HashSet[String]()
-
-      def accept(path: Path): Boolean = {
-        if (!filter(path)) {  // Reject file if it does not satisfy filter
-          logDebug("Rejected by filter " + path)
-          return false
-        } else {              // Accept file only if
-          val modTime = fs.getFileStatus(path).getModificationTime()
-          logDebug("Mod time for " + path + " is " + modTime)
-          if (modTime < lastModTime) {
-            logDebug("Mod time less than last mod time")
-            return false  // If the file was created before the last time it was called
-          } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
-            logDebug("Mod time equal to last mod time, but file considered already")
-            return false  // If the file was created exactly as lastModTime but not reported yet
-          } else if (modTime > validTime.milliseconds) {
-            logDebug("Mod time more than valid time")
-            return false  // If the file was created after the time this function call requires
-          }
-          if (modTime > latestModTime) {
-            latestModTime = modTime
-            latestModTimeFiles.clear()
-            logDebug("Latest mod time updated to " + latestModTime)
-          }
-          latestModTimeFiles += path.toString
-          logDebug("Accepted " + path)
-          return true
-        }
-      }
-    }
-    logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
-    val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
+    // Find new files
+    val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
     if (newFiles.length > 0) {
       // Update the modification time and the files processed for that modification time
-      if (lastModTime != newFilter.latestModTime) {
-        lastModTime = newFilter.latestModTime
-        lastModTimeFiles.clear()
+      if (prevModTime < latestModTime) {
+        prevModTime = latestModTime
+        prevModTimeFiles.clear()
       }
-      lastModTimeFiles ++= newFilter.latestModTimeFiles
-      logDebug("Last mod time updated to " + lastModTime)
+      prevModTimeFiles ++= latestModTimeFiles
+      logDebug("Last mod time updated to " + prevModTime)
     }
-    files += ((validTime, newFiles))
+    files += ((validTime, newFiles.toArray))
     Some(filesToRDD(newFiles))
   }
 
@@ -132,12 +98,28 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
       oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
   }
 
+  /**
+   * Find files which have modification timestamp <= current time and return a 3-tuple of
+   * (new files found, latest modification time among them, files with latest modification time)
+   */
+  private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
+    logDebug("Trying to get new files for time " + currentTime)
+    val filter = new CustomPathFilter(currentTime)
+    val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+    (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
+  }
+
   /** Generate one RDD from an array of files */
-  protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-    new UnionRDD(
-      context.sparkContext,
-      files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
-    )
+  private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
+    val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
+    files.zip(fileRDDs).foreach { case (file, rdd) => {
+      if (rdd.partitions.size == 0) {
+        logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
+          "files that have been \"moved\" to the directory assigned to the file stream. " +
+          "Refer to the streaming programming guide for more details.")
+      }
+    }}
+    new UnionRDD(context.sparkContext, fileRDDs)
   }
 
   private def path: Path = {
@@ -150,6 +132,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     fs_
   }
 
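+  /** Drop the cached FileSystem handle so the next access re-creates it (e.g. after an I/O error). */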
+  private def reset() {
+    fs_ = null
+  }
+
   @throws(classOf[IOException])
   private def readObject(ois: ObjectInputStream) {
     logDebug(this.getClass().getSimpleName + ".readObject used")
@@ -191,6 +177,51 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
     }
   }
+
+  /**
+   * Custom PathFilter class to find new files that have modification timestamps <= current time,
+   * but have not been seen before (i.e. the file should not be in prevModTimeFiles)
+   */
+  private[streaming]
+  class CustomPathFilter(maxModTime: Long) extends PathFilter {
+    // Latest file mod time seen in this round of fetching files and its corresponding files
+    var latestModTime = 0L
+    val latestModTimeFiles = new HashSet[String]()
+
+    def accept(path: Path): Boolean = {
+      try {
+        if (!filter(path)) {  // Reject file if it does not satisfy filter
+          logDebug("Rejected by filter " + path)
+          return false
+        }
+        val modTime = fs.getFileStatus(path).getModificationTime()
+        logDebug("Mod time for " + path + " is " + modTime)
+        if (modTime < prevModTime) {
+          logDebug("Mod time less than last mod time")
+          return false  // The file was modified before the previously recorded mod time
+        } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
+          logDebug("Mod time equal to last mod time, but file considered already")
+          return false  // The file has the previous mod time and was already considered
+        } else if (modTime > maxModTime) {
+          logDebug("Mod time more than ")
+          return false  // If the file is too new that considering it may give errors
+        }
+        if (modTime > latestModTime) {
+          latestModTime = modTime
+          latestModTimeFiles.clear()
+          logDebug("Latest mod time updated to " + latestModTime)
+        }
+        latestModTimeFiles += path.toString
+        logDebug("Accepted " + path)
+      } catch {
+        case fnfe: java.io.FileNotFoundException => 
+          logWarning("Error finding new files", fnfe)
+          reset()
+          return false
+      }
+      return true
+    }
+  }
 }
 
 private[streaming]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index afe9316337a8923f137b7b55f7f28fb259e5f855..2eb3b3989e2dba0834528dab31244631890256c1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -17,11 +17,18 @@
 
 package org.apache.spark.streaming.scheduler
 
+import akka.actor.{Props, Actor}
 import org.apache.spark.SparkEnv
 import org.apache.spark.Logging
 import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
 import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
 
+/** Event classes for JobGenerator */
+private[scheduler] sealed trait JobGeneratorEvent
+private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
+private[scheduler] case class ClearOldMetadata(time: Time) extends JobGeneratorEvent
+private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
+
 /**
  * This class generates jobs from DStreams as well as drives checkpointing and cleaning
  * up DStream metadata.
@@ -30,43 +37,67 @@ private[streaming]
 class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
   val ssc = jobScheduler.ssc
-  val clockClass = System.getProperty(
-    "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-  val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
-  val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
-    longTime => generateJobs(new Time(longTime)))
   val graph = ssc.graph
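+  // Funnel all generator actions through this actor so that job generation, metadata
+  // cleanup and checkpointing run serially instead of on competing threads.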
+  val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+    def receive = {
+      case event: JobGeneratorEvent =>
+        logDebug("Got event of type " + event.getClass.getName)
+        processEvent(event)
+    }
+  }))
+  val clock = {
+    val clockClass = System.getProperty(
+      "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+    Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+  }
+  val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
+    longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))
   lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
-    new CheckpointWriter(ssc.checkpointDir)
+    new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
   } else {
     null
   }
 
-  var latestTime: Time = null
-
   def start() = synchronized {
     if (ssc.isCheckpointPresent) {
       restart()
     } else {
       startFirstTime()
     }
-    logInfo("JobGenerator started")
   }
   
-  def stop() = synchronized {
+  def stop() {
     timer.stop()
     if (checkpointWriter != null) checkpointWriter.stop()
     ssc.graph.stop()
     logInfo("JobGenerator stopped")
   }
 
+  /**
+   * On batch completion, clear old metadata and checkpoint the computation.
+   */
+  private[scheduler] def onBatchCompletion(time: Time) {
+    eventProcessorActor ! ClearOldMetadata(time)
+  }
+
+  /** Processes all events */
+  private def processEvent(event: JobGeneratorEvent) {
+    event match {
+      case GenerateJobs(time) => generateJobs(time)
+      case ClearOldMetadata(time) => clearOldMetadata(time)
+      case DoCheckpoint(time) => doCheckpoint(time)
+    }
+  }
+
+  /** Starts the generator for the first time */
   private def startFirstTime() {
     val startTime = new Time(timer.getStartTime())
     graph.start(startTime - graph.batchDuration)
     timer.start(startTime.milliseconds)
-    logInfo("JobGenerator's timer started at " + startTime)
+    logInfo("JobGenerator started at " + startTime)
   }
 
+  /** Restarts the generator based on the information in checkpoint */
   private def restart() {
     // If manual clock is being used for testing, then
     // either set the manual clock to the last checkpointed time,
@@ -98,7 +129,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
     // Restart the timer
     timer.start(restartTime.milliseconds)
-    logInfo("JobGenerator's timer restarted at " + restartTime)
+    logInfo("JobGenerator restarted at " + restartTime)
   }
 
   /** Generate jobs and perform checkpoint for the given `time`.  */
@@ -106,16 +137,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     SparkEnv.set(ssc.env)
     logInfo("\n-----------------------------------------------------\n")
     jobScheduler.runJobs(time, graph.generateJobs(time))
-    latestTime = time
-    doCheckpoint(time)
+    eventProcessorActor ! DoCheckpoint(time)
   }
 
-  /**
-   * On batch completion, clear old metadata and checkpoint computation.
-   */
-  private[streaming] def onBatchCompletion(time: Time) {
+  /** Clear DStream metadata for the given `time`. */
+  private def clearOldMetadata(time: Time) {
     ssc.graph.clearOldMetadata(time)
-    doCheckpoint(time)
+    eventProcessorActor ! DoCheckpoint(time)
   }
 
   /** Perform checkpoint for the given `time`. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 67a0841535b0d64bd4f0ec53d0d109667ab08435..4e25c9566c0632ab5f2b4706aa2c5a319cafcd5a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,24 +17,18 @@
 
 package org.apache.spark.streaming
 
-import dstream.FileInputDStream
-import org.apache.spark.streaming.StreamingContext._
 import java.io.File
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
-
 import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfter
-
 import com.google.common.io.Files
-
-import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.FileInputDStream
 import org.apache.spark.streaming.util.ManualClock
 
-
-
 /**
  * This test suite tests the checkpointing functionality of DStreams -
  * the checkpointing of a DStream's RDDs as well as the checkpointing of
@@ -66,7 +60,7 @@ class CheckpointSuite extends TestSuiteBase {
     System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
 
     val stateStreamCheckpointInterval = Seconds(1)
-
+    val fs = FileSystem.getLocal(new Configuration())
     // this ensures checkpointing occurs at least once
     val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
     val secondNumBatches = firstNumBatches
@@ -90,11 +84,12 @@ class CheckpointSuite extends TestSuiteBase {
     ssc.start()
     advanceTimeWithRealDelay(ssc, firstNumBatches)
     logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
-    assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
+    assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
+      "No checkpointed RDDs in state stream before first failure")
     stateStream.checkpointData.checkpointFiles.foreach {
-      case (time, data) => {
-        val file = new File(data.toString)
-        assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
+      case (time, file) => {
+        assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
+            " for state stream before first failure does not exist")
       }
     }
 
@@ -102,7 +97,8 @@ class CheckpointSuite extends TestSuiteBase {
     // and check whether the earlier checkpoint files are deleted
     val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
     advanceTimeWithRealDelay(ssc, secondNumBatches)
-    checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
+    checkpointFiles.foreach(file =>
+      assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
     ssc.stop()
 
     // Restart stream computation using the checkpoint file and check whether
@@ -110,19 +106,20 @@ class CheckpointSuite extends TestSuiteBase {
     ssc = new StreamingContext(checkpointDir)
     stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
     logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
-    assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure")
+    assert(!stateStream.generatedRDDs.isEmpty,
+      "No restored RDDs in state stream after recovery from first failure")
 
 
     // Run one batch to generate a new checkpoint file and check whether some RDD
     // is present in the checkpoint data or not
     ssc.start()
     advanceTimeWithRealDelay(ssc, 1)
-    assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
+    assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
+      "No checkpointed RDDs in state stream before second failure")
     stateStream.checkpointData.checkpointFiles.foreach {
-      case (time, data) => {
-        val file = new File(data.toString)
-        assert(file.exists(),
-          "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist")
+      case (time, file) => {
+        assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
+          " for state stream before seconds failure does not exist")
       }
     }
     ssc.stop()
@@ -132,7 +129,8 @@ class CheckpointSuite extends TestSuiteBase {
     ssc = new StreamingContext(checkpointDir)
     stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
     logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
-    assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
+    assert(!stateStream.generatedRDDs.isEmpty,
+      "No restored RDDs in state stream after recovery from second failure")
 
     // Adjust manual clock time as if it is being restarted after a delay
     System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
@@ -143,6 +141,7 @@ class CheckpointSuite extends TestSuiteBase {
     ssc = null
   }
 
+
   // This tests whether the system can recover from a master failure with simple
   // non-stateful operations. This assumes a reliable, replayable input
   // source - TestInputDStream.
@@ -191,6 +190,7 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+
   // This tests whether file input stream remembers what files were seen before
   // the master failure and uses them again to process a large window operation.
   // It also tests whether batches, whose processing was incomplete due to the
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 62a9f120b46153e713567d50ef777ce692bf0544..5fa14ad7c489a19d5b2ae3cc6726cbcb43ed005f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,7 +23,7 @@ import akka.actor.IOManager
 import akka.actor.Props
 import akka.util.ByteString
 
-import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent}
+import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent}
 import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
 import java.io.{File, BufferedWriter, OutputStreamWriter}
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}