Spark SQL is the best part of Spark. I think its overall complexity is more than five times that of Spark Streaming, and Spark Streaming is no longer actively maintained. For those of us who build big data computing tasks on Spark, the focus has shifted to the DataSet API, and migrating the old RDD-based code brings big benefits, especially a qualitative improvement in performance. The various performance optimizations built into Spark SQL are far more dependable than the so-called best practices people try to follow when writing RDD code, especially for beginners. For example, some best practices say to put filter operations before map operations; Spark SQL pushes predicates down automatically. Another is to avoid shuffles: with the relevant configuration enabled, Spark SQL automatically broadcasts small tables with a broadcast join, turning a shuffle join into a map join. That saves us a lot of worry.

Spark SQL’s code complexity is a result of the inherent complexity of the problem. Much of the Catalyst framework logic in Spark SQL is built on a Tree data structure, which is elegant to implement in Scala: Scala’s partial functions and powerful case pattern matching keep the code clean. This article briefly describes some of the mechanisms and concepts in Spark SQL.

SparkSession is our entry point for writing Spark application code. Starting a spark-shell gives you a SparkSession out of the box. This object is the starting point of the entire Spark application. Let’s look at some of SparkSession’s important variables and methods:
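For orientation, here is a minimal sketch of obtaining a SparkSession outside the shell and a few of the entry points it exposes; the app name and master are placeholder assumptions, and sessionState is marked as an unstable API:

import org.apache.spark.sql.SparkSession

// Minimal sketch; appName and master are placeholders for a local experiment.
val spark = SparkSession.builder()
  .appName("catalyst-tour")
  .master("local[*]")
  .getOrCreate()

// A few of the entry points discussed below:
spark.sql("SELECT 1")   // runs a query through the whole Catalyst pipeline
spark.catalog           // metadata about tables, views and functions
spark.sessionState      // per-session parser, analyzer, optimizer, catalog, conf (unstable API)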

The sessionState mentioned above is the key piece: it maintains all the state data used by the current session, including the following:

Internally, Spark SQL uses DataFrame and Dataset to represent a dataset, and you can then apply various statistical functions and operators to it. Some people may not know the difference between a DataFrame and a Dataset: a DataFrame is simply a Dataset of type Row.

type DataFrame = Dataset[Row]

The Row type used here is part of the API that Spark SQL exposes. However, a Dataset does not require the input type to be Row; it can also be strongly typed. The data a Dataset actually processes is Catalyst’s internal InternalRow (or UnsafeRow), and behind every Dataset there is an Encoder that implicitly converts the data you put in into InternalRow. A DataFrame corresponds to a RowEncoder.
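As a small illustration (assuming the spark session from the earlier sketch), a case class gives a strongly typed Dataset whose Encoder performs that conversion; the Student type here is made up for the example:

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

// A DataFrame uses RowEncoder; a Dataset[Student] uses an ExpressionEncoder derived
// from the case class, which converts Student instances to and from InternalRow.
case class Student(name: String, age: Int)

val df: DataFrame        = Seq(("ming", 12), ("hong", 10)).toDF("name", "age")
val ds: Dataset[Student] = df.as[Student]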

The various operations we apply to a Dataset ultimately produce a tree of LogicalPlan elements. Say we have a student table and a score table, and we want to compute the total score of all students older than 11.
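A plausible reconstruction of such a query, under assumed schemas stu(id, name, age) and score(id, score); the 100 + 10 is deliberate, since ConstantFolding will collapse it later:

import spark.implicits._

// Hypothetical data; column names are assumptions made for the example.
val stu   = Seq((1, "ming", 12), (2, "hong", 10), (3, "lei", 15)).toDF("id", "name", "age")
val score = Seq((1, 90), (2, 80), (3, 70)).toDF("id", "score")
stu.createOrReplaceTempView("stu")
score.createOrReplaceTempView("score")

val query =
  """SELECT name, sum(v)
    |FROM (
    |  SELECT stu.id, 100 + 10 + score.score AS v, name
    |  FROM stu JOIN score ON stu.id = score.id
    |  WHERE stu.age > 11
    |) tmp
    |GROUP BY name""".stripMargin
val result = spark.sql(query)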

This queryExecution is the execution engine of the whole plan and holds all the intermediate state produced during execution. The entire execution flow is as follows:
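Each stage of that flow is exposed as a field on QueryExecution; a quick sketch, reusing the hypothetical result DataFrame from above:

val qe = result.queryExecution
qe.logical        // parsed, still unresolved, logical plan
qe.analyzed       // after the Analyzer has resolved relations, functions and references
qe.optimizedPlan  // after the rule-based Optimizer
qe.sparkPlan      // the chosen physical plan
qe.executedPlan   // after preparation rules (EnsureRequirements, WholeStageCodegen, ...)

result.explain(true)  // prints all of the above at once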

So, after the Parser has parsed it, the SQL statement in the example above becomes an abstract syntax tree, corresponding to the parsed (unresolved) logical plan:

Shown a bit more graphically as a diagram:

We can see that the Filter condition becomes a Filter node, which is a UnaryNode, i.e. it has exactly one child. The data from the two tables becomes UnresolvedRelation nodes, which are LeafNodes, leaves of the tree as the name implies. The JOIN operation is wrapped in a Join node, a BinaryNode with two children.

These nodes are all of the LogicalPlan type and can be understood as operators; Spark SQL defines an operator for each kind of operation.

The abstract syntax tree formed by these operators is the basis of the whole Catalyst optimizer: the optimizer plays around with this tree, moving nodes around to optimize the plan.

Now we have an abstract syntax tree from the Parser, but we do not yet know what score and sum are, so we need the Analyzer to resolve them. The Analyzer turns everything Unresolved in the AST into a resolved state: ResolveRelations resolves tables (and their columns), ResolveFunctions resolves functions (such as sum), and ResolveReferences resolves field references. ResolveReferences can be a little tricky to understand: a field such as name in SELECT name corresponds to a variable that exists as an Attribute type when the table is parsed, and the same variable in the Project node corresponding to the SELECT becomes a reference with the same ID, so after ResolveReferences it becomes an AttributeReference. This ensures that when the data is actually loaded they are given the same value, just like when we define a variable while writing code. These rules are applied to the nodes repeatedly until the tree becomes stable; of course, applying them too many times wastes performance, so some rules run under a FixedPoint strategy, which is a trade-off. All right, without further ado, let’s do a little experiment.
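A minimal sketch of the starting point, reusing the query string from the earlier reconstruction; sessionState and sqlParser are internal, unstable APIs, so this is only for poking around:

// Parse the SQL text into an unresolved logical plan (the AST discussed above).
val parsed = spark.sessionState.sqlParser.parsePlan(query)
println(parsed.treeString)   // UnresolvedRelation and unresolved attributes everywhere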

We use ResolveRelations to resolve our AST. After it runs, we can see that the original UnresolvedRelation has been turned into a LocalRelation, which represents a table in local memory. This table was registered in the catalog when we called createOrReplaceTempView. The resolve operation looks up the table in the Catalog, finds the table’s schema, and resolves the corresponding fields, turning each StructField defined by the user into an AttributeReference, each marked with an ID.

Applying ResolveReferences, we see that the same fields in the upper nodes become references with the same ID, all of type AttributeReference. Finally, once all the rules have been applied, the entire AST is fully resolved.
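A sketch that reproduces this end state by running the whole Analyzer (again an internal API) over the parsed plan:

// All analyzer rule batches are applied up to their fixed point.
val analyzed = spark.sessionState.analyzer.execute(parsed)
println(analyzed.treeString)  // UnresolvedRelation -> LocalRelation, attributes now carry IDs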

Let’s look at what the logical optimization does:

See? It turned my 100 + 10 into 110 (that is ConstantFolding at work).

PushPredicateThroughJoin pushes the Filter that only touches the stu table down below the Join, so far less data is loaded and performance improves.

The main rules at work here are ColumnPruning, PushPredicateThroughJoin, ConstantFolding, and RemoveRedundantAliases; a sketch of inspecting the optimized plan follows.
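A hedged sketch of watching these rules at work, by feeding the analyzed plan through the session’s Optimizer (an internal API whose behavior may differ across Spark versions):

// The effects of ConstantFolding and PushPredicateThroughJoin should be visible here.
val optimized = spark.sessionState.optimizer.execute(analyzed)
println(optimized.treeString)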

After logical optimization we still only have an abstract logical plan; the logical execution plan needs to be converted into a plan that Spark can actually execute.

Spark SQL converts each logical node into a corresponding physical node, for example the Join operator. For that operator Spark provides different algorithm strategies for different scenarios, such as BroadcastHashJoin, ShuffleHashJoin, and SortMergeJoin. There are many optimization points here: during the conversion Spark makes an informed choice based on statistics, which touches on cost-based optimization. That is a big topic in itself and can be discussed in a separate article later. The BroadcastExchange node extends the Exchange class, which is used to exchange data between nodes; BroadcastExchange broadcasts the data produced by LocalTableScan to every Executor node so that a map-side join can be performed. The final Aggregate operation is split into two steps: first a partial aggregation in parallel, then a final aggregation over the partial results, similar to combine plus the final reduce in MapReduce. An Exchange with hash partitioning is inserted in between to make sure records with the same key are shuffled to the same partition. In general, a shuffle is required whenever the output distribution of a child in the physical plan does not satisfy the distribution its parent requires; the EnsureRequirements rule, applied in the final preparation stage, inserts these Exchange nodes. Below we focus on the trade-offs Spark SQL makes when choosing a join.
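The difference between the planned and the prepared physical plan can be seen directly on queryExecution; a small sketch using the hypothetical result from earlier:

// sparkPlan is the physical plan right after planning; executedPlan is what actually runs,
// after EnsureRequirements has inserted Exchange nodes and WholeStageCodegen has fused operators.
println(result.queryExecution.sparkPlan.treeString)
println(result.queryExecution.executedPlan.treeString)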

A join basically divides the two tables into a large table and a small table: the large table is used as the streamed side and the small table as the lookup table, and then for each record in the large table, records with the same key are fetched from the lookup table.

Spark supports all types of joins:

The join operation in Spark SQL selects different join policies based on various conditions, including BroadcastHashJoin, SortMergeJoin, and ShuffleHashJoin.

  • BroadcastHashJoin: If Spark judges that a table’s storage size is smaller than the broadcast threshold (controlled by spark.sql.autoBroadcastJoinThreshold, default 10MB), it broadcasts the small table to every Executor and puts it into a hash table to use as the lookup table, so the join can be done entirely as a map operation, avoiding a shuffle. BroadcastHashJoin does not support full outer join; for a right outer join the left table is broadcast, for left outer join, left semi join, and left anti join the right table is broadcast, and for inner join whichever table is smaller is broadcast (a configuration sketch follows this list).

  • SortMergeJoin: If both tables are large, SortMergeJoin is the better fit. It shuffles records with the same key into the same partition, sorts both sides, and then performs a merge join over the sorted data. The cost is acceptable.

  • ShuffleHashJoin: ShuffleHashJoin also shuffles the data so that records with the same key land in the same partition, but then builds a hash table from the small table within each partition to use as the lookup table. It is chosen when the small table is too big to broadcast (larger than spark.sql.autoBroadcastJoinThreshold, otherwise BroadcastHashJoin would be used), when the average partition size of the small table does not exceed spark.sql.autoBroadcastJoinThreshold (which guarantees the per-partition lookup table fits in memory without OOM), and when the big table is at least 3 times bigger than the small table, so that the advantages of this join pay off.
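A small configuration sketch, reusing the hypothetical stu and score DataFrames from earlier; the threshold value below is simply the documented default, not a recommendation:

import org.apache.spark.sql.functions.broadcast

// The broadcast threshold is in bytes; setting it to -1 disables automatic broadcasting.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)

// The broadcast() hint forces a BroadcastHashJoin regardless of the statistics.
val joined = stu.join(broadcast(score), Seq("id"))
joined.explain()   // the plan should show BroadcastHashJoin rather than SortMergeJoin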

As mentioned above, the nodes on the AST have now been converted to physical nodes. Execution eventually calls execute recursively starting from the root physical node, and each node applies transform operations to the RDDs produced by its children, yielding a chain of RDDs, much like how Spark Streaming recursively calls through DStreams. The result looks like this:
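A sketch of looking at that RDD chain, assuming the result DataFrame from earlier; toRdd is part of QueryExecution:

// toRdd triggers the recursive execute() calls on the physical plan and returns
// the resulting RDD[InternalRow]; its lineage mirrors the operator chain.
val rdd = result.queryExecution.toRdd
println(rdd.toDebugString)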

You can see that the final execution is split into two stages. The small table goes through a BroadcastExchange to the large table side so that the BroadcastHashJoin can run without involving a shuffle. Then, for the final aggregation, the map side first runs a partial HashAggregate for the sum, an Exchange with hash partitioning on name shuffles records with the same key to the same partition, and a final HashAggregate computes the sum. There is also a WholeStageCodegen node, which looks a bit odd. What does it do? When we execute operators such as Filter and Project, these operators contain many expressions. For example, in SELECT sum(v), name, both sum(v) and v are Expressions: v is an Attribute variable expression, and an expression is itself a tree data structure, so sum(v) is a tree consisting of the sum node and its child node v. These expressions can be evaluated and can also generate code. The most basic function of an Expression is to evaluate an input Row, via def eval(input: InternalRow = null): Any.

The output of an expression can be of any type, whereas a Plan such as Project or Filter outputs def output: Seq[Attribute], which represents a set of variables. Take the Filter(age > 11) plan in our example: age > 11 is an expression, and the > expression depends on two children, a Literal constant expression that evaluates to 11 and an Attribute variable expression age, which was converted to an AttributeReference in the Analyze phase but is Unevaluable. To get the attribute’s value from an input Row, the variable is bound to an index in the Row according to the schema, producing a BoundReference; during eval, the BoundReference expression then fetches the value from the Row by that index. The final output type of the expression age > 11 is Boolean, while the output of the Filter plan is Seq[Attribute].
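A tiny sketch of evaluating such an expression by hand; these are Catalyst-internal classes, so names and signatures may shift between Spark versions:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, GreaterThan, Literal}
import org.apache.spark.sql.types.IntegerType

// age > 11, with age bound to ordinal 0 of the input row.
val age  = BoundReference(0, IntegerType, nullable = true)
val pred = GreaterThan(age, Literal(11))

pred.eval(InternalRow(12))   // true
pred.eval(InternalRow(10))   // false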

You can imagine data flowing through a plan: every expression in every operator processes the data, and each amounts to a small function call, so there is a lot of function-call overhead. We can therefore inline these small functions into one big function, and that is exactly what WholeStageCodegen does.

You can see that in the final execution plan each such node is preceded by an asterisk (*), indicating that whole-stage code generation is enabled. In our example, Filter, Project, BroadcastHashJoin, Project, and HashAggregate all have whole-stage code generation enabled and are fused into two big functions; if you are interested, you can use queryExecution.debug.codegen() to see what the generated code looks like. The Exchange operator, however, does not implement whole-stage code generation, because it needs to send data across the network.
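For example, on the hypothetical result from earlier:

// Dumps the Java source generated for each WholeStageCodegen region of the plan.
result.queryExecution.debug.codegen()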

I’ll leave it there. There is a lot of interesting stuff inside Spark SQL, but the inherent complexity of the problem demands a high level of abstraction, which makes the code hard for a reader to follow; if you really dig into it, though, you can learn a lot. If you have any thoughts on this article, feel free to share them in the comments below.
