diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index bbc94a7ab3398a229b5c6a3b64fefbcf93f3c775..608e272da778446c4e8c12527b2b7de2121305ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -310,6 +310,17 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
+/**
+ * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
+ * [[RepartitionByExpression]] in that this node is created directly by DataFrame's
+ * `coalesce` and `repartition` operators. [[RepartitionByExpression]] is used when the
+ * consumer of the output requires some specific ordering or distribution of the data.
+ */
+case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
+  extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
+
 /**
  * A relation with one row. This is used in "SELECT ..." without a from clause.
  */
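
For reference, a minimal Catalyst-level sketch of the new node; the `LocalRelation`
child below is an illustrative stand-in for any plan, not part of this change:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Repartition}

    // A one-column child plan; the node only records the repartition request.
    val relation = LocalRelation('a.int)
    val plan = Repartition(numPartitions = 4, shuffle = true, relation)
    assert(plan.output == relation.output)  // output passes through unchanged
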
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
index e737418d9c3bc821cce17bdb87de2e61193d91eb..63df2c1ee72ffbafdb63109ddadfe4ea1643b273 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
@@ -32,5 +32,11 @@ abstract class RedistributeData extends UnaryNode {
 case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
   extends RedistributeData
 
-case class Repartition(partitionExpressions: Seq[Expression], child: LogicalPlan)
+/**
+ * Repartitions the data using the given [[Expression]]s; the number of partitions is
+ * determined at execution time. Used when a specific ordering or distribution is
+ * expected by the consumer of the query result. Use [[Repartition]] for the RDD-like
+ * `coalesce` and `repartition` operations.
+ */
+case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
   extends RedistributeData
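
The expression-based variant in isolation; again, the `LocalRelation` child is a
hypothetical stand-in:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, RepartitionByExpression}

    val relation = LocalRelation('key.int, 'value.string)

    // Hash-distribute rows by `key`; the partition count is supplied by the planner.
    val plan = RepartitionByExpression(Seq(relation.output.head), relation)
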
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index ca6ae482eb2abe40b3b9645c9e659b0bfb7b4ffe..2affba7d42cc80fa2eec369af73be9a3bc40eef6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -961,9 +961,7 @@ class DataFrame private[sql](
    * @group rdd
    */
   override def repartition(numPartitions: Int): DataFrame = {
-    sqlContext.createDataFrame(
-      queryExecution.toRdd.map(_.copy()).repartition(numPartitions),
-      schema, needsConversion = false)
+    Repartition(numPartitions, shuffle = true, logicalPlan)
   }
 
   /**
@@ -974,10 +972,7 @@ class DataFrame private[sql](
    * @group rdd
    */
   override def coalesce(numPartitions: Int): DataFrame = {
-    sqlContext.createDataFrame(
-      queryExecution.toRdd.coalesce(numPartitions),
-      schema,
-      needsConversion = false)
+    Repartition(numPartitions, shuffle = false, logicalPlan)
   }
 
   /**
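
With this change both operators are lazy: each call only attaches a logical Repartition
node, and no job runs until an action forces execution. A usage sketch, assuming an
existing DataFrame `df`:

    val widened  = df.repartition(200)  // shuffle = true: may increase the partition count
    val narrowed = df.coalesce(10)      // shuffle = false: narrow dependency, only decreases
    narrowed.count()                    // planning and execution happen here, not above
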
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 030ef118f75d4ab0f199d7ef7a80caa403eee9a0..3a0a6c86700a8bfb644ba9b95c92107e7c15e509 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -283,7 +283,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Distinct(child) =>
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
-
+      case logical.Repartition(numPartitions, shuffle, child) =>
+        execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
@@ -317,7 +318,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
       case logical.OneRowRelation =>
         execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
-      case logical.Repartition(expressions, child) =>
+      case logical.RepartitionByExpression(expressions, child) =>
         execution.Exchange(
           HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil
       case e @ EvaluatePython(udf, child, _) =>
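
Note that for RepartitionByExpression the `numPartitions` passed to HashPartitioning
comes from the SQL shuffle-partitions setting at planning time, so the fan-out can be
tuned without changing the query; a hedged sketch:

    // Assumes a live SQLContext; this is the standard knob behind `numPartitions` here.
    sqlContext.setConf("spark.sql.shuffle.partitions", "64")
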
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index d286fe81bee5fee38f1fc4cc51415df711802a64..1afdb409417ce726b69403024958f111d94d0970 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -245,6 +245,20 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
   }
 }
 
+/**
+ * :: DeveloperApi ::
+ * Returns a new RDD that has exactly `numPartitions` partitions.
+ */
+@DeveloperApi
+case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
+  extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def execute(): RDD[Row] = {
+    // Copy each row before repartitioning: SQL operators may reuse mutable row objects,
+    // so rows must be materialized before they cross a shuffle boundary.
+    child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
+  }
+}
+
 
 /**
  * :: DeveloperApi ::
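
The physical operator delegates to RDD.coalesce, so it inherits that method's contract.
A self-contained sketch of the behavior it relies on (local master and partition counts
are illustrative only):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc  = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("demo"))
    val rdd = sc.parallelize(1 to 100, numSlices = 10)

    rdd.coalesce(5, shuffle = false).partitions.length   // 5: partitions merged, no shuffle
    rdd.coalesce(20, shuffle = false).partitions.length  // 10: cannot grow without a shuffle
    rdd.coalesce(20, shuffle = true).partitions.length   // 20: shuffle redistributes the data
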
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 0ea6d57b816c66b6a5353a59985090f07e82711b..2dc6463abafa76a1639364a0e8a12165589dc45a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -783,13 +783,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             case (None, Some(perPartitionOrdering), None, None) =>
               Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
             case (None, None, Some(partitionExprs), None) =>
-              Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
+              RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving)
             case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
               Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
-                Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
+                RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving))
             case (None, None, None, Some(clusterExprs)) =>
               Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
-                Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
+                RepartitionByExpression(clusterExprs.getChildren.map(nodeToExpr), withHaving))
             case (None, None, None, None) => withHaving
             case _ => sys.error("Unsupported set of ordering / distribution clauses.")
           }
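
The net effect on HiveQL is that DISTRIBUTE BY and CLUSTER BY now resolve to the renamed
node. A hedged sketch, assuming a HiveContext `hive` and a table `t` with columns `key`
and `value`:

    hive.sql("SELECT key, value FROM t DISTRIBUTE BY key")
    // plans: RepartitionByExpression('key, ...)

    hive.sql("SELECT key, value FROM t DISTRIBUTE BY key SORT BY value")
    // plans: Sort('value ASC, global = false, RepartitionByExpression('key, ...))

    hive.sql("SELECT key, value FROM t CLUSTER BY key")
    // plans: Sort('key ASC, global = false, RepartitionByExpression('key, ...))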