Data skew is one of the thorniest problems in massive-scale data processing. This article shares our team's experience tuning skew on trillion-scale data. Based on an in-depth analysis of the production scenarios in which data skew arises and of the principles behind it, it presents simple, practical, and efficient tuning schemes, including targeted solutions for GroupBy and Join, the two basic operators of big data processing.

 

1. Analyze data characteristics first

Before introducing data skew, I want to emphasize a key but often overlooked step in big data processing: analyzing the characteristics of the data. Many people skip this step and start coding directly. The code gets written quickly, but then a huge amount of time is spent trying to get a correct result within an acceptable runtime. One of the most important lessons from massive data processing jobs is that analysis must precede processing. The analysis includes, but is not limited to, the following points:

1) Analyze the overall size of the data

For example, suppose we need to build a processing job over 8 TB of data. Before processing, we should check how many original files make up the 8 TB and how large each file is. If the file count or file sizes are outside a reasonable range, the data should first be compacted into a more standard number of files and file size. From this we can estimate the number of Map and Reduce tasks for a MapReduce job, or the number of tasks per job for a Spark application, and thus roughly determine the resource cost of the task. A quick way to inspect the input is sketched below.
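For instance, a minimal sketch using the Hadoop FileSystem API in a spark-shell session (the input path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Count the input files and report their sizes, so the number of map/partition
// tasks and the rough resource cost can be estimated before writing any job code.
val fs = FileSystem.get(new Configuration())
val files = fs.listStatus(new Path("/data/raw/ads")).filter(_.isFile)
println(s"file count: ${files.length}")
files.foreach(f => println(s"${f.getPath.getName}: ${f.getLen} bytes"))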

2) Analyze the storage format of data

Data processing performance is closely related to how the data is stored. Columnar storage is a good choice for tasks that touch only a small number of columns, while row storage may be better for tasks that read a large number of columns at once. For the 8 TB task mentioned above, the read performance of a single part can be tested in advance for different storage formats and compression codecs such as Parquet, GZ and SNAPPY. For tasks whose data is sparse but that need a large number of columns, uncompressed data often reads faster.
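A minimal sketch of column pruning with a columnar format, assuming an active SparkSession named spark (the path and column names are illustrative):

// With Parquet, only the selected columns are read from disk; a row format
// would have to read and discard the other columns as well.
val impressions = spark.read.parquet("/data/warehouse/ad_impressions")
impressions.select("country", "ad_id").show(5)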

3) Analyze the distribution of data column values

Analyzing data scale and storage format mainly optimizes how the original data is loaded. The most critical point in actual processing, however, is analyzing the distribution of column values, including how many rows have a value in each column and how those values are distributed. This can be done by sampling from the full data set, for example:

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println)

GroupBy and Join are the most commonly used operators in big data processing. Only when the value distribution of each column is known can we anticipate how GroupBy and Join will perform on it. The data skew optimizations introduced later in this article all depend on the column value distribution obtained in this step.


2. Identifying data skew

 

In a data processing job, if most tasks finish within a reasonable time but a few tasks run extremely slowly, the job suffers from data skew. For example, out of 10,000 tasks in total, 9,997 finish within 3 minutes, but the remaining 3 take one or two hours or never finish at all. The execution time of each task can be viewed on the task monitoring pages of Hadoop and Spark. When this pattern appears, data skew has occurred and the code needs to be optimized. The situation is very common in big data processing, and the running time of the skewed tasks often directly determines the time cost of the whole job.

 

The essence of this phenomenon is that a few tasks process far more data than the others. Because every task gets the same resources, the tasks with much more data take much longer to finish.

 


3. How data skew arises in Map tasks

 

For MapReduce jobs, data skew usually occurs in the Reduce phase, which is analyzed in the following sections. However, data skew can also occur in the Map phase.

 

There is essentially only one cause of skew in the Map phase: the map tasks read original files that are not splittable and whose sizes are very uneven. For example, if some files are 2 GB while others are 2 KB, and processing time is proportional to file size, then the task that processes a 2 GB file takes roughly a million times longer than the task that processes a 2 KB file.

 

A word on "splittable": in the HDFS distributed storage system, splittable indicates whether a single file can be read by multiple maps at the same time, with each map reading part of the file. A splittable file consisting of N blocks can be processed by N map tasks in parallel, each handling one block. Therefore, no matter how large an individual file is, splittable files do not cause data skew.

For files that are not splittable, all the data in a single file must be processed by a single map. So if such a file consists of many blocks, data skew can occur.

Note that a file being non-splittable does not mean the map has to load the entire file into memory before processing it; the map still reads the file in small batches.

 

Splittable support varies across file formats and compression codecs in the Hadoop ecosystem: plain text and bzip2-compressed files are splittable, gzip- and snappy-compressed text files are not (LZO is splittable only with an index), and container formats such as SequenceFile, Avro, ORC and Parquet remain splittable even when their blocks are compressed.

Therefore, if the input format does not support splitting and file sizes are uneven, pay attention to whether data skew occurs in the Map phase.

 


4. How data skew arises in the Reduce phase

 

Shuffle is the process of moving data from map to reduce. During a shuffle, the output of the Map phase is partitioned to the reduce inputs based on a hash of the key: the output of each map is distributed across multiple reduces, and all records with the same key, no matter which node produced them, must be pulled to the same task for processing. In Spark, the operators that trigger a shuffle include distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, and repartition.

 

The cause of data skew in the Reduce phase is obvious: the shuffle is uneven. Because the shuffle keys are unevenly distributed, some tasks receive far more data than others. For example, if most keys correspond to 10,000 records each but a few keys correspond to 10 million records, then most tasks are assigned about 10,000 records and finish in seconds, while individual tasks are assigned 10 million records and run for tens of minutes. The progress of the whole job is then determined by its longest-running task.

 

So why is the shuffle uneven? A large amount of data in real production environments is inherently uneven. For example, when counting the daily ad exposure of Tencent Video in each country, there is a Group By country step. Because Tencent Video's domestic traffic is far larger than its overseas traffic, one Reduce task inevitably has to process all of the domestic traffic. That data reaches tens of billions of records, so the task runs very slowly and may even fail with an OOM.

 

A rule of thumb in big data development is to avoid shuffling data as much as possible, because shuffles are expensive. Take a Spark job as an example: computation normally happens in memory and, thanks to the data-locality policy, most data can be read from the same node or rack without being fetched from other nodes or racks. Once a shuffle occurs, however, network transfer and data serialization/deserialization bring heavy disk I/O and CPU costs, causing a large performance loss. There are two main ways to reduce shuffle overhead:

1) Reduce the number of shuffles and process data locally where possible

2) Reduce the amount of data that is shuffled (a short sketch follows this list)
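A minimal sketch of idea 2), assuming a hypothetical RDD[(String, Long)] of (country, exposure) pairs named events:

import org.apache.spark.rdd.RDD

// reduceByKey combines values on the map side before the shuffle, so only one
// partial sum per key per partition crosses the network; groupByKey would
// shuffle every individual record instead.
def exposureByCountry(events: RDD[(String, Long)]): RDD[(String, Long)] =
  events.reduceByKey(_ + _)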

 


5. Data skew scenarios in Reduce jobs

 

For an unknown data set, if its characteristics have been analyzed in advance, it is easy to find all the keys that may produce data skew; this is exactly the "analyze data characteristics first" principle emphasized at the beginning of this article (a quick check is sketched after the list below). So in which scenarios do skewed keys appear in real production environments? In our experience, there are three main categories:

 

1) Default business fill values

For example, a user's IMEI is filled with a default value when it cannot be obtained; when the advertising system cannot serve an ad it plays a default ad, and all default ads share the same order number; or a business field currently has only one possible value.

2) Hot spots in the business itself

For example, popular dramas receive far more ad exposure than ordinary ones, some ad placement types receive far more exposure than others, and domestic ad exposure is far greater than overseas exposure.

3) Malicious data

For example, a single ID generating a massive amount of fraudulent ad exposure.
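A quick way to check whether any of these hot keys exists in a concrete data set, using a toy DataFrame and an illustrative column name:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, desc}

val spark = SparkSession.builder().appName("skew-check").master("local[*]").getOrCreate()
import spark.implicits._

// Toy impression log; in practice this would be the real table or a sample of it.
val impressions = Seq("000000000000000", "000000000000000", "86453201", "000000000000000").toDF("imei")

// The heaviest keys surface default fill values, business hot spots and malicious IDs.
impressions.groupBy("imei")
  .agg(count("*").as("rows"))
  .orderBy(desc("rows"))
  .show(20, truncate = false)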


6. Solutions for Reduce-phase data skew

 

Essentially only two operators produce data skew: GroupBy and Join. Concrete solutions for each are given below. Note that these solutions address data skew at its root rather than merely "mitigating" it (for example, by simply increasing the number of Reduce tasks).

1) Skew in the GroupBy operator

 

Skew in the GroupBy operator essentially has only one solution: two-stage aggregation.

 

Phase 1: Modify the aggregation key to perform local aggregation

 

Convert GroupBy(key) to GroupBy(key, randomNum), where randomNum is a random number in the range [0, N) and N is chosen according to how severely the key is skewed. For example, if one Reduce task would otherwise have to process 100 million records and 1 million records can be processed in about 10 seconds, set N to 100 so that the data that originally went to one Reduce task is spread across 100 tasks.

After local aggregation, the random number is stripped off, so the output of phase 1 has the form (key, partialAggregateValue).

Phase 2: Global aggregation by original key

 

After the phase 1 aggregation, the amount of data per key in phase 2 has been reduced to roughly 1/N of the original, and GroupBy(key) can now run successfully.
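A minimal sketch of the two stages on a Spark RDD, assuming a hypothetical RDD[(String, Long)] named pairs and a sum aggregation:

import org.apache.spark.rdd.RDD
import scala.util.Random

def twoStageAggregate(pairs: RDD[(String, Long)], n: Int): RDD[(String, Long)] = {
  // Phase 1: salt the key with a random number in [0, n) and aggregate locally.
  val locallyAggregated = pairs
    .map { case (key, value) => ((key, Random.nextInt(n)), value) }
    .reduceByKey(_ + _)
  // Phase 2: strip the salt and aggregate globally by the original key.
  locallyAggregated
    .map { case ((key, _), partial) => (key, partial) }
    .reduceByKey(_ + _)
}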

 

2) Skew in the Join operator

 

Skew in the Join operator falls into two categories:

A) Joining a small table with a large table

B) Joining two large tables

 

A) Joining a small table with a large table

 

When one side is a small table, a map join is preferred. The idea of a map join is to load the small table into the memory of each map task and complete the join by lookup, without any shuffle. In Spark, the map join is implemented as a broadcast join: a full copy of the small table is stored as a broadcast variable on every executor, so the join becomes a local lookup.
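A minimal sketch of the underlying broadcast-variable mechanism, with illustrative data and assuming an active SparkSession named spark:

// Ship the small lookup table once to every executor, then join by local lookup.
val smallTable: Map[String, String] = Map("1001" -> "drama", "1002" -> "news")
val bcTable = spark.sparkContext.broadcast(smallTable)

val bigRdd = spark.sparkContext.parallelize(Seq(("1001", 10L), ("1003", 5L)))
val joined = bigRdd.map { case (adId, exposure) =>
  (adId, bcTable.value.getOrElse(adId, "unknown"), exposure)  // no shuffle involved
}
joined.collect().foreach(println)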

There are several points to emphasize when using Broadcast Join:

 

1) Explicit vs. implicit broadcast join

An explicit broadcast join broadcasts the small table explicitly during the join, for example: val resultM = result1.join(broadcast(confReflueIDDF), Seq("reflue_id"), "left_outer"). Here broadcast is the hint for an explicit broadcast join. Spark has two broadcast functions, one in SparkContext and one in org.apache.spark.sql.functions; the explicit broadcast join uses the one in org.apache.spark.sql.functions.
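A self-contained sketch of an explicit broadcast join with toy tables (table and column names are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("explicit-broadcast").master("local[*]").getOrCreate()
import spark.implicits._

// Toy stand-ins for the large fact table and the small dimension table.
val facts = Seq((1, "CN"), (2, "US"), (1, "IN")).toDF("reflue_id", "country")
val dims  = Seq((1, "feed"), (2, "banner")).toDF("reflue_id", "placement")

// The hint forces a broadcast regardless of the auto-broadcast threshold.
val resultM = facts.join(broadcast(dims), Seq("reflue_id"), "left_outer")
resultM.explain()   // the plan should show BroadcastHashJoin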

 

An implicit broadcast join means that, during the join, Spark automatically broadcasts any table whose size is below spark.sql.autoBroadcastJoinThreshold. The default threshold is 10485760 bytes (10 MB), and the current value can be checked with spark.conf.get("spark.sql.autoBroadcastJoinThreshold"). An implicit broadcast join looks like this:

val df1 = spark.range(1000)

val df2 = spark.range(1000)

df1.join(df2, Seq("id")).explain

The execution plan shows that the join is implemented as a BroadcastHashJoin.

 

2) Broadcast join memory requirements and memory expansion

Broadcasting a variable of N bytes requires at least 2N bytes of memory. This is determined by how broadcast variables are implemented in Spark: the driver splits a large broadcast variable into small pieces and distributes them to the executors using a BitTorrent-like protocol. The driver keeps the original variable in memory until the whole variable has been split, so it needs roughly twice the variable's size.

 

Another thing to note is that the in-memory size of a broadcast variable can be two to five times its size on HDFS. For example, a file slightly over 1 GB on HDFS grew to 5.5 GB when broadcast.

 

 

 

3) Broadcasting a relatively large "small" table

 

If the "small" table is actually fairly large, say 1 to 5 GB, and the application has many executors, broadcasting it can take several minutes. The broadcast timeout is controlled by spark.sql.broadcastTimeout, which defaults to 300 s; if the business can tolerate the extra latency, this value can be increased.
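For example, assuming an active SparkSession named spark (the new timeout value is illustrative):

// Check the current timeout (seconds) and raise it for slow broadcasts.
println(spark.conf.get("spark.sql.broadcastTimeout"))   // 300 by default
spark.conf.set("spark.sql.broadcastTimeout", "1200")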

 

 

B) Joining two large tables

 

First case: suppose big table A is joined with big table B, A contains a very large number of rows with some duplicate key K, and B has few or no duplicates of K.

 

The solution is as follows:

Split big table B into two tables B1 and B2, where B1 contains only the rows with key K and B2 contains all the other keys.

Because B1 contains only the few rows with key K, it is small and can be map-joined (broadcast) with big table A to get join result 1; shuffle-join B2 with big table A to get join result 2; the union of the two results is the final output.
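A minimal sketch of this split-and-union pattern for a single skewed key, with an illustrative join column named "key" and an inner join:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, col}

def skewedJoin(bigA: DataFrame, bigB: DataFrame, skewedKey: String): DataFrame = {
  // B1: only the rows of B with the skewed key; small because B has few duplicates of it.
  val b1 = bigB.filter(col("key") === skewedKey)
  // B2: all the other keys.
  val b2 = bigB.filter(col("key") =!= skewedKey)
  // The skewed key goes through a broadcast (map) join, so it never hits a shuffle.
  val result1 = bigA.join(broadcast(b1), Seq("key"))
  // The remaining keys are not skewed, so an ordinary shuffle join is fine.
  val result2 = bigA.join(b2, Seq("key"))
  result1.union(result2)
}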

 

Second case: suppose big table A is joined with big table B, A contains a very large number of rows with duplicate key K, and B also contains a very large number of rows with key K.

 

The solution is as follows:

Split big table A into two tables A1 and A2, where A1 contains only the rows with key K and A2 contains all the other keys.

Split big table B into two tables B1 and B2, where B1 contains only the rows with key K and B2 contains all the other keys. Join A1 with B1 to get join result 1, join A1 with B2 to get join result 2, join A2 with B1 to get join result 3, and shuffle-join A2 with B2 to get join result 4; the union of the four results is the final output. (The A1-with-B1 join still concentrates key K; a common way to handle it is to salt A1's key and replicate B1 accordingly, reusing the two-stage idea from the GroupBy section.)

 


This article has summarized and shared our team's practical experience in dealing with data skew scenarios. If you have any questions, please leave a comment. Thank you for reading.

 
