diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index f840475ffaf708be3fc80903297538aff9c7ef51..e7c35ac1ffe025b2e61bd616ae8f306f8e8fdf1a 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -28,9 +28,13 @@ from array import array
 from operator import itemgetter
 
 from pyspark.rdd import RDD, PipelinedRDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import BatchedSerializer, PickleSerializer, CloudPickleSerializer
+
+from itertools import chain, ifilter, imap
 
 from py4j.protocol import Py4JError
+from py4j.java_collections import ListConverter, MapConverter
+
 
 __all__ = [
     "StringType", "BinaryType", "BooleanType", "TimestampType", "DecimalType",
@@ -932,6 +936,39 @@ class SQLContext:
             self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())
         return self._scala_SQLContext
 
+    def registerFunction(self, name, f, returnType=StringType()):
+        """Registers a lambda function as a UDF so it can be used in SQL statements.
+
+        In addition to a name and the function itself, the return type can be optionally specified.
+        When the return type is not given, it defaults to a string and conversion will automatically
+        be done.  For any other return type, the produced object must match the specified type.
+
+        >>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x))
+        >>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
+        [Row(c0=u'4')]
+        >>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
+        >>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
+        [Row(c0=4)]
+        >>> sqlCtx.registerFunction("twoArgs", lambda x, y: len(x) + y, IntegerType())
+        >>> sqlCtx.sql("SELECT twoArgs('test', 1)").collect()
+        [Row(c0=5)]
+        """
+        func = lambda _, it: imap(lambda x: f(*x), it)
+        command = (func,
+                   BatchedSerializer(PickleSerializer(), 1024),
+                   BatchedSerializer(PickleSerializer(), 1024))
+        env = MapConverter().convert(self._sc.environment,
+                                     self._sc._gateway._gateway_client)
+        includes = ListConverter().convert(self._sc._python_includes,
+                                           self._sc._gateway._gateway_client)
+        self._ssql_ctx.registerPython(name,
+                                      bytearray(CloudPickleSerializer().dumps(command)),
+                                      env,
+                                      includes,
+                                      self._sc.pythonExec,
+                                      self._sc._javaAccumulator,
+                                      str(returnType))
+
     def inferSchema(self, rdd):
         """Infer and apply a schema to an RDD of L{Row}s.
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index c0255701b7ba593678670071483376fb2a9e31a9..760c49fbca4a522ecefe6f798f69ee4f4447d374 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -18,17 +18,49 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.expressions.Expression
+import scala.collection.mutable
 
 /** A catalog for looking up user defined functions, used by an [[Analyzer]]. */
 trait FunctionRegistry {
+  type FunctionBuilder = Seq[Expression] => Expression
+
+  def registerFunction(name: String, builder: FunctionBuilder): Unit
+
   def lookupFunction(name: String, children: Seq[Expression]): Expression
 }
 
+trait OverrideFunctionRegistry extends FunctionRegistry {
+
+  val functionBuilders = new mutable.HashMap[String, FunctionBuilder]()
+
+  def registerFunction(name: String, builder: FunctionBuilder) = {
+    functionBuilders.put(name, builder)
+  }
+
+  abstract override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+    functionBuilders.get(name).map(_(children)).getOrElse(super.lookupFunction(name, children))
+  }
+}
+
+class SimpleFunctionRegistry extends FunctionRegistry {
+  val functionBuilders = new mutable.HashMap[String, FunctionBuilder]()
+
+  def registerFunction(name: String, builder: FunctionBuilder) = {
+    functionBuilders.put(name, builder)
+  }
+
+  override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+    functionBuilders(name)(children)
+  }
+}
+
 /**
  * A trivial catalog that returns an error when a function is requested.  Used for testing when all
  * functions are already filled in and the analyser needs only to resolve attribute references.
  */
 object EmptyFunctionRegistry extends FunctionRegistry {
+  def registerFunction(name: String, builder: FunctionBuilder) = ???
+
   def lookupFunction(name: String, children: Seq[Expression]): Expression = {
     throw new UnsupportedOperationException
   }
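
`OverrideFunctionRegistry` uses Scala's stackable-trait pattern: the `abstract override` lets the mixin answer lookups from its own map and delegate anything unresolved to whatever registry it is stacked on top of (presumably an existing registry such as Hive's, though that client is outside this diff). A simplified, runnable sketch of the pattern, with `String` standing in for both `FunctionBuilder` and `Expression`:

```scala
// A simplified, self-contained stand-in for the pattern above, with String
// replacing both FunctionBuilder and Expression: lookups hit the override
// map first and fall through to the wrapped registry via super otherwise.
object RegistrySketch extends App {
  trait Registry {
    def lookupFunction(name: String): String
  }

  trait OverrideRegistry extends Registry {
    val functionBuilders = scala.collection.mutable.HashMap.empty[String, String]

    abstract override def lookupFunction(name: String): String =
      functionBuilders.getOrElse(name, super.lookupFunction(name))
  }

  class BuiltinRegistry extends Registry {
    def lookupFunction(name: String): String = s"builtin:$name"
  }

  val registry = new BuiltinRegistry with OverrideRegistry
  registry.functionBuilders += ("upper" -> "custom:upper")

  println(registry.lookupFunction("upper"))  // custom:upper  (override wins)
  println(registry.lookupFunction("lower"))  // builtin:lower (super fallback)
}
```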
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index acddf5e9c70042dd58eaaaef0f88923c0d81cc19..95633dd0c98707db02aec08239c044ed2b8ca152 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -27,6 +27,22 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
   def references = children.flatMap(_.references).toSet
   def nullable = true
 
+  /** This method has been generated by this script
+
+    (1 to 22).map { x =>
+      val anys = (1 to x).map(_ => "Any").mkString(", ")
+      val evals = (0 until x).map(i => s"children($i).eval(input)").mkString(",\n    ")
+
+    s"""
+    case $x =>
+      function.asInstanceOf[($anys) => Any](
+      $evals)
+    """
+    }
+
+  */
+
+  // scalastyle:off
   override def eval(input: Row): Any = {
     children.size match {
       case 0 => function.asInstanceOf[() => Any]()
@@ -35,6 +51,297 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
         function.asInstanceOf[(Any, Any) => Any](
           children(0).eval(input),
           children(1).eval(input))
+      case 3 =>
+        function.asInstanceOf[(Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input))
+      case 4 =>
+        function.asInstanceOf[(Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input))
+      case 5 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input))
+      case 6 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input))
+      case 7 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input))
+      case 8 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input))
+      case 9 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input))
+      case 10 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input))
+      case 11 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input))
+      case 12 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input),
+          children(11).eval(input))
+      case 13 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input),
+          children(11).eval(input),
+          children(12).eval(input))
+      case 14 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input),
+          children(11).eval(input),
+          children(12).eval(input),
+          children(13).eval(input))
+      case 15 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input),
+          children(11).eval(input),
+          children(12).eval(input),
+          children(13).eval(input),
+          children(14).eval(input))
+      case 16 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input),
+          children(11).eval(input),
+          children(12).eval(input),
+          children(13).eval(input),
+          children(14).eval(input),
+          children(15).eval(input))
+      case 17 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input),
+          children(11).eval(input),
+          children(12).eval(input),
+          children(13).eval(input),
+          children(14).eval(input),
+          children(15).eval(input),
+          children(16).eval(input))
+      case 18 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input),
+          children(11).eval(input),
+          children(12).eval(input),
+          children(13).eval(input),
+          children(14).eval(input),
+          children(15).eval(input),
+          children(16).eval(input),
+          children(17).eval(input))
+      case 19 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input),
+          children(11).eval(input),
+          children(12).eval(input),
+          children(13).eval(input),
+          children(14).eval(input),
+          children(15).eval(input),
+          children(16).eval(input),
+          children(17).eval(input),
+          children(18).eval(input))
+      case 20 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input),
+          children(11).eval(input),
+          children(12).eval(input),
+          children(13).eval(input),
+          children(14).eval(input),
+          children(15).eval(input),
+          children(16).eval(input),
+          children(17).eval(input),
+          children(18).eval(input),
+          children(19).eval(input))
+      case 21 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input),
+          children(11).eval(input),
+          children(12).eval(input),
+          children(13).eval(input),
+          children(14).eval(input),
+          children(15).eval(input),
+          children(16).eval(input),
+          children(17).eval(input),
+          children(18).eval(input),
+          children(19).eval(input),
+          children(20).eval(input))
+      case 22 =>
+        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
+          children(0).eval(input),
+          children(1).eval(input),
+          children(2).eval(input),
+          children(3).eval(input),
+          children(4).eval(input),
+          children(5).eval(input),
+          children(6).eval(input),
+          children(7).eval(input),
+          children(8).eval(input),
+          children(9).eval(input),
+          children(10).eval(input),
+          children(11).eval(input),
+          children(12).eval(input),
+          children(13).eval(input),
+          children(14).eval(input),
+          children(15).eval(input),
+          children(16).eval(input),
+          children(17).eval(input),
+          children(18).eval(input),
+          children(19).eval(input),
+          children(20).eval(input),
+          children(21).eval(input))
     }
+    // scalastyle:on
   }
 }
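
The 22-arm match exists because Scala function values come in fixed arities (`Function1` through `Function22`) and their type parameters are erased at runtime, so the only uniform way to apply an arbitrary user function is to cast it to the `(Any, ..., Any) => Any` shape of the matching arity. A minimal sketch of that erased-cast trick:

```scala
// A minimal sketch of the erased-cast dispatch ScalaUdf.eval relies on:
// after type erasure, a typed function value can be cast to the
// (Any, ..., Any) => Any shape of the same arity and applied to untyped
// arguments. A wrong runtime argument type surfaces as a ClassCastException.
object ErasedCallSketch extends App {
  val typed: (String, Int) => Int = (s, n) => s.length + n

  val erased = typed.asInstanceOf[(Any, Any) => Any]
  println(erased("test", 1))  // 5
}
```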
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF1.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF1.java
new file mode 100644
index 0000000000000000000000000000000000000000..ef959e35e102754f2cae2caea0ff4ecfae23978c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF1.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 1 argument.
+ */
+public interface UDF1<T1, R> extends Serializable {
+  public R call(T1 t1) throws Exception;
+}
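
The `UDFN` interfaces added below only declare a `call` method; how a Java UDF actually gets registered is not part of this section. One plausible adapter, sketched here with a hypothetical local stand-in for the interface, is to wrap the Java object in an ordinary Scala function value, after which `ScalaUdf`'s erased-cast dispatch applies unchanged:

```scala
// Hypothetical adapter sketch (how Java UDFs are registered is not shown in
// this diff). UDF1 is redeclared locally so the example is self-contained.
object JavaUdfAdapterSketch extends App {
  trait UDF1[T1, R] extends java.io.Serializable {
    def call(t1: T1): R
  }

  val javaUdf = new UDF1[String, Integer] {
    override def call(s: String): Integer = s.length
  }

  // Wrapping the Java object in a Scala function value is enough for
  // ScalaUdf, which would invoke it through its erased (Any) => Any cast.
  val asScalaFunction: Any => Any = (x: Any) => javaUdf.call(x.asInstanceOf[String])
  println(asScalaFunction("test"))  // 4
}
```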
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF10.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF10.java
new file mode 100644
index 0000000000000000000000000000000000000000..96ab3a96c3d5e3af65309332c908240fc8a6d956
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF10.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 10 arguments.
+ */
+public interface UDF10<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF11.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF11.java
new file mode 100644
index 0000000000000000000000000000000000000000..58ae8edd6d817abe715014009284ccaa4c5b84d9
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF11.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 11 arguments.
+ */
+public interface UDF11<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF12.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF12.java
new file mode 100644
index 0000000000000000000000000000000000000000..d9da0f6eddd942965ad7fe31caf4ee6f8e22469d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF12.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 12 arguments.
+ */
+public interface UDF12<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF13.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF13.java
new file mode 100644
index 0000000000000000000000000000000000000000..095fc1a8076b5dbb5d5e643342a811cffc37bef8
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF13.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 13 arguments.
+ */
+public interface UDF13<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF14.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF14.java
new file mode 100644
index 0000000000000000000000000000000000000000..eb27eaa1800864f1709d7df1e86ae9640c219188
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF14.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 14 arguments.
+ */
+public interface UDF14<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF15.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF15.java
new file mode 100644
index 0000000000000000000000000000000000000000..1fbcff56332b63ba367210b2404af90c0946e015
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF15.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 15 arguments.
+ */
+public interface UDF15<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF16.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF16.java
new file mode 100644
index 0000000000000000000000000000000000000000..1133561787a69bed1ffaf91954bced5f31d99da1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF16.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 16 arguments.
+ */
+public interface UDF16<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF17.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF17.java
new file mode 100644
index 0000000000000000000000000000000000000000..dfae7922c9b63fefec0e147c4dfd12ffd45bd413
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF17.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 17 arguments.
+ */
+public interface UDF17<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF18.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF18.java
new file mode 100644
index 0000000000000000000000000000000000000000..e9d1c6d52d4ea2d58732b5cd09ac7fa789b04aa7
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF18.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 18 arguments.
+ */
+public interface UDF18<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF19.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF19.java
new file mode 100644
index 0000000000000000000000000000000000000000..46b9d2d3c945796f4032700b964b243b36cace67
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF19.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 19 arguments.
+ */
+public interface UDF19<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18, T19 t19) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF2.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF2.java
new file mode 100644
index 0000000000000000000000000000000000000000..cd3fde8da419e6c8ab4bc92f042099a77c4027d5
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF2.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 2 arguments.
+ */
+public interface UDF2<T1, T2, R> extends Serializable {
+  public R call(T1 t1, T2 t2) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF20.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF20.java
new file mode 100644
index 0000000000000000000000000000000000000000..113d3d26be4a71b0d2148c706de4ed8dd7fb5991
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF20.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 20 arguments.
+ */
+public interface UDF20<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18, T19 t19, T20 t20) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF21.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF21.java
new file mode 100644
index 0000000000000000000000000000000000000000..74118f2cf8da761b20e0bc703d66a13322f009bc
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF21.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 21 arguments.
+ */
+public interface UDF21<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18, T19 t19, T20 t20, T21 t21) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF22.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF22.java
new file mode 100644
index 0000000000000000000000000000000000000000..0e7cc40be45ec85c3a87e94395d9518ab673874c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF22.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 22 arguments.
+ */
+public interface UDF22<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18, T19 t19, T20 t20, T21 t21, T22 t22) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF3.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF3.java
new file mode 100644
index 0000000000000000000000000000000000000000..6a880f16be47a3a78098070f5ed65de55f5ad4d5
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF3.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 3 arguments.
+ */
+public interface UDF3<T1, T2, T3, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF4.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF4.java
new file mode 100644
index 0000000000000000000000000000000000000000..fcad2febb18e6e021810e69d3617e3bbaa00d7e2
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF4.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 4 arguments.
+ */
+public interface UDF4<T1, T2, T3, T4, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF5.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF5.java
new file mode 100644
index 0000000000000000000000000000000000000000..ce0cef43a214425aa800e0db170ed3b020c866f7
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF5.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 5 arguments.
+ */
+public interface UDF5<T1, T2, T3, T4, T5, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF6.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF6.java
new file mode 100644
index 0000000000000000000000000000000000000000..f56b806684e61ea110c65a927eac3985c6d444f9
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF6.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 6 arguments.
+ */
+public interface UDF6<T1, T2, T3, T4, T5, T6, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF7.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF7.java
new file mode 100644
index 0000000000000000000000000000000000000000..25bd6d3241bd4ff15cb545f1220879daf5d312ac
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF7.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 7 arguments.
+ */
+public interface UDF7<T1, T2, T3, T4, T5, T6, T7, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF8.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF8.java
new file mode 100644
index 0000000000000000000000000000000000000000..a3b7ac5f94ce7b65dfc7eeeccca50318263299bd
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF8.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 8 arguments.
+ */
+public interface UDF8<T1, T2, T3, T4, T5, T6, T7, T8, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8) throws Exception;
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF9.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF9.java
new file mode 100644
index 0000000000000000000000000000000000000000..205e72a1522fccc937814bbcc9e19acf14126a64
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/UDF9.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+// **************************************************
+// THIS FILE IS AUTOGENERATED BY CODE IN
+// org.apache.spark.sql.api.java.FunctionRegistration
+// **************************************************
+
+/**
+ * A Spark SQL UDF that has 9 arguments.
+ */
+public interface UDF9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R> extends Serializable {
+  public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9) throws Exception;
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 00dd34aabc3890b0cc3d61c8231dd16590ce54c4..33931e5d996f5e0ed0ff629f5e5cd1915493a760 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -48,18 +48,23 @@ import org.apache.spark.{Logging, SparkContext}
  */
 @AlphaComponent
 class SQLContext(@transient val sparkContext: SparkContext)
-  extends Logging
+  extends org.apache.spark.Logging
   with SQLConf
   with ExpressionConversions
+  with UDFRegistration
   with Serializable {
 
   self =>
 
   @transient
   protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
+
+  @transient
+  protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry
+
   @transient
   protected[sql] lazy val analyzer: Analyzer =
-    new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true)
+    new Analyzer(catalog, functionRegistry, caseSensitive = true)
   @transient
   protected[sql] val optimizer = Optimizer
   @transient
@@ -379,7 +384,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected abstract class QueryExecution {
     def logical: LogicalPlan
 
-    lazy val analyzed = analyzer(logical)
+    lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
     lazy val optimizedPlan = optimizer(analyzed)
     // TODO: Don't just pick the first one...
     lazy val sparkPlan = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0b48e9e659faa2a835265c8e72c6e905bcc90564
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.{List => JList, Map => JMap}
+
+import org.apache.spark.Accumulator
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUdf}
+import org.apache.spark.sql.execution.PythonUDF
+
+import scala.reflect.runtime.universe.{TypeTag, typeTag}
+
+/**
+ * Functions for registering Scala lambda functions as UDFs in a SQLContext.
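+ *
+ * A usage sketch (assuming an in-scope SQLContext named `sqlCtx`; the UDF name is illustrative):
+ * {{{
+ *   sqlCtx.registerFunction("strLen", (s: String) => s.length)
+ *   sqlCtx.sql("SELECT strLen('test')")
+ * }}}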
+ */
+protected[sql] trait UDFRegistration {
+  self: SQLContext =>
+
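+  /**
+   * Called from PySpark to register a pickled Python lambda, along with the environment,
+   * includes, Python executable, and accumulator needed to run it in a Python worker, and the
+   * UDF's return type encoded as a string.
+   */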
+  private[spark] def registerPython(
+      name: String,
+      command: Array[Byte],
+      envVars: JMap[String, String],
+      pythonIncludes: JList[String],
+      pythonExec: String,
+      accumulator: Accumulator[JList[Array[Byte]]],
+      stringDataType: String): Unit = {
+    log.debug(
+      s"""
+        | Registering new PythonUDF:
+        | name: $name
+        | command: ${command.toSeq}
+        | envVars: $envVars
+        | pythonIncludes: $pythonIncludes
+        | pythonExec: $pythonExec
+        | dataType: $stringDataType
+      """.stripMargin)
+
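+    // The return type arrives from Python as a string; parse it back into a Catalyst DataType.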
+    val dataType = parseDataType(stringDataType)
+
+    def builder(e: Seq[Expression]) =
+      PythonUDF(
+        name,
+        command,
+        envVars,
+        pythonIncludes,
+        pythonExec,
+        accumulator,
+        dataType,
+        e)
+
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  /** The following registerFunction methods, for arities 1 to 22, were generated by this script:
+
+    (1 to 22).map { x =>
+      val types = (1 to x).map(x => "_").reduce(_ + ", " + _)
+      s"""
+        def registerFunction[T: TypeTag](name: String, func: Function$x[$types, T]): Unit = {
+          def builder(e: Seq[Expression]) =
+            ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+          functionRegistry.registerFunction(name, builder)
+        }
+      """
+    }
+  */
+
+  // scalastyle:off
+  def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function2[_, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function3[_, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function4[_, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function5[_, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function6[_, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function7[_, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function8[_, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function9[_, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function10[_, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function11[_, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function12[_, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function13[_, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+
+  def registerFunction[T: TypeTag](name: String, func: Function22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = {
+    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)
+    functionRegistry.registerFunction(name, builder)
+  }
+  // scalastyle:on
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 809dd038f94aa25324d4ae599cbb0951d7c4fc89..ae45193ed15d3bcbe6b3806fdb37a879ba319d29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -28,14 +28,13 @@ import org.apache.spark.sql.{SQLContext, StructType => SStructType}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
 import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
-import org.apache.spark.sql.types.util.DataTypeConversions
-import DataTypeConversions.asScalaDataType;
+import org.apache.spark.sql.types.util.DataTypeConversions.asScalaDataType
 import org.apache.spark.util.Utils
 
 /**
  * The entry point for executing Spark SQL queries from a Java program.
  */
-class JavaSQLContext(val sqlContext: SQLContext) {
+class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
 
   def this(sparkContext: JavaSparkContext) = this(new SQLContext(sparkContext.sc))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala
new file mode 100644
index 0000000000000000000000000000000000000000..158f26e3d445fd71bd2944f71f2669887dc4634d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/UDFRegistration.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUdf}
+import org.apache.spark.sql.types.util.DataTypeConversions._
+
+/**
+ * A collection of functions that allow Java users to register UDFs.  In order to handle functions
+ * of varying arities with minimal boilerplate for our users, we generate classes and functions
+ * for each arity up to 22.  The code for this generation can be found in comments in this trait.
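+ *
+ * For examples of registering UDFs from Java, see the UDF1 and UDF2 tests in
+ * org.apache.spark.sql.api.java.JavaAPISuite.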
+ */
+private[java] trait UDFRegistration {
+  self: JavaSQLContext =>
+
+  /* The following functions and required interfaces are generated with these code fragments:
+
+   (1 to 22).foreach { i =>
+     val extTypeArgs = (1 to i).map(_ => "_").mkString(", ")
+     val anyTypeArgs = (1 to i).map(_ => "Any").mkString(", ")
+     val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs, Any]]"
+     val anyParams = (1 to i).map(_ => "_: Any").mkString(", ")
+     println(s"""
+         |def registerFunction(
+         |    name: String, f: UDF$i[$extTypeArgs, _], dataType: DataType) = {
+         |  val scalaType = asScalaDataType(dataType)
+         |  sqlContext.functionRegistry.registerFunction(
+         |    name,
+         |    (e: Seq[Expression]) => ScalaUdf(f$anyCast.call($anyParams), scalaType, e))
+         |}
+       """.stripMargin)
+   }
+
+  import java.io.File
+  import org.apache.spark.sql.catalyst.util.stringToFile
+  val directory = new File("sql/core/src/main/java/org/apache/spark/sql/api/java/")
+  (1 to 22).foreach { i =>
+    val typeArgs = (1 to i).map(i => s"T$i").mkString(", ")
+    val args = (1 to i).map(i => s"T$i t$i").mkString(", ")
+
+    val contents =
+      s"""/*
+         | * Licensed to the Apache Software Foundation (ASF) under one or more
+         | * contributor license agreements.  See the NOTICE file distributed with
+         | * this work for additional information regarding copyright ownership.
+         | * The ASF licenses this file to You under the Apache License, Version 2.0
+         | * (the "License"); you may not use this file except in compliance with
+         | * the License.  You may obtain a copy of the License at
+         | *
+         | *    http://www.apache.org/licenses/LICENSE-2.0
+         | *
+         | * Unless required by applicable law or agreed to in writing, software
+         | * distributed under the License is distributed on an "AS IS" BASIS,
+         | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+         | * See the License for the specific language governing permissions and
+         | * limitations under the License.
+         | */
+         |
+         |package org.apache.spark.sql.api.java;
+         |
+         |import java.io.Serializable;
+         |
+         |// **************************************************
+         |// THIS FILE IS AUTOGENERATED BY CODE IN
+         |// org.apache.spark.sql.api.java.UDFRegistration
+         |// **************************************************
+         |
+         |/**
+         | * A Spark SQL UDF that has $i arguments.
+         | */
+         |public interface UDF$i<$typeArgs, R> extends Serializable {
+         |  public R call($args) throws Exception;
+         |}
+         |""".stripMargin
+
+      stringToFile(new File(directory, s"UDF$i.java"), contents)
+  }
+
+  */
+
+  // scalastyle:off
+  def registerFunction(name: String, f: UDF1[_, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF1[Any, Any]].call(_: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF2[_, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF3[_, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF4[_, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF5[_, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF6[_, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF7[_, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF8[_, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF9[_, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF10[_, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  def registerFunction(name: String, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], dataType: DataType) = {
+    val scalaType = asScalaDataType(dataType)
+    sqlContext.functionRegistry.registerFunction(
+      name,
+      (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), scalaType, e))
+  }
+
+  // scalastyle:on
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 8bec015c7b4652b2be8932eb628786fb96a8bedc..f0c958fdb537fd867ea15230ccef8c9709ba7255 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -286,6 +286,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.ExistingRdd(Nil, singleRowRdd) :: Nil
       case logical.Repartition(expressions, child) =>
         execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
+      case e @ EvaluatePython(udf, child) =>
+        BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
       case SparkLogicalPlan(existingPlan) => existingPlan :: Nil
       case _ => Nil
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b92091b560b1c4166751b5ee7417029ec5d35832
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.{List => JList, Map => JMap}
+
+import net.razorvine.pickle.{Pickler, Unpickler}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.python.PythonRDD
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.{Accumulator, Logging => SparkLogging}
+
+import scala.collection.JavaConversions._
+
+/**
+ * A serialized version of a Python lambda function.  Suitable for use in a [[PythonRDD]].
+ */
+private[spark] case class PythonUDF(
+    name: String,
+    command: Array[Byte],
+    envVars: JMap[String, String],
+    pythonIncludes: JList[String],
+    pythonExec: String,
+    accumulator: Accumulator[JList[Array[Byte]]],
+    dataType: DataType,
+    children: Seq[Expression]) extends Expression with SparkLogging {
+
+  override def toString = s"PythonUDF#$name(${children.mkString(",")})"
+
+  def nullable: Boolean = true
+  def references: Set[Attribute] = children.flatMap(_.references).toSet
+
+  override def eval(input: Row) = sys.error("PythonUDFs cannot be directly evaluated.")
+}
+
+/**
+ * Extracts PythonUDFs from operators, rewriting the query plan so that the UDF can be evaluated
+ * alone in a batch.
+ *
+ * This has the limitation that the input to the Python UDF is not allowed to include attributes from
+ * multiple child operators.
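+ *
+ * As an illustrative sketch (names are schematic), a plan such as
+ * {{{
+ *   Filter(pyUDF(a), rel)
+ * }}}
+ * is rewritten to
+ * {{{
+ *   Project(rel.output, Filter(pythonUDF#1, EvaluatePython(pyUDF(a), rel)))
+ * }}}
+ * so that the UDF is evaluated in its own batch and the temporary result attribute is
+ * projected away afterwards.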
+ */
+private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan) = plan transform {
+    // Skip EvaluatePython nodes.
+    case p: EvaluatePython => p
+
+    case l: LogicalPlan =>
+      // Extract any PythonUDFs from the current operator.
+      val udfs = l.expressions.flatMap(_.collect { case udf: PythonUDF => udf })
+      if (udfs.isEmpty) {
+        // If there aren't any, we are done.
+        l
+      } else {
+        // Pick the UDF we are going to evaluate (TODO: Support evaluating multiple UDFs at a time)
+        // If there is more than one, we will add another evaluation operator in a subsequent pass.
+        val udf = udfs.head
+
+        var evaluation: EvaluatePython = null
+
+        // Rewrite the child that has the input required for the UDF
+        val newChildren = l.children.map { child =>
+          // Check to make sure that the UDF can be evaluated with only the input of this child.
+          // Other cases are disallowed as they are ambiguous or would require a Cartesian product.
+          if (udf.references.subsetOf(child.outputSet)) {
+            evaluation = EvaluatePython(udf, child)
+            evaluation
+          } else if (udf.references.intersect(child.outputSet).nonEmpty) {
+            sys.error(s"Invalid PythonUDF $udf, requires attributes from more than one child.")
+          } else {
+            child
+          }
+        }
+
+        assert(evaluation != null, "Unable to evaluate PythonUDF.  Missing input attributes.")
+
+        // Project away the UDF's result attribute when it is not part of the operator's
+        // original output (e.g. when the UDF was only used for filtering).
+        logical.Project(
+          l.output,
+          l.transformExpressions {
+            case p: PythonUDF if p.id == udf.id => evaluation.resultAttribute
+          }.withNewChildren(newChildren))
+      }
+  }
+}
+
+/**
+ * :: DeveloperApi ::
+ * Evaluates a [[PythonUDF]], appending the result to the end of the input tuple.
+ */
+@DeveloperApi
+case class EvaluatePython(udf: PythonUDF, child: LogicalPlan) extends logical.UnaryNode {
+  val resultAttribute = AttributeReference("pythonUDF", udf.dataType, nullable = true)()
+
+  def references = Set.empty
+  def output = child.output :+ resultAttribute
+}
+
+/**
+ * :: DeveloperApi ::
+ * Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time.  The input
+ * data is cached and zipped with the result of the UDF evaluation.
+ */
+@DeveloperApi
+case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: SparkPlan)
+  extends SparkPlan {
+  def children = child :: Nil
+
+  def execute() = {
+    // TODO: Clean up after ourselves?
+    val childResults = child.execute().map(_.copy()).cache()
+
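+    // Pickle the inputs to the UDF in batches of 1000 rows so they can be shipped to a
+    // Python worker.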
+    val parent = childResults.mapPartitions { iter =>
+      val pickle = new Pickler
+      val currentRow = newMutableProjection(udf.children, child.output)()
+      iter.grouped(1000).map { inputRows =>
+        val toBePickled = inputRows.map(currentRow(_).toArray).toArray
+        pickle.dumps(toBePickled)
+      }
+    }
+
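+    // Run the pickled command over the input batches in a Python worker, unpickle each result
+    // batch, and wrap every value in a single-column row (converting to a string when the
+    // declared return type is StringType).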
+    val pyRDD = new PythonRDD(
+      parent,
+      udf.command,
+      udf.envVars,
+      udf.pythonIncludes,
+      false,
+      udf.pythonExec,
+      Seq[Broadcast[Array[Byte]]](),
+      udf.accumulator
+    ).mapPartitions { iter =>
+      val pickle = new Unpickler
+      iter.flatMap { pickledResult =>
+        val unpickledBatch = pickle.loads(pickledResult)
+        unpickledBatch.asInstanceOf[java.util.ArrayList[Any]]
+      }
+    }.mapPartitions { iter =>
+      val row = new GenericMutableRow(1)
+      iter.map { result =>
+        row(0) = udf.dataType match {
+          case StringType => result.toString
+          case other => result
+        }
+        row: Row
+      }
+    }
+
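+    // Reattach each UDF result to the input row that produced it.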
+    childResults.zip(pyRDD).mapPartitions { iter =>
+      val joinedRow = new JoinedRow()
+      iter.map {
+        case (row, udfResult) =>
+          joinedRow(row, udfResult)
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..a9a11285def5404dbadde06e5886d7a1c70bea67
--- /dev/null
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaAPISuite.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java;
+
+import java.io.Serializable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+  private transient JavaSparkContext sc;
+  private transient JavaSQLContext sqlContext;
+
+  @Before
+  public void setUp() {
+    sc = new JavaSparkContext("local", "JavaAPISuite");
+    sqlContext = new JavaSQLContext(sc);
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void udf1Test() {
+    // With Java 8 lambdas:
+    // sqlContext.registerFunction(
+    //   "stringLengthTest", (String str) -> str.length(), DataType.IntegerType);
+
+    sqlContext.registerFunction("stringLengthTest", new UDF1<String, Integer>() {
+      @Override
+      public Integer call(String str) throws Exception {
+        return str.length();
+      }
+    }, DataType.IntegerType);
+
+    // TODO: Why do we need this cast?
+    Row result = (Row) sqlContext.sql("SELECT stringLengthTest('test')").first();
+    Assert.assertEquals(4, result.getInt(0));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void udf2Test() {
+    // With Java 8 lambdas:
+    // sqlContext.registerFunction(
+    //   "stringLengthTest",
+    //   (String str1, String str2) -> str1.length() + str2.length(),
+    //   DataType.IntegerType);
+
+    sqlContext.registerFunction("stringLengthTest", new UDF2<String, String, Integer>() {
+      @Override
+      public Integer call(String str1, String str2) throws Exception {
+        return str1.length() + str2.length();
+      }
+    }, DataType.IntegerType);
+
+    // TODO: Why do we need this cast?
+    Row result = (Row) sqlContext.sql("SELECT stringLengthTest('test', 'test2')").first();
+    Assert.assertEquals(9, result.getInt(0));
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
index 4f0b85f26254bf87c82d0c66fb4c3677aeedd6d8..23a711d08c58b1fff2f09acfd5dc339134c2cd56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.io.File
+import _root_.java.io.File
 
 /* Implicits */
 import org.apache.spark.sql.test.TestSQLContext._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..76aa9b0081d7e0cf9b69194a9a059be2b160787f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test._
+
+/* Implicits */
+import TestSQLContext._
+
+class UDFSuite extends QueryTest {
+
+  test("Simple UDF") {
+    registerFunction("strLenScala", (_: String).length)
+    assert(sql("SELECT strLenScala('test')").first().getInt(0) === 4)
+  }
+
+  test("TwoArgument UDF") {
+    registerFunction("strLenScala", (_: String).length + (_:Int))
+    assert(sql("SELECT strLenScala('test', 1)").first().getInt(0) === 5)
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 2c7270d9f83a9715461bcdeefbea9e16f03613ca..3c70b3f0921a599f9635df4ded5bd28f23a2300d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -23,7 +23,7 @@ import java.util.{ArrayList => JArrayList}
 
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
-import scala.reflect.runtime.universe.TypeTag
+import scala.reflect.runtime.universe.{TypeTag, typeTag}
 
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
@@ -35,8 +35,9 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.{OverrideFunctionRegistry, Analyzer, OverrideCatalog}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.ExtractPythonUdfs
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.{Command => PhysicalCommand}
 import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
@@ -155,10 +156,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     }
   }
 
+  // Note that Hive UDFs will be overridden by functions registered in this context.
+  override protected[sql] lazy val functionRegistry =
+    new HiveFunctionRegistry with OverrideFunctionRegistry
+
   /* An analyzer that uses the Hive metastore. */
   @transient
   override protected[sql] lazy val analyzer =
-    new Analyzer(catalog, HiveFunctionRegistry, caseSensitive = false)
+    new Analyzer(catalog, functionRegistry, caseSensitive = false)
 
   /**
    * Runs the specified SQL query using Hive.
@@ -250,7 +255,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[sql] abstract class QueryExecution extends super.QueryExecution {
     // TODO: Create mixin for the analyzer instead of overriding things here.
     override lazy val optimizedPlan =
-      optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))
+      optimizer(ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))))
 
     override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 728452a25a00e4c3ce05a783b09578a31a58b515..c605e8adcfb0f73f6fcb2ce1f7b882959bb14e50 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -297,8 +297,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   def reset() {
     try {
       // HACK: Hive is too noisy by default.
-      org.apache.log4j.LogManager.getCurrentLoggers.foreach { logger =>
-        logger.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN)
+      org.apache.log4j.LogManager.getCurrentLoggers.foreach { log =>
+        log.asInstanceOf[org.apache.log4j.Logger].setLevel(org.apache.log4j.Level.WARN)
       }
 
       // It is important that we RESET first as broken hooks that might have been set could break
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index d181921269b5629173b82c7458a551793982daa9..179aac5cbd5cd94500e170eaf4c9812d5c7f3555 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -34,7 +34,8 @@ import org.apache.spark.util.Utils.getContextOrSparkClassLoader
 /* Implicit conversions */
 import scala.collection.JavaConversions._
 
-private[hive] object HiveFunctionRegistry extends analysis.FunctionRegistry with HiveInspectors {
+private[hive] abstract class HiveFunctionRegistry
+  extends analysis.FunctionRegistry with HiveInspectors {
 
   def getFunctionInfo(name: String) = FunctionRegistry.getFunctionInfo(name)
 
@@ -92,9 +93,8 @@ private[hive] abstract class HiveUdf extends Expression with Logging with HiveFu
 }
 
 private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[Expression])
-  extends HiveUdf {
+  extends HiveUdf with HiveInspectors {
 
-  import org.apache.spark.sql.hive.HiveFunctionRegistry._
   type UDFType = UDF
 
   @transient
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 11d8b1f0a3d96f88a3d08400a11554c29d92e3f6..95921c3d7ae09cdd24d029ed9dffbc951dcb8b4b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -51,9 +51,9 @@ class QueryTest extends FunSuite {
         fail(
           s"""
             |Exception thrown while executing query:
-            |${rdd.logicalPlan}
+            |${rdd.queryExecution}
             |== Exception ==
-            |$e
+            |${stackTraceToString(e)}
           """.stripMargin)
     }