diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 12263e6a75af84359beb8f55ba3408aded5f0a1e..69e563ef36e87bcf2a2ec15a4fb06b66e0360687 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -436,6 +436,19 @@ def shiftRight(col, numBits):
     return Column(jc)
 
 
+@since(1.5)
+def shiftRightUnsigned(col, numBits):
+    """Unsigned shift the the given value numBits right.
+
+    >>> df = sqlContext.createDataFrame([(-42,)], ['a'])
+    >>> df.select(shiftRightUnsigned('a', 1).alias('r')).collect()
+    [Row(r=9223372036854775787)]
+    """
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.shiftRightUnsigned(_to_java_column(col), numBits)
+    return Column(jc)
+
+
 @since(1.4)
 def sparkPartitionId():
     """A column for partition ID of the Spark task.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 9163b032adee4d8693286910afb9e9ddb2448f4f..cd5ba1217ccc0ff8afb05dc51fadd87f020bfc09 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -129,6 +129,7 @@ object FunctionRegistry {
     expression[Rint]("rint"),
     expression[ShiftLeft]("shiftleft"),
     expression[ShiftRight]("shiftright"),
+    expression[ShiftRightUnsigned]("shiftrightunsigned"),
     expression[Signum]("sign"),
     expression[Signum]("signum"),
     expression[Sin]("sin"),
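For context, this registration is what makes the new expression reachable by name from SQL strings. A minimal sketch of how it could be exercised, assuming a DataFrame `df` with a LongType column `a` holding -42 (the same shape used in MathExpressionsSuite further below); the registered name is all lowercase, while the suite calls it in mixed case, so name lookup is evidently case-insensitive:

```scala
// Hypothetical: df is assumed to exist and to hold a single row where a = -42L.
// The SQL function name is resolved through FunctionRegistry.
df.selectExpr("shiftrightunsigned(a, 1)").show()
// expected value: 9223372036854775787 (see the test suites below)
```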
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
index 273a6c50165771d41e1bc5252087040d264109db..0fc320fb0887688276bb6dabd01d8d301ee66ef8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
@@ -521,6 +521,55 @@ case class ShiftRight(left: Expression, right: Expression) extends BinaryExpress
   }
 }
 
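+/**
+ * Bitwise unsigned (logical) right shift: the bits of `left` are shifted right by
+ * `right` positions and the vacated high-order bits are filled with zeroes.
+ */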
+case class ShiftRightUnsigned(left: Expression, right: Expression) extends BinaryExpression {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    (left.dataType, right.dataType) match {
+      case (NullType, _) | (_, NullType) => return TypeCheckResult.TypeCheckSuccess
+      case (_, IntegerType) => left.dataType match {
+        case LongType | IntegerType | ShortType | ByteType =>
+          return TypeCheckResult.TypeCheckSuccess
+        case _ => // failed
+      }
+      case _ => // failed
+    }
+    TypeCheckResult.TypeCheckFailure(
+      "ShiftRightUnsigned expects a long, integer, short or byte value as the first argument " +
+        s"and an integer value as the second argument, not (${left.dataType}, ${right.dataType})")
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val valueLeft = left.eval(input)
+    if (valueLeft != null) {
+      val valueRight = right.eval(input)
+      if (valueRight != null) {
+        valueLeft match {
+          case l: Long => l >>> valueRight.asInstanceOf[Integer]
+          case i: Integer => i >>> valueRight.asInstanceOf[Integer]
+          case s: Short => s >>> valueRight.asInstanceOf[Integer]
+          case b: Byte => b >>> valueRight.asInstanceOf[Integer]
+        }
+      } else {
+        null
+      }
+    } else {
+      null
+    }
+  }
+
+  override def dataType: DataType = {
+    left.dataType match {
+      case LongType => LongType
+      case IntegerType | ShortType | ByteType => IntegerType
+      case _ => NullType
+    }
+  }
+
+  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    nullSafeCodeGen(ctx, ev, (result, left, right) => s"$result = $left >>> $right;")
+  }
+}
+
 /**
  * Performs the inverse operation of HEX.
  * Resulting characters are returned as a byte array.
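As background on the semantics implemented above: `>>>` is the JVM's logical (zero-filling) right shift, whereas the `>>` used by ShiftRight is an arithmetic shift that propagates the sign bit. A small self-contained sketch in plain Scala (not Spark code) showing where the expected value 9223372036854775787 in the tests below comes from:

```scala
object UnsignedShiftDemo extends App {
  val v = -42L
  // Arithmetic shift keeps the sign bit, as ShiftRight does.
  println(v >> 1)   // -21
  // Logical shift fills with zeroes, as ShiftRightUnsigned does: the 64-bit
  // pattern of -42L is 2^64 - 42; halving it gives 9223372036854775787.
  println(v >>> 1)  // 9223372036854775787
}
```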
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index 8457864d1782d238623f62d451e4e4227022903e..20839c83d4fd09e531957bdda60984817ad89d4f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -264,6 +264,19 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(ShiftRight(Literal(-42.toLong), Literal(1)), -21.toLong)
   }
 
+  test("shift right unsigned") {
+    checkEvaluation(ShiftRightUnsigned(Literal.create(null, IntegerType), Literal(1)), null)
+    checkEvaluation(ShiftRightUnsigned(Literal(42), Literal.create(null, IntegerType)), null)
+    checkEvaluation(ShiftRightUnsigned(
+      Literal.create(null, IntegerType), Literal.create(null, IntegerType)), null)
+    checkEvaluation(ShiftRightUnsigned(Literal(42), Literal(1)), 21)
+    checkEvaluation(ShiftRightUnsigned(Literal(42.toByte), Literal(1)), 21)
+    checkEvaluation(ShiftRightUnsigned(Literal(42.toShort), Literal(1)), 21)
+    checkEvaluation(ShiftRightUnsigned(Literal(42.toLong), Literal(1)), 21.toLong)
+
+    checkEvaluation(ShiftRightUnsigned(Literal(-42.toLong), Literal(1)), 9223372036854775787L)
+  }
+
   test("hex") {
     checkEvaluation(Hex(Literal(28)), "1C")
     checkEvaluation(Hex(Literal(-28)), "FFFFFFFFFFFFFFE4")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 0d5d49c3dd1d706d04f2ccd47e2302fcd546f51d..4b70dc5fdde8d6a03a7b84abc22aefca24d595c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1343,6 +1343,26 @@ object functions {
    */
   def shiftRight(e: Column, numBits: Int): Column = ShiftRight(e.expr, lit(numBits).expr)
 
+  /**
+   * Unsigned shift the given value numBits right. If the given value is a long value,
+   * it will return a long value else it will return an integer value.
+   *
+   * @group math_funcs
+   * @since 1.5.0
+   */
+  def shiftRightUnsigned(columnName: String, numBits: Int): Column =
+    shiftRightUnsigned(Column(columnName), numBits)
+
+  /**
+   * Unsigned shift the given value numBits right. If the given value is a long value,
+   * it will return a long value else it will return an integer value.
+   *
+   * @group math_funcs
+   * @since 1.5.0
+   */
+  def shiftRightUnsigned(e: Column, numBits: Int): Column =
+    ShiftRightUnsigned(e.expr, lit(numBits).expr)
+
   /**
    * Shift the the given value numBits right. If the given value is a long value, it will return
    * a long value else it will return an integer value.
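To illustrate the two new overloads from the DataFrame API, here is a hedged sketch (it assumes an existing `sqlContext` whose implicits are imported, as MathExpressionsSuite below does). The result type follows the rule stated in the Scaladoc above: a long input stays LongType, the other integral inputs come back as IntegerType:

```scala
import org.apache.spark.sql.functions._
import sqlContext.implicits._  // assumed in scope, as in the test suite below

val df = Seq((-42L, 42)).toDF("a", "b")
df.select(
  shiftRightUnsigned($"a", 1),  // Column overload      -> 9223372036854775787 (LongType)
  shiftRightUnsigned("b", 1)    // column-name overload -> 21 (IntegerType)
).show()
```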
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala
index dc8f994adbd39213c8796c9bfe69a46a1b3bff48..24bef21b999ea73497d072658f10b1d81e506ace 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala
@@ -304,6 +304,23 @@ class MathExpressionsSuite extends QueryTest {
       Row(21.toLong, 21, 21.toShort, 21.toByte, null))
   }
 
+  test("shift right unsigned") {
+    val df = Seq[(Long, Integer, Short, Byte, Integer, Integer)]((-42, 42, 42, 42, 42, null))
+      .toDF("a", "b", "c", "d", "e", "f")
+
+    checkAnswer(
+      df.select(
+        shiftRightUnsigned('a, 1), shiftRightUnsigned('b, 1), shiftRightUnsigned('c, 1),
+        shiftRightUnsigned('d, 1), shiftRightUnsigned('f, 1)),
+      Row(9223372036854775787L, 21, 21.toShort, 21.toByte, null))
+
+    checkAnswer(
+      df.selectExpr(
+        "shiftRightUnsigned(a, 1)", "shiftRightUnsigned(b, 1)", "shiftRightUnsigned(c, 1)",
+        "shiftRightUnsigned(d, 1)", "shiftRightUnsigned(f, 1)"),
+      Row(9223372036854775787L, 21, 21.toShort, 21.toByte, null))
+  }
+
   test("binary log") {
     val df = Seq[(Integer, Integer)]((123, null)).toDF("a", "b")
     checkAnswer(