diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index d6758eb5b6a323424a702f5d92c8fb4c4e38dcc7..bd8131c9af6e0f83b631c63639fb5e22731d0a86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -26,10 +26,22 @@ object HiveTypeCoercion {
   // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
   // The conversion for integral and floating point types have a linear widening hierarchy:
   val numericPrecedence =
-    Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
-  // Boolean is only wider than Void
-  val booleanPrecedence = Seq(NullType, BooleanType)
-  val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
+    Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
+  val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: Nil
+
+  def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
+    val valueTypes = Seq(t1, t2).filter(t => t != NullType)
+    if (valueTypes.distinct.size > 1) {
+      // Try and find a promotion rule that contains both types in question.
+      val applicableConversion =
+        HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2))
+
+      // If found return the widest common type, otherwise None
+      applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
+    } else {
+      Some(if (valueTypes.size == 0) NullType else valueTypes.head)
+    }
+  }
 }
 
 /**
@@ -53,17 +65,6 @@ trait HiveTypeCoercion {
     Division ::
     Nil
 
-  trait TypeWidening {
-    def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
-      // Try and find a promotion rule that contains both types in question.
-      val applicableConversion =
-        HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2))
-
-      // If found return the widest common type, otherwise None
-      applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
-    }
-  }
-
   /**
    * Applies any changes to [[AttributeReference]] data types that are made by other rules to
    * instances higher in the query tree.
@@ -144,7 +145,8 @@ trait HiveTypeCoercion {
    * - LongType to FloatType
    * - LongType to DoubleType
    */
