diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7080074a69c07e7cd79d39f3e82b14e5563b0f02..c078e71fe0290fa9ee78e1881fba56a2f842df64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -247,8 +247,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Distinct(child) =>
-        execution.Aggregate(
-          partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil
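+        // Plan Distinct as two phases: a partial per-partition distinct first to
+        // reduce the data that must be shuffled for the final, global distinct.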
+        execution.Distinct(partial = false,
+          execution.Distinct(partial = true, planLater(child))) :: Nil
       case logical.Sort(sortExprs, child) =>
         // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
         execution.Sort(sortExprs, global = true, planLater(child)):: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 97abd636ab5fb876271125b7f696c7403225f9d3..966d8f95fc83c78fa0fa86b66b3c2176aa2c3b3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair
 
 /**
@@ -248,6 +248,44 @@ object ExistingRdd {
 case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
   override def execute() = rdd
 }
+
+/**
+ * :: DeveloperApi ::
+ * Computes the set of distinct input rows using a HashSet.
+ * @param partial when true, the distinct operation is performed partially, per partition,
+ *                without shuffling the data.
+ * @param child the input query plan.
+ */
+@DeveloperApi
+case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
+  override def output = child.output
+
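+  // The partial pass runs on the child's existing partitioning, while the final
+  // pass requires rows clustered on all output columns so that duplicate rows
+  // land in the same partition and per-partition deduplication is global.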
+  override def requiredChildDistribution =
+    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
+
+  override def execute() = {
+    child.execute().mapPartitions { iter =>
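+      // Rows seen so far in this partition. HashSet membership is decided by
+      // row value, so duplicate rows collapse to a single entry.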
+      val hashSet = new scala.collection.mutable.HashSet[Row]()
+
+      var currentRow: Row = null
+      while (iter.hasNext) {
+        currentRow = iter.next()
+        if (!hashSet.contains(currentRow)) {
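+          // The child iterator may reuse a single mutable Row instance, so copy
+          // the row before retaining a reference to it in the set.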
+          hashSet.add(currentRow.copy())
+        }
+      }
+
+      hashSet.iterator
+    }
+  }
+}
 
 /**
  * :: DeveloperApi ::