When writing SQL, we usually associate tables with the Join operator, and Join is one of the most common parts of a query. The following is only a brief description of the Join process and the relevant optimization methods.

Spark SQL related theories

Before analyzing how the different types of Join are executed, we first introduce the basic framework of Join execution, whose concepts and definitions are used across different SQL scenarios. In Spark SQL, every Join is implemented on top of one basic flow. The two tables that take part in a Join are called the "streamed table" and the "build table" according to their roles, and Spark SQL assigns these roles according to certain policies; in general, the smaller table is chosen as the build table. The iterator over the streamed table is StreamIter, and the iterator over the build table is BuildIter. Each Join step produces a JoinedRow(A, B). If A comes from StreamIter and B comes from BuildIter, the operation is BuildRight; if A comes from BuildIter and B comes from StreamIter, it is BuildLeft.

For LeftOuter, RightOuter, LeftSemi, and RightSemi joins, the build side is fixed: LeftOuter and LeftSemi use BuildRight, while RightOuter and RightSemi use BuildLeft.

At the implementation level, Spark SQL provides three Join mechanisms: BroadcastHashJoinExec, ShuffledHashJoinExec, and SortMergeJoinExec.

SortMergeJoinExec: the most common Join method

SortMergeJoinExec is the main implementation of Join queries. The hash-based Join implementations load one side of the data fully into memory, which suits cases where one table is below a certain size. When both tables are very large, however, either hash-based method puts great pressure on memory. In this case, Spark uses SortMergeJoinExec to perform the Join. Its physical plan and final execution plan can be illustrated with the following query:

select name,score from student join exam on student.id = exam_student_id

SortMergeJoin does not need to load all the data of one side before joining; its prerequisite is that the data is sorted before the Join. To join two records, records with the same key must first be sent to the same partition. A Shuffle (the Exchange node in the physical execution plan) is therefore performed, which partitions the records of both tables by key so that records with the same key from the two tables end up in the same partition for subsequent processing.

After the Exchange node, the data in each partition of the two tables is sorted by key (the SortExec node in Figure 1), and the merge is performed on this sorted data. While traversing the streamed table, the matching records for each row are found in the build table by a sequential scan. Because both sides are sorted, SortMergeJoinExec avoids a lot of useless work and improves performance, for the following reason:

The core class that looks up matching rows is SortMergeJoinScanner. Its constructor takes the streamed-table iterator and the buffered-table iterator; because both are already sorted, the scanner only needs to keep advancing the iterators to fetch new rows for comparison.
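The scan described above can be sketched on plain Scala collections. This is a minimal illustration and not Spark's SortMergeJoinScanner: the object and method names are hypothetical, the join is an inner equi-join, and both inputs are assumed to be already sorted by key.

```scala
object SortMergeJoinSketch {
  // Inner equi-join over two inputs that are ALREADY sorted by key (assumption).
  def sortMergeJoin[K, A, B](streamed: Seq[(K, A)], buffered: Seq[(K, B)])
                            (implicit ord: Ordering[K]): Seq[(K, A, B)] = {
    val out = scala.collection.mutable.ListBuffer.empty[(K, A, B)]
    val b = buffered.toVector
    var bi = 0                                  // cursor on the buffered side; it only moves forward
    var matches: Vector[(K, B)] = Vector.empty  // buffered rows for the current key
    for ((k, a) <- streamed) {
      // Reuse the buffered matches when the streamed key repeats.
      if (matches.isEmpty || !ord.equiv(matches.head._1, k)) {
        while (bi < b.length && ord.lt(b(bi)._1, k)) bi += 1     // skip smaller keys
        val start = bi
        while (bi < b.length && ord.equiv(b(bi)._1, k)) bi += 1  // collect equal keys
        matches = b.slice(start, bi)
      }
      for ((_, bv) <- matches) out += ((k, a, bv))
    }
    out.toList
  }
}
```

Because neither cursor ever moves backwards, each side is read once, which is the saving the sorted inputs make possible.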

  • Performance optimization for SortMergeJoinExec

    • Pre-sorting before the Join

    Before the Shuffle, data is repartitioned in the Map phase by the hash of the key, so that rows with the same key are assigned to the same partition, and the data for a given partition is shuffled from every Mapper to the same Reducer. The Reducer then sorts the data from the different Mappers and joins the sorted data.

The drawback of this mechanism is that when there are few Reducers, each Reducer has to sort a large amount of data. With pre-sorting, the data is sorted on the Map side during the shuffle write, so the Reducer only needs to merge-sort the already sorted data from the different Mappers. On the write side, insertRecord sorts and spills the in-memory buffer to a small file whenever the buffer reaches the threshold, and mergeSpills then merges all of these small files: each time, it takes the minimum record from the spill files and writes it to the output for the current partition; when the current partition has no more data, it moves on to the next partition, until all the data has been written.

def joinShuffleWrite(Iterator<Product2<K,V>> records) {
  while (records.hasNext()) {
    sorter.insertRecord(records.next())
  }
  mergeSpills()
}

def insertRecord(Object record) {
  if (memoryBuffer.size() >= threshold) {
    sortAndSpill(memoryBuffer)
  }
  // TODO: add record to memory
}

def mergeSpills() {
  while (currentPartitionId != null) {
    if (record != null) {
      // TODO: write record to output file
    } else {
      if (has next partition) {
        currentPartitionId = next partition
      } else {
        currentPartitionId = null
      }
    }
  }
}

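A runnable counterpart of the pseudocode can be sketched with in-memory collections standing in for spill files. Everything here is illustrative: the object name, the (partitionId, key) record shape, and the use of a priority queue for the merge are assumptions, not the actual shuffle writer.

```scala
import scala.collection.mutable

object SpillMergeSketch {
  type Record = (Int, String) // (partitionId, key); tuples sort by partition, then key

  def shuffleWrite(records: Iterator[Record], threshold: Int): List[Record] = {
    val spills = mutable.ArrayBuffer.empty[Vector[Record]] // each Vector stands in for one spill file
    val buffer = mutable.ArrayBuffer.empty[Record]
    def sortAndSpill(): Unit = if (buffer.nonEmpty) {
      spills += buffer.sorted.toVector
      buffer.clear()
    }
    records.foreach { r =>
      if (buffer.size >= threshold) sortAndSpill() // buffer full: sort it and spill
      buffer += r
    }
    sortAndSpill() // flush what is left in memory
    mergeSpills(spills.toSeq)
  }

  // K-way merge: repeatedly take the smallest head record among all spills.
  def mergeSpills(spills: Seq[Vector[Record]]): List[Record] = {
    // PriorityQueue is a max-heap, so the ordering is reversed to pop the minimum.
    val ord = Ordering.by[(Record, Int, Int), Record](_._1).reverse
    val heap = mutable.PriorityQueue.empty[(Record, Int, Int)](ord)
    for ((s, i) <- spills.zipWithIndex if s.nonEmpty) heap.enqueue((s(0), i, 0))
    val out = mutable.ListBuffer.empty[Record]
    while (heap.nonEmpty) {
      val (rec, i, pos) = heap.dequeue()
      out += rec
      if (pos + 1 < spills(i).length) heap.enqueue((spills(i)(pos + 1), i, pos + 1))
    }
    out.toList
  }
}
```

Since records compare by partition first, the merged output contains all of partition 0, then partition 1, and so on, each already sorted by key.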
  • Multi-dimensional analysis: the community implementation uses the Expand operator, which reads the data once and emits multiple rows for every input row (2^n, where n is the number of dimensions).

    select A,B,sum(C) from myTable group by A,B with cube

    When the number of dimensions or the initial data volume is large, Shuffle performance is very poor, and in some cases OOM errors may occur when resources are limited. Taking Spark 19175 as an example, the following optimization is proposed:

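import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan, Union}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

/**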
 * Splits [[Aggregate]] on [[Expand]], which has large number of projections,
 * into various [[Aggregate]]s.
 */
object SplitAggregateWithExpand extends Rule[LogicalPlan] {
  /**
   * Split [[Expand]] operator to a number of [[Expand]] operators
   */
  private def splitExpand(expand: Expand): Seq[Expand] = {
    val len = expand.projections.length
    val allProjections = expand.projections
    Seq.tabulate(len)(
      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
    )
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case a @ Aggregate(_, _, e @ Expand(projections, _, _)) =>
      if (SQLConf.get.groupingWithUnion && projections.length > 1) {
        val expands = splitExpand(e)
        val aggregates: Seq[Aggregate] = Seq.tabulate(expands.length)(
          i => Aggregate(a.groupingExpressions, a.aggregateExpressions, expands(i))
        )
        Union(aggregates)
      } else {
        a
      }
  }
}
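The idea behind the rule can be checked on plain collections: aggregating once over an expanded data set gives the same result as running one small aggregate per projection and unioning the results. The sketch below is only an analogy to the Catalyst rule; the object name, the Option-based encoding of rolled-up dimensions, and the sample columns are assumptions.

```scala
object CubeSplitSketch {
  type Row = (String, String, Int)            // columns (A, B, C)
  type Key = (Option[String], Option[String]) // None marks a rolled-up dimension

  // The 2^2 = 4 grouping sets of "group by A, B with cube".
  val projections: Seq[Row => Key] = Seq(
    r => (Some(r._1), Some(r._2)),
    r => (Some(r._1), None),
    r => (None, Some(r._2)),
    _ => (None, None)
  )

  private def sumByKey(rows: Seq[(Key, Int)]): Map[Key, Int] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // One Expand: every input row is emitted once per projection, then aggregated in one pass.
  def expandThenAggregate(rows: Seq[Row]): Map[Key, Int] =
    sumByKey(rows.flatMap(r => projections.map(p => (p(r), r._3))))

  // Split form: one aggregate per projection over un-expanded data, then a union.
  // The key sets of different projections are disjoint, so ++ acts as a union.
  def unionOfAggregates(rows: Seq[Row]): Map[Key, Int] =
    projections.map(p => sumByKey(rows.map(r => (p(r), r._3)))).reduce(_ ++ _)
}
```

In the split form, no single aggregate ever sees more rows than the input, at the cost of reading the input once per projection.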

Tune the service configuration

Spark-submit parameter tuning
Reference: https://help.aliyun.com/document_detail/28124.html
Small file merge

When Spark SQL writes data to Hive tables, the number of output files equals the number of Reducers. If the number of Reducers is large, this produces an excessive number of small files. When Hive data made up of many small files is read later, the cost of opening and closing the files makes Spark SQL inefficient.

- Merge only fragment files. If the fragment threshold is set to 128 MB, only the files in a table/partition that are smaller than this threshold are merged. In addition, if the number of fragment files is below a certain threshold, no merge is triggered: the merge task has its own performance overhead, so the system is allowed to keep a certain number of small files.

- Obtain the maximum concurrency of the job (maxConcurrency), which is used to calculate the data block size, and then choose coalesce or repartition according to the total size of the fragment files. maxConcurrency is calculated as follows (see https://developer.aliyun.com/article/740597):

  a. With dynamicAllocation enabled: maxConcurrency = spark.dynamicAllocation.maxExecutors * spark.executor.cores
  b. With dynamicAllocation disabled: maxConcurrency = spark.executor.instances * spark.executor.cores

Scenario 1: With coalesce(1), a single task both reads and writes the data. With repartition(1), 100 tasks read concurrently and a single task writes the data sequentially. The performance difference can reach 100x.

Scenario 2: The maximum concurrency is 100, there are 10,000 fragment files, and their total size is 100 GB. With repartition(200), 100 GB of data is shuffled. Changing to coalesce(200) avoids 200 GB of I/O (100 GB of shuffle write plus 100 GB of shuffle read) while keeping the same concurrency.

Scenario 3: The maximum concurrency is 200, there are 10,000 fragment files, and their total size is 50 GB. coalesce(100) produces 100 files of 500 MB each but leaves half of the computing capacity idle. Changing to coalesce(200) cuts the merge time by 50%.
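The maxConcurrency formulas and the three scenarios can be condensed into a small sketch. The configuration keys are the ones quoted above; the plain Map standing in for the Spark configuration, the default of 1 core, and the rule "coalesce when the target file count is at least maxConcurrency, otherwise repartition" are assumptions read off the scenarios, not a documented Spark policy.

```scala
object MergePlanSketch {
  // conf is a plain Map standing in for the Spark configuration (assumption).
  def maxConcurrency(conf: Map[String, String]): Int = {
    val cores = conf.getOrElse("spark.executor.cores", "1").toInt // default of 1 chosen for this sketch
    val dynamic = conf.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean
    val executors =
      if (dynamic) conf("spark.dynamicAllocation.maxExecutors").toInt
      else conf("spark.executor.instances").toInt
    executors * cores
  }

  sealed trait MergeMode
  case class Coalesce(n: Int) extends MergeMode
  case class Repartition(n: Int) extends MergeMode

  // Hypothetical heuristic: coalesce(n) avoids the shuffle but runs only n tasks,
  // so it is chosen only when n tasks still keep every core busy.
  def chooseMode(targetFiles: Int, concurrency: Int): MergeMode =
    if (targetFiles >= concurrency) Coalesce(targetFiles) else Repartition(targetFiles)
}
```

Under this heuristic, Scenario 1 (one target file, concurrency 100) selects repartition, while Scenarios 2 and 3 select coalesce(200).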
Enable Executor dynamic adjustment

Enabling dynamic Executor allocation (spark.dynamicAllocation.enabled = true) frees users from tediously estimating the number of Executors. In addition, you can enable spark.shuffle.service.enabled, which preserves the shuffle files of Executors that have been removed; the retained shuffle files can also be used to recover from Stage failures.

Spark SQL business-level tuning (from Spark SQL Kernel Anatomy)
Adjust the Join order of multiple tables

Multi-table Joins are very common in real-world workloads, where they are mainly used to consolidate information from several tables for further analysis. In multi-table Join scenarios, the order of the tables has a significant impact on performance. In fact, multi-table Joins have always been a key target of cost-based optimization (CBO) in databases.

Note: the CBO in Spark SQL is not yet mature enough to adjust the Join order in SQL intelligently, so the order needs to be checked at the business level when writing SQL. As shown in Figure 11.17, a user needs to join three tables, A (3 billion rows), B (12 billion rows), and C (100 million rows). Table A is joined with table B first, and the result is then joined with table C. The Join of A and B was observed to produce 2.5 billion rows, so these 2.5 billion rows were finally joined with the 100 million rows of table C.

Testing showed that if table A and table C are joined first, as shown in Figure 11.18, the intermediate result is only 50 million rows, which are then joined with table B. This order greatly reduces the amount of data involved in the Shuffle, speeds up the Join, and improves performance by about 40%. Generally speaking, when multiple tables are joined there is always an optimal execution order, and adjusting it at the business level requires a general understanding of the data distribution.
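As a rough back-of-the-envelope check, the number of rows entering each Join can be added up for both orders using the row counts quoted above. This is only a crude proxy (it ignores row width, skew, and the Join strategy, and it is not meant to reproduce the measured 40%); the object and function names are hypothetical.

```scala
object JoinOrderSketch {
  // Each pair is (rows on the left input, rows on the right input) of one Join.
  def rowsEnteringJoins(joins: Seq[(Long, Long)]): Long =
    joins.map { case (l, r) => l + r }.sum

  val a = 3000000000L   // table A: 3 billion rows
  val b = 12000000000L  // table B: 12 billion rows
  val c = 100000000L    // table C: 100 million rows

  // (A join B) join C: the intermediate result has 2.5 billion rows.
  val abThenC: Long = rowsEnteringJoins(Seq((a, b), (2500000000L, c)))
  // (A join C) join B: the intermediate result has 50 million rows.
  val acThenB: Long = rowsEnteringJoins(Seq((a, c), (50000000L, b)))
}
```

The second order feeds fewer rows into the Joins overall, and the intermediate result it carries into the second Join is 50 times smaller.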

Data reuse

“Reuse” is a common optimization method in data processing systems. It mainly includes data reuse and computing reuse.

  • Computing reuse means reusing the same operation logic to reduce CPU cost.

  • Data reuse is easy to understand. If two tasks read the same data and do not depend on each other, their logic can be combined so that the data is read only once, which reduces the I/O cost.

In practice there are many data reuse scenarios, most of them dominated by Union operations. For example, a user program fills different input parameters into the same SQL template, each resulting SQL statement produces one result, and the results are merged. Even within a single SQL statement, the user's SQL may be divided into several sub-queries combined with Union. Consider a simple SQL script that selects the rows whose key is 20170802 and the rows whose key is 20170803 in two sub-queries for further processing (note: the complex processing logic is omitted here and only value is used instead). When this SQL is executed in Spark SQL, the table myTable is read twice. When myTable is large, performance suffers considerably.

Such SQL can be rewritten at the business level so that the filtering logic is merged into a single query. More complex per-branch operations can be supported with CASE WHEN. In this way, Spark SQL only needs to read the table myTable once, which reduces the I/O cost. In architectures where storage and computing are separated, this also reduces the network traffic of data transfer.
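The effect of merging the two sub-queries can be illustrated without Spark. In this sketch, a small class counts how often the "table" is read; the class, the method names, and the (key, value) row shape are illustrative assumptions.

```scala
object DataReuseSketch {
  // Stand-in for myTable that counts full reads.
  final class Table(rows: Seq[(Int, Int)]) { // rows are (key, value)
    var scans = 0
    def scan(): Seq[(Int, Int)] = { scans += 1; rows }
  }

  // Union of two sub-queries: each branch reads the table on its own.
  def unionOfFilters(t: Table): Seq[(Int, Int)] =
    t.scan().filter(_._1 == 20170802) ++ t.scan().filter(_._1 == 20170803)

  // Merged filter: one read serves both keys.
  def singleScan(t: Table): Seq[(Int, Int)] =
    t.scan().filter { case (k, _) => k == 20170802 || k == 20170803 }
}
```

Both functions return the same rows (possibly in a different order), but the merged form reads the table once instead of twice.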

Currently, Spark SQL lacks these optimizations at the system level, so more tuning is needed at the business level. In fact, a lot of work has been done on reuse techniques for various scenarios, including MRShare and Stubby in MapReduce environments, whose ideas can be applied to Spark SQL.

Window function performance tuning

Generally, if a Task on a node of the Spark cluster runs very slowly or fails with OOM, you can increase the memory of that node to improve execution. In practice, however, there is a special class of cases. For example, if a data analysis job needs the top users in each region, the SQL statement involves a Window function such as row_number.

During execution, one partition was found to hold more than 100 million rows, about 6.5 GB in total. Within a certain range (note: TDW limits it to 10 GB or less), no matter how much the Executor's memory is increased, the Task for that partition eventually runs out of memory and the SQL fails. In fact, the problem is related to how memory is used during Shuffle reads and writes.

To review, the Shuffle process has a threshold: once it is exceeded, the data is sorted and spilled to disk. As shown in the figure, assuming that the threshold is 4, the data is written into 4 files accordingly. In the Shuffle read phase, one reader is constructed for each data file (see UnsafeSorterSpillReader), and each reader requests 1 MB of memory as a buffer. This is where the problem lies. The default spill threshold of the data structure used in the WindowExec execution plan is 4096, so 100 million rows generate more than 24,000 files, which require about 24 GB of memory and lead to the memory overflow. The solution is simple: adjust the parameter spark.sql.windowExec.buffer.spill.threshold directly. Let the actual heap size of the Executor JVM be M (GB), the number of rows to process be n, and their total size be size (GB). The following two constraints should be roughly met: (1) No more than Executor memory, that is, size∗threshold/n
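The spill arithmetic in this case can be reproduced in a few lines. This is a rough estimate that assumes one spill file per threshold rows and the 1 MB reader buffer mentioned above; the object and function names are hypothetical.

```scala
object WindowSpillSketch {
  val readerBufferBytes: Long = 1L << 20 // 1 MB buffer per spill reader, as stated in the text

  // Approximate number of spill files: one file every `threshold` rows.
  def spillFiles(rows: Long, threshold: Long): Long = rows / threshold

  // Memory needed to open one reader per spill file at the same time.
  def readerMemoryBytes(rows: Long, threshold: Long): Long =
    spillFiles(rows, threshold) * readerBufferBytes
}
```

With 100 million rows and the default threshold of 4096, this gives 24,414 files and roughly 24 GB of reader buffers; raising the threshold to 1,000,000 rows brings it down to 100 files and about 100 MB.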

To some extent, these cases can be generalized into one broad category. Spark has unified memory management, in which the MemoryManager effectively manages a series of MemoryConsumers. However, some objects remain in a “black box” state: they do not implement the MemoryConsumer interface, so their memory is not tracked. Solving such problems requires an understanding of how Spark executes.

Data skew processing

Data skew has always been a pain point in distributed environments. According to the “80/20 principle”, data in real workloads is rarely evenly distributed. For example, a certain user ID may appear very frequently, or a very large number of users may log in to the system during a certain period. In SQL, data skew mainly arises in Aggregation and Join. Aggregation has a Partial (map-side) mechanism, so the problem is less obvious there.

Take Join as an example. When one or a few keys carry far more data than the others, the tasks that process these keys run much longer than the rest and slow down the whole Join. During the construction of the TDW platform, data skew was also a frequently encountered problem, and it is generally optimized or avoided at the business level with the following methods.

  • Irrelevant data filtering:

    Observations by operations and maintenance developers show that approximately 50% of actual data skew is caused by non-business data, which falls into two scenarios:

    • A large number of NULL values are not filtered out and take part in the Join (note: in TDW, NULL is not skipped, according to business requirements).

    • Dirty data does not match the expected data type, and after internal processing it often collapses to the same value, such as NULL.

    The data skew caused by these two situations is relatively simple to deal with. You can filter these irrelevant data directly after troubleshooting.

    • Broadcast of small tables: If one of the two tables in the Join is small enough, BroadcastJoin can be used to broadcast the small table to the Executors that hold the large table, which avoids data skew. Since Spark 2.2, a hint can be added to the SQL to force the use of BroadcastJoin, for example to broadcast the small table T1 to the Executors where the large table T2 resides. This approach covers only a limited number of cases.

      Note that the base table of an outer join cannot be broadcast, so the left table of a left outer join and the right table of a right outer join cannot be the small (broadcast) table.
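Why broadcasting sidesteps skew can be seen in a small collection-based sketch: the small table is turned into a hash map once, and every partition of the large table probes it locally, so the large table is never repartitioned by key. This is not Spark code; the names and the Seq-of-partitions model are assumptions.

```scala
object BroadcastJoinSketch {
  // largePartitions models the existing partitions of the large table.
  def broadcastJoin[K, A, B](largePartitions: Seq[Seq[(K, A)]],
                             small: Seq[(K, B)]): Seq[Seq[(K, A, B)]] = {
    // "Broadcast": build the lookup structure once and share it with every partition.
    val built: Map[K, Seq[B]] =
      small.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
    largePartitions.map { part =>
      for {
        (k, a) <- part
        b <- built.getOrElse(k, Seq.empty[B])
      } yield (k, a, b)
    }
  }
}
```

Rows of a hot key stay in whatever partitions they were already in, so no single task has to receive all of them.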

Skew data separation

For example, suppose the two tables in the Join are T1 and T2, and the table with data skew is T1.

T1 can be divided into two parts, T11 and T12: T11 contains none of the skewed data, and T12 contains only the skewed data. T11 and T12 are each joined with T2, and the two results are then merged.

  • The Join of T11 and T2 has no data skew problem.
  • Since T12 is usually not very large, the Join of T12 and T2 can use the second method (BroadcastJoin). After this processing, the Join of T1 and T2 can cope with data skew.
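The separation can be sketched on collections to confirm that splitting T1 does not change the Join result. The helper names and the explicit set of skewed keys are assumptions for illustration; in Spark, each half would use the Join strategy that suits it.

```scala
object SkewSplitSketch {
  // Simple hash join used for both halves in this sketch.
  def hashJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, A, B)] = {
    val built = right.groupBy(_._1)
    for {
      (k, a) <- left
      (_, b) <- built.getOrElse(k, Seq.empty[(K, B)])
    } yield (k, a, b)
  }

  def splitJoin[K, A, B](t1: Seq[(K, A)], t2: Seq[(K, B)], skewedKeys: Set[K]): Seq[(K, A, B)] = {
    // T12 holds only the skewed keys, T11 holds everything else.
    val (t12, t11) = t1.partition { case (k, _) => skewedKeys(k) }
    hashJoin(t11, t2) ++ hashJoin(t12, t2) // merge the two partial results
  }
}
```

Because every row of T1 lands in exactly one of T11 and T12, the merged output contains the same rows as joining T1 with T2 directly.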

Data scattering

The main idea is to scatter the skewed data. Take a simple example.

Assume that tables A and B both have id and value fields, and the two tables are joined on id, that is, A.id = B.id. In this case, because the rows all share the same id, all the data is joined in the same task, which results in data skew.

With large data volumes, this task slows down the whole application. Data scattering appends a suffix to the id in the large table A (producing “id_0” through “id_2”) to scatter the rows. For the result to stay correct, each row of the small table B has to be copied once per suffix. The Join then generates three tasks, and each task only has to handle one of the suffixed keys, which spreads the load.

After this processing, new_id is used as the Join condition. Note that the rand function does not necessarily spread the data evenly, and the number of suffixes can be tuned step by step based on the actual business data. When dealing with data skew, it is advisable to use count(∗) first to look at the distribution of the data. In addition, you can implement a UDF that generates an array from 0 to n to make suffixing easier.
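The scattering scheme can be sketched on collections as follows. This is an illustration only: the object name, the (id, value) row shape, and the "_" separator are assumptions, and a seeded Random stands in for the rand function.

```scala
import scala.util.Random

object SaltedJoinSketch {
  // a is the large, skewed table; b is the small table; n is the number of suffixes.
  def saltedJoin(a: Seq[(String, Int)], b: Seq[(String, Int)],
                 n: Int, rnd: Random): Seq[(String, Int, Int)] = {
    // Large table: append a random suffix in [0, n) to build new_id.
    val saltedA = a.map { case (id, v) => (s"${id}_${rnd.nextInt(n)}", id, v) }
    // Small table: copy every row once per suffix so each new_id finds its match.
    val saltedB = b.flatMap { case (id, v) => (0 until n).map(i => (s"${id}_$i", v)) }
    val built = saltedB.groupBy(_._1)
    for {
      (newId, id, va) <- saltedA
      (_, vb) <- built.getOrElse(newId, Seq.empty[(String, Int)])
    } yield (id, va, vb) // the suffix is dropped again in the output
  }
}
```

Whatever suffix a row of A draws, exactly one copy of the matching B row carries the same new_id, so the result equals the unsalted Join while the hot key is spread over up to n join keys.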