From c92949ac23652e2c3a0c97fdf3d6e016f9d01dda Mon Sep 17 00:00:00 2001 From: Wenchen Fan <wenchen@databricks.com> Date: Tue, 6 Jun 2017 22:50:06 -0700 Subject: [PATCH] [SPARK-20972][SQL] rename HintInfo.isBroadcastable to broadcast ## What changes were proposed in this pull request? `HintInfo.isBroadcastable` is actually not an accurate name, it's used to force the planner to broadcast a plan no matter what the data size is, via the hint mechanism. I think `forceBroadcast` is a better name. And `isBroadcastable` only have 2 possible values: `Some(true)` and `None`, so we can just use boolean type for it. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #18189 from cloud-fan/stats. --- .../sql/catalyst/analysis/ResolveHints.scala | 6 +++--- .../sql/catalyst/plans/logical/hints.scala | 16 +++++++-------- .../catalyst/analysis/ResolveHintsSuite.scala | 20 +++++++++---------- .../BasicStatsEstimationSuite.scala | 6 +++--- .../spark/sql/execution/SparkStrategies.scala | 2 +- .../org/apache/spark/sql/functions.scala | 2 +- 6 files changed, 25 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 62a3482d9f..f068bce3e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -58,9 +58,9 @@ object ResolveHints { val newNode = CurrentOrigin.withOrigin(plan.origin) { plan match { case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) => - ResolvedHint(plan, HintInfo(isBroadcastable = Option(true))) + ResolvedHint(plan, HintInfo(broadcast = true)) case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) => - ResolvedHint(plan, HintInfo(isBroadcastable = Option(true))) + ResolvedHint(plan, HintInfo(broadcast = true)) case _: ResolvedHint | _: View | _: With | _: SubqueryAlias => // Don't traverse down these nodes. @@ -89,7 +89,7 @@ object ResolveHints { case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => if (h.parameters.isEmpty) { // If there is no table alias specified, turn the entire subtree into a BroadcastHint. - ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true))) + ResolvedHint(h.child, HintInfo(broadcast = true)) } else { // Otherwise, find within the subtree query plans that should be broadcasted. applyBroadcastHint(h.child, h.parameters.map { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala index d16fae56b3..e49970df80 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala @@ -51,19 +51,17 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo()) } -case class HintInfo( - isBroadcastable: Option[Boolean] = None) { +case class HintInfo(broadcast: Boolean = false) { /** Must be called when computing stats for a join operator to reset hints. */ - def resetForJoin(): HintInfo = copy( - isBroadcastable = None - ) + def resetForJoin(): HintInfo = copy(broadcast = false) override def toString: String = { - if (productIterator.forall(_.asInstanceOf[Option[_]].isEmpty)) { - "none" - } else { - isBroadcastable.map(x => s"isBroadcastable=$x").getOrElse("") + val hints = scala.collection.mutable.ArrayBuffer.empty[String] + if (broadcast) { + hints += "broadcast" } + + if (hints.isEmpty) "none" else hints.mkString("(", ", ", ")") } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala index 3d5148008c..9782b5fb0d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala @@ -36,17 +36,17 @@ class ResolveHintsSuite extends AnalysisTest { test("case-sensitive or insensitive parameters") { checkAnalysis( UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), - ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))), + ResolvedHint(testRelation, HintInfo(broadcast = true)), caseSensitive = false) checkAnalysis( UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")), - ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))), + ResolvedHint(testRelation, HintInfo(broadcast = true)), caseSensitive = false) checkAnalysis( UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), - ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))), + ResolvedHint(testRelation, HintInfo(broadcast = true)), caseSensitive = true) checkAnalysis( @@ -58,28 +58,28 @@ class ResolveHintsSuite extends AnalysisTest { test("multiple broadcast hint aliases") { checkAnalysis( UnresolvedHint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))), - Join(ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))), - ResolvedHint(testRelation2, HintInfo(isBroadcastable = Option(true))), Inner, None), + Join(ResolvedHint(testRelation, HintInfo(broadcast = true)), + ResolvedHint(testRelation2, HintInfo(broadcast = true)), Inner, None), caseSensitive = false) } test("do not traverse past existing broadcast hints") { checkAnalysis( UnresolvedHint("MAPJOIN", Seq("table"), - ResolvedHint(table("table").where('a > 1), HintInfo(isBroadcastable = Option(true)))), - ResolvedHint(testRelation.where('a > 1), HintInfo(isBroadcastable = Option(true))).analyze, + ResolvedHint(table("table").where('a > 1), HintInfo(broadcast = true))), + ResolvedHint(testRelation.where('a > 1), HintInfo(broadcast = true)).analyze, caseSensitive = false) } test("should work for subqueries") { checkAnalysis( UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")), - ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))), + ResolvedHint(testRelation, HintInfo(broadcast = true)), caseSensitive = false) checkAnalysis( UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)), - ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))), + ResolvedHint(testRelation, HintInfo(broadcast = true)), caseSensitive = false) // Negative case: if the alias doesn't match, don't match the original table name. @@ -104,7 +104,7 @@ class ResolveHintsSuite extends AnalysisTest { |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable """.stripMargin ), - ResolvedHint(testRelation.where('a > 1).select('a), HintInfo(isBroadcastable = Option(true))) + ResolvedHint(testRelation.where('a > 1).select('a), HintInfo(broadcast = true)) .select('a).analyze, caseSensitive = false) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala index 2afea6dd3d..833f5a7199 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala @@ -45,11 +45,11 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase { expectedStatsCboOn = filterStatsCboOn, expectedStatsCboOff = filterStatsCboOff) - val broadcastHint = ResolvedHint(filter, HintInfo(isBroadcastable = Option(true))) + val broadcastHint = ResolvedHint(filter, HintInfo(broadcast = true)) checkStats( broadcastHint, - expectedStatsCboOn = filterStatsCboOn.copy(hints = HintInfo(isBroadcastable = Option(true))), - expectedStatsCboOff = filterStatsCboOff.copy(hints = HintInfo(isBroadcastable = Option(true))) + expectedStatsCboOn = filterStatsCboOn.copy(hints = HintInfo(broadcast = true)), + expectedStatsCboOff = filterStatsCboOff.copy(hints = HintInfo(broadcast = true)) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f13294c925..ea86f6e00f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -114,7 +114,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * Matches a plan whose output should be small enough to be used in broadcast join. */ private def canBroadcast(plan: LogicalPlan): Boolean = { - plan.stats(conf).hints.isBroadcastable.getOrElse(false) || + plan.stats(conf).hints.broadcast || (plan.stats(conf).sizeInBytes >= 0 && plan.stats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 67ec1325b3..8d0a8c2178 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -1020,7 +1020,7 @@ object functions { */ def broadcast[T](df: Dataset[T]): Dataset[T] = { Dataset[T](df.sparkSession, - ResolvedHint(df.logicalPlan, HintInfo(isBroadcastable = Option(true))))(df.exprEnc) + ResolvedHint(df.logicalPlan, HintInfo(broadcast = true)))(df.exprEnc) } /** -- GitLab