-  object WidenTypes extends Rule[LogicalPlan] with TypeWidening {
+  object WidenTypes extends Rule[LogicalPlan] {
+    import HiveTypeCoercion._
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       case u @ Union(left, right) if u.childrenResolved && !u.resolved =>
@@ -352,7 +354,9 @@ trait HiveTypeCoercion {
   /**
    * Coerces the type of different branches of a CASE WHEN statement to a common type.
    */
-  object CaseWhenCoercion extends Rule[LogicalPlan] with TypeWidening {
+  object CaseWhenCoercion extends Rule[LogicalPlan] {
+    import HiveTypeCoercion._
+
     def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       case cw @ CaseWhen(branches) if !cw.resolved && !branches.exists(!_.resolved)  =>
         val valueTypes = branches.sliding(2, 2).map {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
index b9e0f8e9dcc5f02c8082957311aaaaeb672c3a46..ba8b853b6f99e829638bef5c3babd25ced4e6dd4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
@@ -23,20 +23,20 @@ import org.apache.spark.sql.catalyst.types._
 
 class HiveTypeCoercionSuite extends FunSuite {
 
-  val rules = new HiveTypeCoercion { }
-  import rules._
-
-  test("tightest common bound for numeric and boolean types") {
+  test("tightest common bound for types") {
     def widenTest(t1: DataType, t2: DataType, tightestCommon: Option[DataType]) {
-      var found = WidenTypes.findTightestCommonType(t1, t2)
+      var found = HiveTypeCoercion.findTightestCommonType(t1, t2)
       assert(found == tightestCommon,
         s"Expected $tightestCommon as tightest common type for $t1 and $t2, found $found")
       // Test both directions to make sure the widening is symmetric.
-      found = WidenTypes.findTightestCommonType(t2, t1)
+      found = HiveTypeCoercion.findTightestCommonType(t2, t1)
       assert(found == tightestCommon,
         s"Expected $tightestCommon as tightest common type for $t2 and $t1, found $found")
     }
 
+    // Null
+    widenTest(NullType, NullType, Some(NullType))
+
     // Boolean
     widenTest(NullType, BooleanType, Some(BooleanType))
     widenTest(BooleanType, BooleanType, Some(BooleanType))
@@ -60,12 +60,28 @@ class HiveTypeCoercionSuite extends FunSuite {
     widenTest(DoubleType, DoubleType, Some(DoubleType))
 
     // Integral mixed with floating point.
-    widenTest(NullType, FloatType, Some(FloatType))
-    widenTest(NullType, DoubleType, Some(DoubleType))
     widenTest(IntegerType, FloatType, Some(FloatType))
     widenTest(IntegerType, DoubleType, Some(DoubleType))
     widenTest(IntegerType, DoubleType, Some(DoubleType))
     widenTest(LongType, FloatType, Some(FloatType))
     widenTest(LongType, DoubleType, Some(DoubleType))
+
+    // StringType
+    widenTest(NullType, StringType, Some(StringType))
+    widenTest(StringType, StringType, Some(StringType))
+    widenTest(IntegerType, StringType, None)
+    widenTest(LongType, StringType, None)
+
+    // TimestampType
+    widenTest(NullType, TimestampType, Some(TimestampType))
+    widenTest(TimestampType, TimestampType, Some(TimestampType))
+    widenTest(IntegerType, TimestampType, None)
+    widenTest(StringType, TimestampType, None)
+
+    // ComplexType
+    widenTest(NullType, MapType(IntegerType, StringType, false), Some(MapType(IntegerType, StringType, false)))
+    widenTest(NullType, StructType(Seq()), Some(StructType(Seq())))
+    widenTest(StringType, MapType(IntegerType, StringType, true), None)
+    widenTest(ArrayType(IntegerType), StructType(Seq()), None)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 1c0b03c684f102edf316f8564b685158611be1f7..70062eae3b7ceb1c91cd110bdd452451641711c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -125,38 +125,31 @@ private[sql] object JsonRDD extends Logging {
    * Returns the most general data type for two given data types.
    */
   private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
-    // Try and find a promotion rule that contains both types in question.
-    val applicableConversion = HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p
-      .contains(t2))
-
-    // If found return the widest common type, otherwise None
-    val returnType = applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
-
-    if (returnType.isDefined) {
-      returnType.get
-    } else {
-      // t1 or t2 is a StructType, ArrayType, or an unexpected type.
-      (t1, t2) match {
-        case (other: DataType, NullType) => other
-        case (NullType, other: DataType) => other
-        case (StructType(fields1), StructType(fields2)) => {
-          val newFields = (fields1 ++ fields2).groupBy(field => field.name).map {
-            case (name, fieldTypes) => {
-              val dataType = fieldTypes.map(field => field.dataType).reduce(
-                (type1: DataType, type2: DataType) => compatibleType(type1, type2))
-              StructField(name, dataType, true)
+    HiveTypeCoercion.findTightestCommonType(t1, t2) match {
+      case Some(commonType) => commonType
+      case None =>
+        // t1 or t2 is a StructType, ArrayType, or an unexpected type.
+        (t1, t2) match {
+          case (other: DataType, NullType) => other
+          case (NullType, other: DataType) => other
+          case (StructType(fields1), StructType(fields2)) => {
+            val newFields = (fields1 ++ fields2).groupBy(field => field.name).map {
+              case (name, fieldTypes) => {
+                val dataType = fieldTypes.map(field => field.dataType).reduce(
+                  (type1: DataType, type2: DataType) => compatibleType(type1, type2))
+                StructField(name, dataType, true)
+              }
             }
+            StructType(newFields.toSeq.sortBy {
+              case StructField(name, _, _) => name
+            })
           }
-          StructType(newFields.toSeq.sortBy {
-            case StructField(name, _, _) => name
-          })
+          case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
+            ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2)
+          // TODO: We should use JsonObjectStringType to mark that values of field will be
+          // strings and every string is a Json object.
+          case (_, _) => StringType
         }
-        case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
-          ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2)
-        // TODO: We should use JsonObjectStringType to mark that values of field will be
-        // strings and every string is a Json object.
-        case (_, _) => StringType
-      }
     }
   }