diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b50c9963b9d2c7500e3a093ce3b314cb895c16a0..f8283fbbb980d09e97cd9ea7d83ec912c04f4293 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -310,6 +310,8 @@ abstract class RDD[T: ClassTag](
    * Return a sampled subset of this RDD.
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] = {
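+    // Reject negative fractions up front rather than letting the sampler fail later.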
+    require(fraction >= 0.0, "Invalid fraction value: " + fraction)
     if (withReplacement) {
       new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), seed)
     } else {
@@ -344,6 +346,12 @@ abstract class RDD[T: ClassTag](
       throw new IllegalArgumentException("Negative number of elements requested")
     }
 
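+    // Short-circuit on an empty RDD: with initialCount == 0, the resampling
+    // loop below could never collect enough elements and would never terminate.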
+    if (initialCount == 0) {
+      return new Array[T](0)
+    }
+
     if (initialCount > Integer.MAX_VALUE - 1) {
       maxSelected = Integer.MAX_VALUE - 1
     } else {
@@ -362,7 +370,7 @@ abstract class RDD[T: ClassTag](
     var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
 
     // If the first sample didn't turn out large enough, keep trying to take samples;
-    // this shouldn't happen often because we use a big multiplier for thei initial size
+    // this shouldn't happen often because we use a big multiplier for the initial size
     while (samples.length < total) {
       samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
     }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 60bcada55245bb6adc38c11dda69b19a751bead2..9512e0e6eeb14f48ee6cc8d3194b74dbfa0fef10 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -457,6 +457,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 
   test("takeSample") {
     val data = sc.parallelize(1 to 100, 2)
+
     for (seed <- 1 to 5) {
       val sample = data.takeSample(withReplacement=false, 20, seed)
       assert(sample.size === 20)        // Got exactly 20 elements
@@ -488,6 +489,13 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  test("takeSample from an empty rdd") {
+    val emptySet = sc.parallelize(Seq.empty[Int], 2)
+    val sample = emptySet.takeSample(withReplacement=false, 20, 1)
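+    // Should return immediately with no elements rather than resampling forever.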
+    assert(sample.length === 0)
+  }
+
   test("randomSplit") {
     val n = 600
     val data = sc.parallelize(1 to n, 2)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6d549b40e5698246766c81d5159353adae137678..f3b432ff248a96b9d5128ca64ce5c1e822b11d37 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -268,6 +268,8 @@ class RDD(object):
         >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
         [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
         """
+        assert fraction >= 0.0, "Invalid fraction value: %s" % fraction
         return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
 
     # this is ported from scala/spark/RDD.scala
@@ -288,6 +290,11 @@ class RDD(object):
         if (num < 0):
             raise ValueError
 
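+        # As in the Scala implementation, return early on an empty RDD;
+        # the resampling loop can never succeed when there is nothing to sample.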
+        if (initialCount == 0):
+            return list()
+
         if initialCount > sys.maxint - 1:
             maxSelected = sys.maxint - 1
         else:
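
As a quick sanity check of the two new guards, a spark-shell session along these lines should now behave as shown (hypothetical session; assumes a live SparkContext bound to `sc`, as in the tests above):

    // takeSample on an empty RDD returns an empty array immediately
    // instead of spinning in the resampling loop.
    val empty = sc.parallelize(Seq.empty[Int], 2)
    assert(empty.takeSample(withReplacement = false, 20, 1).isEmpty)

    // Negative fractions are now rejected up front with a clear message.
    try {
      sc.parallelize(1 to 100).sample(withReplacement = false, -0.1, 1)
    } catch {
      case e: IllegalArgumentException =>
        println(e.getMessage)  // requirement failed: Invalid fraction value: -0.1
    }

Both calls return promptly: the first with an empty array, the second with the IllegalArgumentException shown in the catch block.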