Abstract: To address the problem of depending too heavily on Hive, Spark SQL replaced Hive's optimizer with a new SQL optimizer, Catalyst.

This article is shared from the Huawei Cloud community post "Spark New Open Source Features: Catalyst Optimization Process Tailoring" by HZjTurbo.

1. Background

The figure above shows a typical Spark Catalyst optimizer pipeline. There are five phases from a user-submitted SQL statement to an actual, scheduled RDD DAG job (a short sketch for inspecting each phase's output follows the list):

  • Parser: The SQL text is parsed into the corresponding abstract syntax tree (AST), which Spark also calls the Unresolved Logical Plan.
  • Analyzer: The Unresolved Logical Plan is turned into a Resolved Logical Plan by looking up metadata in the catalog. In this step, tables, columns, and data types are checked.
  • Optimizer: The logical optimization phase. Optimization rules are applied to the plans they match, yielding an Optimized Logical Plan.
  • Planner: Converts the Optimized Logical Plan, using its statistics, into the corresponding Physical Plan.
  • Query Execution: Some necessary optimizations before execution, such as AQE, exchange reuse, and whole-stage code generation (CodeGen stage consolidation).
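
The intermediate plans produced by these phases can be inspected through a DataFrame's QueryExecution. Below is a minimal sketch (not from the original article; it assumes a local SparkSession and an illustrative temp view named t):

import org.apache.spark.sql.SparkSession

object PhasesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("catalyst-phases").getOrCreate()
    import spark.implicits._
    Seq((1, "a"), (2, "b")).toDF("id", "name").createOrReplaceTempView("t")

    val qe = spark.sql("SELECT id FROM t WHERE id IN (1, 2)").queryExecution
    println(qe.logical)       // Parser output: Unresolved Logical Plan
    println(qe.analyzed)      // Analyzer output: Resolved Logical Plan
    println(qe.optimizedPlan) // Optimizer output: Optimized Logical Plan
    println(qe.executedPlan)  // Physical plan after the preparation rules

    spark.stop()
  }
}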

Of the five stages above, all but the Parser (implemented with ANTLR) are built from individual Rules, roughly 200+ in total. Some rules need to run multiple times, so for relatively complex queries it can take a few seconds just to produce an executable plan.

Databricks' internal benchmarks show that for TPC-DS queries, the tree transform functions are called about 280k times per query on average, far more than necessary. Therefore, we explore embedding bitsets in each tree node to carry information about the node and its subtree, and exploit the fact that a rule that was once ineffective on an immutable subtree stays ineffective, to prune unnecessary traversals. Prototype validation: on the TPC-DS benchmark, we saw optimization speed up by about 50%, analysis by about 30%, and overall query compilation by about 34% (including Hive metastore RPCs and file listing) [1].

2. Design and implementation

2.1 Tree Pattern Bits and Rule Id Bits

  • Tree pattern bits

A nodePatterns attribute is added to TreeNode. Every node that extends this class can declare the patterns it matches by overriding this attribute with its own value.

/**
 * @return a sequence of tree pattern enums in a TreeNode T. It does not include propagated
 *         patterns in the subtree of T.
 */
protected val nodePatterns: Seq[TreePattern] = Seq()

TreePattern is an enumeration; each node/expression can be given its own TreePattern identifier, as defined in TreePatterns.scala.

For example, nodePatterns for Join nodes:

override val nodePatterns : Seq[TreePattern] = {
  var patterns = Seq(JOIN)
  joinType match {
    case _: InnerLike => patterns = patterns :+ INNER_LIKE_JOIN
    case LeftOuter | FullOuter | RightOuter => patterns = patterns :+ OUTER_JOIN
    case LeftSemiOrAnti(_) => patterns = patterns :+ LEFT_SEMI_OR_ANTI_JOIN
    case NaturalJoin(_) | UsingJoin(_, _) => patterns = patterns :+ NATURAL_LIKE_JOIN
    case _ =>
  }
  patterns
}
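
To make these patterns cheap to query over whole subtrees, each node's bits are unioned with its children's bits and cached. The following is a simplified, self-contained sketch of that idea (illustrative only, not Spark's actual classes):

import java.util.BitSet

// A tiny stand-in for Spark's TreePattern enumeration.
object TreePattern extends Enumeration {
  val IN, JOIN = Value
}

abstract class SimpleNode {
  def children: Seq[SimpleNode]
  protected def nodePatterns: Seq[TreePattern.Value] = Seq()

  // Cached union of this node's own patterns and all of its descendants' patterns.
  final lazy val treePatternBits: BitSet = {
    val bits = new BitSet(TreePattern.maxId)
    nodePatterns.foreach(p => bits.set(p.id))
    children.foreach(c => bits.or(c.treePatternBits))
    bits
  }

  // O(1) check: does this subtree contain the given pattern anywhere?
  final def containsPattern(p: TreePattern.Value): Boolean = treePatternBits.get(p.id)
}

case class Leaf() extends SimpleNode {
  def children: Seq[SimpleNode] = Seq()
}

case class SimpleJoin(left: SimpleNode, right: SimpleNode) extends SimpleNode {
  def children: Seq[SimpleNode] = Seq(left, right)
  override protected def nodePatterns: Seq[TreePattern.Value] = Seq(TreePattern.JOIN)
}

object PatternBitsDemo extends App {
  val plan = SimpleJoin(Leaf(), Leaf())
  println(plan.containsPattern(TreePattern.JOIN)) // true
  println(plan.containsPattern(TreePattern.IN))   // false: a rule matching IN can skip this plan
}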
  • Rule ID bits

A cached BitSet of rule IDs is embedded into each tree/expression node T so that we can track whether a rule R is effective or ineffective for the subtree rooted at T. This way, if R has already been applied to T and is known to be ineffective, we can skip applying R to T again (for example, when R is in a fixed-point rule batch). The idea was originally used in the Cascades optimizer to speed up exploratory planning.

Rule:

abstract class Rule[TreeType <: TreeNode[_]] extends SQLConfHelper with Logging {

  // The integer id of a rule, for pruning unnecessary tree traversals.
  protected lazy val ruleId = RuleIdCollection.getRuleId(this.ruleName)

TreeNode:

/**
 * A BitSet of rule ids to record ineffective rules for this TreeNode and its subtree.
 * If a rule R (which does not read a varying, external state for each invocation) is
 * ineffective in one apply call for this TreeNode and its subtree, R will still be
 * ineffective for subsequent apply calls on this tree because query plan structures are
 * immutable.
 */
private val ineffectiveRules: BitSet = new BitSet(RuleIdCollection.NumRules)
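
transformDownWithPruning (shown in the next section) reads and writes this BitSet through two small helpers on TreeNode. Their bodies below are a close approximation rather than a verbatim copy of Spark's code:

// Returns true if the rule with id `ruleId` has already been marked ineffective
// for this subtree. UnknownRuleId means the rule opted out of tracking.
protected def isRuleIneffective(ruleId: RuleId): Boolean =
  (ruleId ne UnknownRuleId) && ineffectiveRules.get(ruleId.id)

// Records that the rule with id `ruleId` made no change to this subtree.
protected def markRuleAsIneffective(ruleId: RuleId): Unit =
  if (ruleId ne UnknownRuleId) ineffectiveRules.set(ruleId.id)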

2.2 Changes to The Transform Function Family

Compared with the previous version, the modified transform method performs two extra checks, as shown below:

def transformDownWithPruning(cond: TreePatternBits => Boolean,
    ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  // If either of the two conditions is not met (the subtree lacks the required
  // patterns, or this rule is already known to be ineffective here), skip this rule.
  if (!cond.apply(this) || isRuleIneffective(ruleId)) {
    return this
  }

  // Execute the rule logic.
  val afterRule = CurrentOrigin.withOrigin(origin) {
    rule.applyOrElse(this, identity[BaseType])
  }

  // Check if unchanged and then possibly return old copy to avoid gc churn.
  if (this fastEquals afterRule) {
    val rewritten_plan = mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
    // If the rule changed neither this node nor any of its children, mark it
    // as ineffective for this subtree so later calls can skip it.
    if (this eq rewritten_plan) {
      markRuleAsIneffective(ruleId)
      this
    } else {
      rewritten_plan
    }
  } else {
    // If the transform function replaces this node with a new one, carry over the tags.
    afterRule.copyTagsFrom(this)
    afterRule.mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
  }
}
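
For reference, the legacy transformDown keeps its old behavior by delegating to the pruned version with a condition that always passes; roughly (an approximation of the actual delegation):

def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  transformDownWithPruning(AlwaysProcess.fn, UnknownRuleId)(rule)
}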

2.3 Changes to An Individual Rule

An example rule, OptimizeIn:

object OptimizeIn extends Rule[LogicalPlan] with SQLConfHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
    _.containsPattern(IN), ruleId) { // the plan must contain IN
    case q: LogicalPlan => q.transformExpressionsDownWithPruning(
      _.containsPattern(IN), ruleId) { // the expression tree must contain IN
      case In(v, list) if list.isEmpty => ...
      case expr @ In(v, list) if expr.inSetConvertible => ...
    }
  }
}
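
The rule can be exercised directly against a hand-built logical plan. Below is a runnable sketch (not from the original article; it uses the Catalyst DSL from spark-catalyst, and the exact output depends on the Spark version and on spark.sql.optimizer.inSetConversionThreshold):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

object OptimizeInDemo extends App {
  val relation = LocalRelation('id.int)
  // id IN (1, 2, 2): OptimizeIn should remove the duplicate literal; with a long
  // enough list it would instead rewrite IN into the faster INSET form.
  val query = relation.where('id.in(Literal(1), Literal(2), Literal(2))).analyze
  println(OptimizeIn(query))
}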

3. Test results

TPC-DS query compilation time was benchmarked using TPC-DS SF10 on Delta. Here are the results:

  • Figure 1 shows the query compilation speed;
  • Table 1 shows a breakdown of the call counts and CPU reductions for several key tree traversal functions.

I simply ran the open-source TPCDSQuerySuite, which parses and optimizes the TPC-DS statements and checks the generated code (CodeGen). I ran it three times and took the best value; the results are as follows:

  • Before PR [2] was merged, the suite contained 156 TPC-DS queries and took ~56s in total.
  • With the latest Spark open-source code, the suite contains 150 TPC-DS queries and takes ~19s in total.

The latest run has 6 fewer TPC-DS queries than the pre-PR run because a subsequent PR removed duplicated TPC-DS queries. The total time before the optimization is more than twice the time after it.

References

[1] SPARK-34916: Tree Traversal Pruning for Catalyst Transform/Resolve Function Families (SPIP).

[2] SPARK-35544: Add tree pattern pruning to Analyzer rules.

[3] Building a SIMD Supported Vectorized Native Engine for Spark SQL.
