diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index cd5ba1217ccc0ff8afb05dc51fadd87f020bfc09..a1299aed555c10527fa5493f906d3ddebf44ea1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -163,7 +163,11 @@ object FunctionRegistry {
     expression[Substring]("substring"),
     expression[Upper]("ucase"),
     expression[UnHex]("unhex"),
-    expression[Upper]("upper")
+    expression[Upper]("upper"),
+
+    // datetime functions
+    expression[CurrentDate]("current_date"),
+    expression[CurrentTimestamp]("current_timestamp")
   )
 
   val builtin: FunctionRegistry = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
new file mode 100644
index 0000000000000000000000000000000000000000..13ba2f2e5d62da760fbed21e5fcfcfb3eb57429e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+
+/**
+ * Returns the current date at the start of query evaluation.
+ * All calls of current_date within the same query return the same value.
+ */
+case class CurrentDate() extends LeafExpression {
+  override def foldable: Boolean = true
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = DateType
+
+  override def eval(input: InternalRow): Any = {
+    DateTimeUtils.millisToDays(System.currentTimeMillis())
+  }
+}
+
+/**
+ * Returns the current timestamp at the start of query evaluation.
+ * All calls of current_timestamp within the same query return the same value.
+ */
+case class CurrentTimestamp() extends LeafExpression {
+  override def foldable: Boolean = true
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = TimestampType
+
+  override def eval(input: InternalRow): Any = {
+    System.currentTimeMillis() * 10000L
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DatetimeFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DatetimeFunctionsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1618c24871c605145f1da17b378f0966129ea01d
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DatetimeFunctionsSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+
+class DatetimeFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
+  test("datetime function current_date") {
+    val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    val cd = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
+    val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    assert(d0 <= cd && cd <= d1 && d1 - d0 <= 1)
+  }
+
+  test("datetime function current_timestamp") {
+    val ct = DateTimeUtils.toJavaTimestamp(CurrentTimestamp().eval(EmptyRow).asInstanceOf[Long])
+    val t1 = System.currentTimeMillis()
+    assert(math.abs(t1 - ct.getTime) < 5000)
+  }
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index d261baf920c0cf731510a2790b52f9e70670b404..25e37ff67aa004deb8e5685208416bd0ea0c8612 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -35,6 +35,7 @@ import org.apache.spark.util.Utils
  *
  * @groupname udf_funcs UDF functions
  * @groupname agg_funcs Aggregate functions
+ * @groupname datetime_funcs Date time functions
  * @groupname sort_funcs Sorting functions
  * @groupname normal_funcs Non-aggregate functions
  * @groupname math_funcs Math functions
@@ -991,6 +992,22 @@ object functions {
    */
   def cosh(columnName: String): Column = cosh(Column(columnName))
 
+  /**
+   * Returns the current date.
+   *
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def current_date(): Column = CurrentDate()
+
+  /**
+   * Returns the current timestamp.
+   *
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def current_timestamp(): Column = CurrentTimestamp()
+
   /**
    * Computes the exponential of the given value.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatetimeExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatetimeExpressionsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..44b915304533cddd56efe8ed1e89e97545525764
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatetimeExpressionsSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.functions._
+
+class DatetimeExpressionsSuite extends QueryTest {
+  private lazy val ctx = org.apache.spark.sql.test.TestSQLContext
+
+  import ctx.implicits._
+
+  lazy val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
+
+  test("function current_date") {
+    val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    val d1 = DateTimeUtils.fromJavaDate(df1.select(current_date()).collect().head.getDate(0))
+    val d2 = DateTimeUtils.fromJavaDate(
+      ctx.sql("""SELECT CURRENT_DATE()""").collect().head.getDate(0))
+    val d3 = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
+  }
+
+  test("function current_timestamp") {
+    checkAnswer(df1.select(countDistinct(current_timestamp())), Row(1))
+    // Execution in one query should return the same value
+    checkAnswer(ctx.sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""),
+      Row(true))
+    assert(math.abs(ctx.sql("""SELECT CURRENT_TIMESTAMP()""").collect().head.getTimestamp(
+      0).getTime - System.currentTimeMillis()) < 5000)
+  }
+
+}