diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index d9f046efce0bf860db8ad8631d51f6529a803845..e2b97b27a6c2cdcef99b400782935b80fb993a30 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -482,13 +482,13 @@ case class MapPartitions[T, U](
 }
 
-/** Factory for constructing new `AppendColumn` nodes. */
-object AppendColumn {
+/** Factory for constructing new `AppendColumns` nodes. */
+object AppendColumns {
   def apply[T, U : Encoder](
       func: T => U,
       tEncoder: ExpressionEncoder[T],
-      child: LogicalPlan): AppendColumn[T, U] = {
+      child: LogicalPlan): AppendColumns[T, U] = {
     val attrs = encoderFor[U].schema.toAttributes
-    new AppendColumn[T, U](func, tEncoder, encoderFor[U], attrs, child)
+    new AppendColumns[T, U](func, tEncoder, encoderFor[U], attrs, child)
   }
 }
 
@@ -497,7 +497,7 @@ object AppendColumn {
  * resulting columns at the end of the input row. tEncoder/uEncoder are used respectively to
  * decode/encode from the JVM object representation expected by `func.`
  */
-case class AppendColumn[T, U](
+case class AppendColumns[T, U](
     func: T => U,
     tEncoder: ExpressionEncoder[T],
     uEncoder: ExpressionEncoder[U],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 929224460dc099e8957d05387bd4c2ec8515301e..82e9cd7f50a31d91bfb19828f0140db35f995c3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -58,10 +58,12 @@ class TypedColumn[-T, U](
   private[sql] def withInputType(
       inputEncoder: ExpressionEncoder[_],
       schema: Seq[Attribute]): TypedColumn[T, U] = {
+    // Bind the input encoder to the given schema so it resolves against the child's output attributes.
+    val boundEncoder = inputEncoder.bind(schema).asInstanceOf[ExpressionEncoder[Any]]
     new TypedColumn[T, U] (expr transform {
       case ta: TypedAggregateExpression if ta.aEncoder.isEmpty =>
         ta.copy(
-          aEncoder = Some(inputEncoder.asInstanceOf[ExpressionEncoder[Any]]),
+          aEncoder = Some(boundEncoder),
           children = schema)
     }, encoder)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index b930e4661c1a64e3f4fa0122ceda6d112ba21aca..4cc3aa2465f2e37186825d72a854fada8399245d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -299,7 +299,7 @@ class Dataset[T] private[sql](
    */
   def groupBy[K : Encoder](func: T => K): GroupedDataset[K, T] = {
     val inputPlan = queryExecution.analyzed
-    val withGroupingKey = AppendColumn(func, resolvedTEncoder, inputPlan)
+    val withGroupingKey = AppendColumns(func, resolvedTEncoder, inputPlan)
     val executed = sqlContext.executePlan(withGroupingKey)
 
     new GroupedDataset(
@@ -364,13 +364,11 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1] = {
-    // We use an unbound encoder since the expression will make up its own schema.
-    // TODO: This probably doesn't work if we are relying on reordering of the input class fields.
     new Dataset[U1](
       sqlContext,
       Project(
         c1.withInputType(
-          resolvedTEncoder.bind(queryExecution.analyzed.output),
+          resolvedTEncoder,
           queryExecution.analyzed.output).named :: Nil,
         logicalPlan))
   }
@@ -382,10 +380,8 @@ class Dataset[T] private[sql](
    */
   protected def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = {
     val encoders = columns.map(_.encoder)
-    // We use an unbound encoder since the expression will make up its own schema.
-    // TODO: This probably doesn't work if we are relying on reordering of the input class fields.
     val namedColumns =
-      columns.map(_.withInputType(unresolvedTEncoder, queryExecution.analyzed.output).named)
+      columns.map(_.withInputType(resolvedTEncoder, queryExecution.analyzed.output).named)
     val execution = new QueryExecution(sqlContext, Project(namedColumns, logicalPlan))
 
     new Dataset(sqlContext, execution, ExpressionEncoder.tuple(encoders))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index ae1272ae531fb336b2c706af141bed5f0d52d150..9c16940707de97c030a4ff7fd82f8f8e004e787f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -89,7 +89,7 @@ class GroupedDataset[K, T] private[sql](
   }
 
   /**
-   * Applies the given function to each group of data.  For each unique group, the function  will
+   * Applies the given function to each group of data.  For each unique group, the function will
    * be passed the group key and an iterator that contains all of the elements in the group. The
    * function can return an iterator containing elements of an arbitrary type which will be returned
    * as a new [[Dataset]].
@@ -162,7 +162,7 @@ class GroupedDataset[K, T] private[sql](
     val encoders = columns.map(_.encoder)
     val namedColumns =
       columns.map(
-        _.withInputType(resolvedTEncoder.bind(dataAttributes), dataAttributes).named)
+        _.withInputType(resolvedTEncoder, dataAttributes).named)
     val aggregate = Aggregate(groupingAttributes, groupingAttributes ++ namedColumns, logicalPlan)
     val execution = new QueryExecution(sqlContext, aggregate)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index a99ae4674bb125409c514b296ff69046bd11f674..67201a2c191cd8489193fed8d70faef802657335 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -321,7 +321,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
       case logical.MapPartitions(f, tEnc, uEnc, output, child) =>
         execution.MapPartitions(f, tEnc, uEnc, output, planLater(child)) :: Nil
-      case logical.AppendColumn(f, tEnc, uEnc, newCol, child) =>
+      case logical.AppendColumns(f, tEnc, uEnc, newCol, child) =>
         execution.AppendColumns(f, tEnc, uEnc, newCol, planLater(child)) :: Nil
       case logical.MapGroups(f, kEnc, tEnc, uEnc, grouping, output, child) =>
         execution.MapGroups(f, kEnc, tEnc, uEnc, grouping, output, planLater(child)) :: Nil
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 46169ca07d715ef2d4b10254aee8bacd6f68a256..eb6fa1e72e27b095d5c62ad1955aef9ebb65f3b4 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -157,6 +157,7 @@ public class JavaDatasetSuite implements Serializable {
     Assert.assertEquals(6, reduced);
   }
 
+  @Test
   public void testGroupBy() {
     List<String> data = Arrays.asList("a", "foo", "bar");
     Dataset<String> ds = context.createDataset(data, Encoders.STRING());
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 20896efdfec1615e66ea718cea2818f134c789d0..46f9f077fe7f21b9c762919a7a43221cdf78760e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -161,6 +161,10 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
       ds.select(ClassInputAgg.toColumn),
       1)
 
+    checkAnswer(
+      ds.select(expr("avg(a)").as[Double], ClassInputAgg.toColumn),
+      (1.0, 1))
+
     checkAnswer(
       ds.groupBy(_.b).agg(ClassInputAgg.toColumn),
       ("one", 1))