Spark SQL is more than five times as complex as Spark Streaming, and Spark Streaming is no longer actively maintained. Since we build our big data computing tasks on Spark, our focus will also shift to the Dataset API: code originally written against RDDs will be migrated to Datasets, and the benefits, especially in performance, will be substantial. The performance optimizations built into Spark SQL make it far more likely that a job follows the so-called best practices than hand-written RDD code does, especially for beginners. For example, putting filter before map is handled automatically by predicate pushdown, and expensive shuffles can be avoided: with the relevant configuration enabled, Spark SQL automatically broadcasts small tables and converts a shuffle join into a map-side (broadcast) join.

The complexity of the Spark SQL code is a consequence of the complexity of the problem it solves. The Catalyst framework in Spark SQL does most of its work on a Tree data structure, which is elegant to implement in Scala: partial functions and powerful case pattern matching keep the code clean. This article briefly describes some of the mechanisms and concepts in Spark SQL.

SparkSession is the entry point of our Spark application code. Starting a spark-shell gives you a ready-made SparkSession object, and this object is the starting point of the entire Spark application. Let’s take a look at some of SparkSession’s important variables and methods:
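Outside of spark-shell, a SparkSession has to be created explicitly. A minimal sketch (the application name and master here are placeholders, not values from the original article):

import org.apache.spark.sql.SparkSession

// Build (or reuse) a SparkSession; appName and master are illustrative.
val spark = SparkSession.builder()
  .appName("spark-sql-demo")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._   // brings in Encoders and the toDF/toDS helpers used below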

SessionState is the key object here: it maintains all the state used by the current session. It holds, among other things:
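The exact members vary between Spark versions, but roughly, SessionState carries the pieces used throughout this article. A sketch of the ones that matter here (internal API, names indicative only):

val state = spark.sessionState      // internal API: the SessionState behind this session

// state.conf              -- SQLConf, the session-level SQL configuration
// state.catalog           -- SessionCatalog: tables, views and functions metadata
// state.sqlParser         -- turns SQL text into an unresolved LogicalPlan
// state.analyzer          -- resolves relations, references and functions against the catalog
// state.optimizer         -- the Catalyst rule-based optimizer
// state.planner           -- converts optimized logical plans into physical plans
// state.executePlan(plan) -- wraps a LogicalPlan in a QueryExecution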

Internally, Spark SQL uses DataFrame and Dataset to represent a data set, and you can then apply various statistical functions and operators to it. A DataFrame is simply a Dataset of type Row:

type DataFrame = Dataset[Row]

Row is the type exposed through the Spark SQL API, but a Dataset does not have to hold Rows; it can also be strongly typed. Underneath, a Dataset stores data in Catalyst’s internal InternalRow (or UnsafeRow) format, and an Encoder sits behind it that implicitly converts the data you provide into InternalRow. A DataFrame uses RowEncoder.
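A small sketch of the two flavours (the case class and sample data are just for illustration; it relies on the spark.implicits._ import above):

import org.apache.spark.sql.Row

case class Student(name: String, age: Int)

// Strongly typed Dataset: the implicit Encoder[Student] (a product encoder)
// converts Student objects to InternalRow and back.
val studentsDS = Seq(Student("Alice", 12), Student("Bob", 10)).toDS()

// Untyped DataFrame: Dataset[Row], backed by a RowEncoder derived from the schema.
val studentsDF = studentsDS.toDF()
val firstRow: Row = studentsDF.head()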

Operations on a Dataset build up a tree whose nodes are of type LogicalPlan. For example, suppose I have a table of students and a table of scores, and the requirement is to compute the total score of every student over the age of 11.
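A plausible reconstruction of that setup (the table and column names stu, score, name, age and v follow the ones referenced later in the article; the data and exact query text are made up for illustration):

// Register the two data sets as temp views so they can be queried with SQL.
val stu = Seq(("Alice", 12), ("Bob", 10), ("Carol", 13)).toDF("name", "age")
val score = Seq(("Alice", 80), ("Alice", 15), ("Bob", 60), ("Carol", 70)).toDF("name", "v")
stu.createOrReplaceTempView("stu")
score.createOrReplaceTempView("score")

// Total score of all students older than 11.
val resultDF = spark.sql(
  "SELECT sum(v), stu.name FROM stu JOIN score ON stu.name = score.name " +
  "WHERE stu.age > 11 GROUP BY stu.name")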

The queryExecution is the execution engine behind the whole plan, and it keeps all of the intermediate plans along the way, as follows:
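A minimal way to poke at these stages from the shell, using the resultDF built above (QueryExecution is an internal API, so field names may differ slightly between versions):

val qe = resultDF.queryExecution

// The plan as the Parser produced it, still unresolved.
qe.logical
// After the Analyzer: tables and columns resolved against the catalog.
qe.analyzed
// After the Catalyst Optimizer: predicate pushdown, constant folding, etc.
qe.optimizedPlan
// The chosen physical plan, and the final plan after EnsureRequirements / codegen.
qe.sparkPlan
qe.executedPlan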

The SQL statement in the example above is turned into an abstract syntax tree by the Parser; the corresponding logical plan AST after parsing is

Let me draw it a little bit more vividly

We can see that the filter condition becomes a Filter node, which is a UnaryNode, meaning it has exactly one child; the data from the two tables becomes UnresolvedRelation nodes, which are LeafNodes with no children, as the name implies; and the JOIN operation becomes a Join node, a BinaryNode with two children.

The nodes mentioned above are of LogicalPlan type and can be understood as operators that perform various operations. Spark SQL defines operators for various operations.

The abstract syntax tree composed of these operators is the basis of the entire Catalyst optimization: the Catalyst optimizer applies all kinds of rules to this tree, moving its nodes around to optimize the plan.

After the Parser we have an abstract syntax tree, but we don’t yet know what score and sum are, so the Analyzer is needed to resolve them. The Analyzer turns everything Unresolved on the AST into a resolved state by applying rules such as ResolveRelations (resolve tables and columns) and ResolveFunctions (resolve functions). The name in the SELECT column list corresponds to a variable that becomes an Attribute when the table is resolved; the same variable in the Project node corresponding to the SELECT then becomes a reference to it with the same ID, so after ResolveReferences it becomes an AttributeReference. This guarantees they are given the same value when the data is actually loaded at the end, much like defining a variable and then referring to it when we write code. These rules are applied to the tree repeatedly until the tree is stable; applying them too many times would of course waste performance, so some rules run under a FixedPoint strategy, which is a trade-off. All right, without further ado, let’s do a little experiment.
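The experiment can be reproduced from the shell by feeding the parsed plan through the Analyzer by hand. This leans on internal APIs (sessionState, RuleExecutor.execute), so treat it as a sketch that may differ across Spark versions:

// Plan exactly as produced by the Parser: UnresolvedRelation, unresolved attributes.
val parsedPlan = resultDF.queryExecution.logical

// Run the Analyzer rule batches (ResolveRelations, ResolveReferences, ...)
// until they reach their fixed point.
val analyzedPlan = spark.sessionState.analyzer.execute(parsedPlan)

println(parsedPlan.treeString)
println(analyzedPlan.treeString)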

We use ResolveRelations to resolve our AST, and we can see that the UnresolvedRelation has become a LocalRelation, which represents a table in local memory. This table was registered in the catalog when we called createOrReplaceTempView. The resolve operation is nothing more than looking the table up in the catalog, finding its schema, and resolving the corresponding fields: each StructField defined by the user is turned into an AttributeReference and marked with an ID.

If we then apply ResolveReferences, you will find that the same fields in the upper nodes have become references with the same ID, all of type AttributeReference. Once all the rules have been applied, the entire AST becomes

Now for the focus of what follows: logical optimization. Let’s see what logical optimizations there are:

See? The ConstantFolding rule replaced my 100 + 10 with 110.
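To get a feel for how such a rule works, here is a self-contained toy version, not Catalyst’s actual classes: a tiny expression tree plus a constant-folding rule written as a Scala partial function and applied bottom-up, the way Catalyst’s TreeNode.transformUp does.

// Toy expression tree, standing in for Catalyst's Expression nodes.
sealed trait Expr
case class Lit(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// Bottom-up transform: rewrite the children first, then try the rule on the node itself.
def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
  val withNewChildren = e match {
    case Add(l, r) => Add(transformUp(l)(rule), transformUp(r)(rule))
    case leaf      => leaf
  }
  rule.applyOrElse(withNewChildren, identity[Expr])
}

// The "rule" is just a partial function over tree nodes.
val constantFolding: PartialFunction[Expr, Expr] = {
  case Add(Lit(a), Lit(b)) => Lit(a + b)
}

// The nested 100 + 10 is folded to 110, and then the whole expression to 111.
// Catalyst runs whole batches of such rules repeatedly until a FixedPoint is reached.
val folded = transformUp(Add(Add(Lit(100), Lit(10)), Lit(1)))(constantFolding)
// folded == Lit(111)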

PushPredicateThroughJoin pushes a Filter that only references the stu table down below the Join, so much less data is loaded and performance improves. Let’s take a look at the final result.

The main rules applied here are ColumnPruning, PushPredicateThroughJoin, ConstantFolding and RemoveRedundantAliases.

After logical optimization is complete, we still only have an abstract logical plan; it needs to be converted into a physical execution plan that Spark can actually execute.

Spark SQL converts logical nodes into corresponding physical nodes. Take the Join operator: Spark provides different algorithms for it depending on the scenario, such as BroadcastHashJoin, ShuffleHashJoin, and SortMergeJoin. There are many optimization points here; during the conversion Spark makes an informed choice based on statistics, which brings in cost-based optimization, itself a big topic that could fill a later article. In our example, BroadcastHashJoin is chosen automatically because the data volume is less than 10MB.

The BroadcastExchange node inherits from the Exchange class, which is used to exchange data between nodes; BroadcastExchange broadcasts the LocalTableScan data to every Executor node so the join can be done on the map side. The final Aggregate operation is split into two steps: first a parallel partial aggregation, then a final aggregation of the partial results, much like combine followed by the final reduce in MapReduce. An Exchange hashpartitioning is inserted in between to make sure rows with the same key are shuffled into the same partition: if the Distribution of the current physical plan’s child output does not meet the requirement, a new exchange node is inserted to shuffle the data. This happens in the EnsureRequirements phase. Finally, let’s look at some of the trade-offs Spark SQL makes for the Join operation.
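To see which physical operators were chosen for our example, the standard explain call is enough (the exact operator names in the output depend on the Spark version):

// Prints the parsed, analyzed and optimized logical plans plus the physical plan.
resultDF.explain(true)

// For this query one would roughly expect to see:
//   HashAggregate (final)  <- Exchange hashpartitioning  <- HashAggregate (partial)
//   <- BroadcastHashJoin   <- BroadcastExchange over the small table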

A Join can basically divide the two tables being joined into a large table and a small table: the large table is used as the streaming table that is traversed, and the small table is used as the lookup table, so that for each record in the large table the records with the same key can be fetched from the lookup table.

Spark supports all types of joins:

The join operations in Spark SQL select different join policies based on different conditions, including BroadcastHashJoin, SortMergeJoin, and ShuffleHashJoin.

  • BroadcastHashJoin: if Spark judges that a table’s storage size is below the broadcast threshold (controlled by the parameter spark.sql.autoBroadcastJoinThreshold, 10MB by default), it broadcasts the small table to every Executor and puts it into a hash table as the lookup table. The join can then be done with a map-side operation, avoiding an expensive shuffle. BroadcastHashJoin does not support full outer join; for a right outer join the left table is broadcast; for left outer join, left semi join and left anti join the right table is broadcast; for an inner join, whichever table is smaller is broadcast.

  • SortMergeJoin: the data of both tables is shuffled by the join key so that rows with the same key land in the same partition, each partition is sorted on the key, and the two sorted sides are then merged. The price of the sort is acceptable.

  • ShuffleHashJoin: no sort is performed during the shuffle; the lookup table is put into a hash table and the join is done by lookups. So when is ShuffleHashJoin used? The lookup table must be larger than spark.sql.autoBroadcastJoinThreshold (otherwise BroadcastHashJoin would be used), the average size of each partition must not exceed spark.sql.autoBroadcastJoinThreshold (so the lookup table fits in memory without OOM), and the big table must be at least three times larger than the small table, so that this kind of join pays off. A small configuration sketch follows this list.
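If you want to nudge Spark toward one of these strategies, the usual knobs are the broadcast threshold and the broadcast hint; a small sketch, with an example threshold value and the stu/score DataFrames from earlier:

import org.apache.spark.sql.functions.broadcast

// Raise or lower the size under which tables are broadcast (-1 disables broadcasting).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)

// Or force the small side to be broadcast regardless of statistics.
val joined = stu.join(broadcast(score), Seq("name"))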

As mentioned above, the nodes of the AST have now been converted into physical nodes. These physical nodes end up recursively calling the execute method from the root node down: each calls transform operations on the RDD produced by its child, stringing together a chain of RDDs, just like the recursive calls on DStreams in Spark Streaming. The final picture is as follows:

And you can see that this is finally executed in two stages: the small table is sent by BroadcastExchange to the large table for a BroadcastHashJoin, with no shuffle involved; then, for the aggregation at the end, a HashAggregate first computes a partial sum, an Exchange hashpartitioning on name shuffles rows with the same key into the same partition, and a final HashAggregate completes the sum. There is also a WholeStageCodegen, which looks a bit odd. When we execute operators such as Filter and Project, they contain many Expressions. For example, in SELECT sum(v), name, both sum(v) and name are Expressions, with v being an Attribute variable Expression; an Expression is itself a tree data structure, sum(v) being a tree whose sum node has the child node v. These Expressions can be evaluated and can generate code. The most basic function of an Expression is to evaluate an input Row, through def eval(input: InternalRow = null): Any.

An Expression processes a Row and its output can be of any type, whereas the output of the Project and Filter plans is def output: Seq[Attribute]. Take the expression age > 11 as an example: one side is a Literal constant expression that evaluates to 11, the other is the Attribute expression age, which is converted to an AttributeReference during the Analyze phase but is Unevaluable. To get the value of that attribute from the input Row, the variable is bound to an index in the Row according to the schema, producing a BoundReference; the BoundReference expression then reads the value from the Row at that index in its eval. The output type of the expression age > 11 is Boolean, but the output of the Filter plan is still Seq[Attribute].
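A sketch of this mechanism using Catalyst’s internal expression classes (internal API, so constructors and behavior may differ slightly between Spark versions):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, GreaterThan, Literal}
import org.apache.spark.sql.types.IntegerType

// age bound to ordinal 0 of the input row, compared with the constant 11.
val ageRef  = BoundReference(0, IntegerType, nullable = true)
val ageGt11 = GreaterThan(ageRef, Literal(11))

// Evaluate the expression tree against an InternalRow: returns a Boolean.
val row = InternalRow(12)
ageGt11.eval(row)   // true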

As you can imagine, when data flows through a Plan, every Expression inside that Plan processes it, each one equivalent to a small function call, and all those function calls add up to a lot of overhead. So why not inline these small functions into one big function? That is exactly what WholeStageCodegen is for.

You can see that in the final execution plan there is an * in front of each node, indicating that whole-stage code generation is enabled. In our example, Filter, Project, BroadcastHashJoin, Project and HashAggregate all participate in whole-stage code generation and are cascaded into two big functions; if you are interested, you can use a.queryExecution.debug.codegen to see what the generated code looks like. However, the Exchange operator does not implement whole-stage code generation, because it needs to send data over the network.
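For our running example, the call would look roughly like this (debug.codegen is a developer/internal API):

// Print the generated Java code for each WholeStageCodegen subtree.
resultDF.queryExecution.debug.codegen()

// Equivalent helper via the debug package.
import org.apache.spark.sql.execution.debug._
resultDF.debugCodegen()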

There are a lot of interesting things in Spark SQL, but the complexity of the problem forces a high level of abstraction to keep it manageable, which makes the code harder for readers to understand; if you really dig into it, though, you will get a lot out of it. If you have any thoughts on this article, feel free to leave them in the comments at the end.
