• In this paper, starting from vivo Internet technology WeChat public mp.weixin.qq.com/s/lqMu6lfk- number…
  • About the author: Zheng Zhibin, graduated from computer Science and Technology (bilingual class) of South China University of Technology. He has been engaged in e-commerce, open platform, mobile browser, recommendation advertising, big data, artificial intelligence and other related development and architecture. At present, I am engaged in AI construction and advertisement recommendation business in Vivo intelligent platform center. Good at business architecture, platform and business solutions of various business forms.

This document describes the hazards, symptoms, and causes of data skew and the solutions for Spark data skew.

What is data skew

For distributed big data systems such as Spark or Hadoop, a large amount of data is not a problem. The problem is data skew.

For distributed systems, the overall application time ideally decreases linearly as the system size (number of nodes) increases. If it takes 120 minutes for one machine to process a large batch of data, the ideal time is 120/3 = 40 minutes when the number of machines increases to 3. However, in order to ensure that the execution time of each machine is 1/N of that of a single machine in the distributed case, it is necessary to ensure that the tasks of each machine are equal. Unfortunately, many times the distribution of tasks is uneven, even to the extent that most of the tasks are distributed to individual machines, with most of the other machines doing only a fraction of the total. For example, one machine handles 80% of the tasks and two other machines handle 10% each.

This is the biggest problem in distributed environment. This means that computing power does not expand linearly, but has a short board effect: the time consumed by a Stage is determined by the slowest Task.

Since all tasks in the same Stage perform the same calculation, the difference in time between different tasks is mainly determined by the amount of data processed by the task on the premise of excluding the difference in computing capability of different computing nodes. Therefore, the problem of data skew must be solved in order to give full play to the advantages of parallel computing in distributed systems.

Second, the harm of data skew

When data skew occurs, the time of a small number of tasks is much higher than that of other tasks, which makes the overall time too large and fails to give full play to the parallel computing advantages of distributed systems.

In addition, when data skew occurs, the amount of data processed by some tasks is too large. As a result, tasks may fail due to insufficient memory and the entire application may fail.

3. The phenomenon of data skew

Nine times out of ten, data skew occurs when:

  • Most tasks execute very fast, but individual tasks execute so slowly that the whole task is stuck at some stage.

  • The Spark job that could run normally suddenly reported OOM (memory overflow) exception one day and observed the exception stack. This is caused by the service code we wrote. This is relatively rare.

TIPS

In Spark Streaming, data skew is more likely to occur, especially when the application contains SQL operations such as join and group. Since Spark Streaming is running with very little memory allocated, any data skew that occurs during this process can easily result in an OOM.

4. Reasons for data skew

During shuffle, the same key on each node must be pulled to a task on a node for processing. For example, the task can be aggregated or joined based on the key. If a key has a large amount of data, data skew occurs. For example, if most of the keys correspond to 10 pieces of data, but individual keys correspond to 1 million pieces of data, most tasks may only be allocated 10 pieces of data and then run in 1 second. But individual tasks may be allocated 1 million data and run for an hour or two.

Therefore, When data skew occurs, Spark jobs may run very slowly or even overflow the memory due to the large amount of data processed by a task.

5. Problem discovery and positioning

1. Use the Spark Web UI

Use the Spark Web UI to check the amount of data allocated by each task of the current stage (Shuffle Read Size/Records) to determine whether data skew is caused by uneven data allocated by tasks.

After knowing which stage the data skewer occurs in, we then need to calculate which part of the code corresponding to the stage where the data skewer occurs according to the stage partition principle. There must be a shuffle operator in this part of the code. You can use countByKey to view the distribution of individual keys.

TIPS

Data skew occurs only during shuffle. The following lists common operators that may trigger shuffle: Distinct, groupByKey, reduceByKey, aggregateByKey, Join, cogroup, and Repartition. When data skew occurs, it may be the result of using one of these operators in your code.

2. Statistics by key

It can also be verified by sampling statistics on the number of occurrences of keys.

Due to the large amount of data, the sampling method can be adopted to sample the data, count the number of occurrences, and take out the first few according to the number of occurrences:

df.select("key").sample(false, 0.1) / / data sampling. (k = > (k, 1)). ReduceBykey + _) (_ / / statistical key. The number of occurrences of the map (k = > (k. _2, k. _1)). SortByKey (falseTake (10) // take the first 10.Copy the code

Data skew occurs if most data are found to be evenly distributed and some data are orders of magnitude larger than others.

Vi. How to alleviate data skew

The basic idea

  • Business logic: We optimized the data skewness from the level of business logic. For example, to count the orders of different cities, we counted the first-tier cities separately, and finally integrated them with other cities.

  • Procedure implementation: For example, in Hive, often encounter count (distinct) operation, this will result in only one reduce, we can first group and then outside the count layer, it is ok; Use reduceByKey to replace groupByKey in Spark.

  • Parameter tuning: Both Hadoop and Spark have built-in parameters and mechanisms to adjust data skew, and use them properly to solve most problems.

1. Filter abnormal data

If the key that causes data skew is abnormal data, simply filter it out.

First, analyze keys to determine which keys cause data skew. The specific method has been introduced above, here is not repeated.

Then analyze the records corresponding to these keys:

  1. Null values or outliers and things like that, mostly because of that

  2. Invalid data, large amounts of duplicate test data or valid data that has little impact on the results

  3. Valid data, normal data distribution caused by business

The solution

In the first and second cases, you simply filter the data.

The third case requires special treatment, which is described below.

Idea 2. Improve shuffle parallelism

When Spark performs Shuffle, it uses the HashPartitioner (non-Hash Shuffle) to partition data by default. If the parallelism is set improperly, a large number of data corresponding to different keys may be allocated to the same Task. As a result, the data processed by this Task is much larger than that of other tasks, resulting in data skew.

If you adjust the parallelism during Shuffle so that different keys assigned to the same Task are allocated to different tasks, the amount of data required to be processed by the original Task is reduced and the short-board effect caused by data skew is alleviated.

(1) Operation process

RDD operation can be directly set on the operator need to Shuffle parallelism or use spark. The default. The parallelism setting. If it is the Spark of SQL, but also through the SET Spark. SQL. Shuffle. Partitions = [num_tasks] sets the parallelism. The default parameters are controlled by different Cluster Managers.

DataFrame and sparkSql can set the spark. SQL. Shuffle. Partitions = [num_tasks] parameter control shuffle concurrency, default is 200.

(2) Application scenarios

A large number of different keys are assigned to the same Task, resulting in a large amount of data in the Task.

(3) Solutions

Adjust parallelism. Generally, it is to increase the degree of parallelism, but sometimes it can be achieved by reducing the degree of parallelism.

(4) Advantages

Simple implementation, only need parameter tuning. The problem can be solved with minimal cost. Generally, if there is data skew, you can use this method to test several times, if the problem is not solved, then try other methods.

(5) Disadvantages

Fewer scenarios, just fewer different keys for each task to execute. There is no solution to skew caused by exceptionally large individual keys. If some keys are very large, even if a task executes them alone, it will suffer from data skew. And this method can only alleviate the data skew, not eliminate the problem completely. From practical experience, the effect is general.

TIPS can be likened to data skewing as a hash conflict. Increasing parallelism is similar to increasing the size of a hash table.

3. Customize the Partitioner

(1) Principle

Use a custom Partitioner (HashPartitioner by default) to assign different keys that were originally assigned to the same Task to different tasks.

For example, we use a custom Partitioner on the groupByKey operator:

.groupByKey(new Partitioner() {
  @Override
  public int numPartitions() {
    return 12;
  }
 
  @Override
  public int getPartition(Object key) {
    int id = Integer.parseInt(key.toString());
    if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
      return (id - 9500000) / 12;
    } else {
      returnid % 12; }}})Copy the code

TIPS This practice is equivalent to customizing the hash function of a hash table.

(2) Application scenarios

A large number of different keys are assigned to the same Task, resulting in a large amount of data in the Task.

(3) Solutions

Use a custom Partitioner implementation class instead of the default HashPartitioner, trying to evenly distribute all the different keys among different tasks.

(4) Advantages

The original parallelism design is not affected. If the degree of parallelism is changed, the degree of parallelism of subsequent stages will also be changed by default, which may affect subsequent stages.

(5) Disadvantages

The application scenario is limited. Different keys can only be scattered. This method is not applicable to the scenario where the data set corresponding to the same Key is very large. The effect is similar to that of parallelism adjustment, which can only alleviate data skew but not completely eliminate it. In addition, the Partitioner needs to be customized according to the data characteristics, which is not flexible enough.

4. Convert the Join on the Reduce side to the Join on the Map side

By using the Broadcast mechanism of Spark, the Join on the Reduce end is converted to the Join on the Map end. This means that Spark does not need to perform shuffle across nodes but directly uses local files to Join. Thus, data skew caused by Shuffle is completely eliminated.

from pyspark.sql.functions import broadcast
result = broadcast(A).join(B, ["join_col"]."left")Copy the code


Where A is the smaller Dataframe and can be stored in executor memory entirely.

(1) Application scenarios

The data set on one side of the Join is small enough to be loaded into the Driver and Broadcast to each Executor.

(2) Solutions

The small cube data is pulled to the Driver in Java/Scala code, and then Broadcast to each Executor using the Broadcast scheme. Or before using SQL, adjust the Broadcast threshold to a large enough value for the Broadcast to take effect. Then replace Reduce Join with Map Join.

(3) Advantages

Shuffle is avoided and data skew is completely eliminated, which greatly improves performance.

(4) Disadvantages

Because small data is sent to each executor through Broadcase first, the data set on the side that needs to participate in the Join is small enough and applies primarily to Join scenarios, not aggregation scenarios, with limited applicability.

NOTES

Need to SET the Spark when using Spark SQL. SQL. AutoBroadcastJoinThreshold = 104857600 will Broadcast the threshold SET large enough, will only take effect.

5. Split a join into a union

Split a join into a slanted dataset join and a non-slanted dataset join, and then union:

  1. For the RDD that contains a few keys with too much data (suppose leftRDD), a sample is sampled through the sample operator, and then the number of each key is counted to calculate which keys have the largest amount of data. The specific method has been introduced above, here is not repeated.

  2. Then filter the data corresponding to the K keys from the leftRDD separately, and prefix each key with a random number within 1~ N to form a separate leftSkewRDD. Most keys that do not cause skew form another leftUnSkewRDD.

  3. Then, the other rightRDD that needs to join is filtered out with those slanting keys, and each data in the data set is converted into N data through the flatMap operation (all the N data are added with a prefix of 0~ N in order), forming a separate rightSkewRDD. Most keys that do not cause skew also form another rightUnSkewRDD.

  4. Now, leftSkewRDD and rightSkewRDD with n-fold expansion are joined, and the random prefix is removed in the process of join to get skewedJoinRDD, the join result of slanted data set. Notice that we have successfully split the same key into N parts and split them into multiple tasks for join.

  5. Join leftUnSkewRDD and rightUnRDD to get the Join result unskewedJoinRDD.

  6. SkewedJoinRDD and unskewedJoinRDD are combined through the union operator, so as to obtain a complete Join result set.

TIPS

  1. Part of the data corresponding to rightRDD and the tilt Key must be multiplied by a random prefix set (1-N) (that is, the amount of data is increased by n times) to ensure that the data tilt Key can be joined normally no matter how the data tilt Key is prefixed.
  2. The join parallelism of skewRDD can be set to N x K (k is the number of TopSkewkeys).
  3. The operations on slanted keys and non-slanted keys are independent and can be performed in parallel.

(1) Application scenarios

The two tables are too large to use map-side Join. In one RDD, the data volume of a few keys is too large, and the keys of the other RDD are evenly distributed.

(2) Solutions

The data set corresponding to the skew Key in the RDD with data skew is separately extracted and added with the random prefix. In the other RDD, each data is combined with the random prefix respectively to form a new RDD (equivalent to increasing its data to N times of the original, where N is the total number of random prefixes), and then Join the two and remove the prefix. The remaining data that does not contain the slanted Key is then joined. Finally, all Join results can be obtained by combining the result sets of the two joins through union.

(3) Advantages

Compared with Map, Join is more suitable for Join of large data sets. If the resources are sufficient, the inclined part data set and the non-inclined part data set can be carried out in parallel, and the efficiency is obviously improved. In addition, data expansion is performed only for skewed data, resulting in limited resource consumption.

(4) Disadvantages

If there are too many skew keys, the data inflation on the other side is very large, and this scheme is not applicable. In addition, the tilted Key and non-tilted Key are processed separately, and the data set needs to be scanned twice, which increases the overhead.

Add salt to the large table key, and enlarge the small table by N times jion

If there are a lot of data skew keys, it is not meaningful to separate these large numbers of skew keys in the previous method. In this case, it is more suitable to add random prefixes to all data sets with data skew directly, and then perform cartesian product (i.e., increase the data volume by N times) with the whole data set without serious data skew.

It’s a special case or a simplification of the previous method. Without splitting, there would be no union.

(1) Application scenarios

One dataset has a large number of skew keys, while the other one is evenly distributed.

(2) Advantages

It works well in most scenarios.

(3) Disadvantages

The need to scale a data set by a factor of N increases resource consumption.

Idea 7. Local aggregation is performed on the Map end

Add a Combiner function to the map end for local aggregation. If combiner is used, the same key in a Mapper is aggregated, reducing the amount of data during shuffle and the amount of computation on the Reduce side. This method can effectively alleviate the data skew problem, but it is not very effective if the key that causes the data skew is distributed in a large number of different Mapper.

TIPS use reduceByKey instead of groupByKey.

8. Salt-added local polymerization + desalted global polymerization

The core idea of this scheme is to carry out two-stage aggregation. The first one is local aggregation. Each key is typed with a random number from 1 to N, such as a random number within 3. In this case, the original same key will become different. Like (Hello, 1) (Hello, 1) (Hello, 1) (Hello, 1) (Hello, 1) (hello, 1), It will become (1_Hello, 1) (3_hello, 1) (2_hello, 1) (1_Hello, 1) (2_hello, 1). Then type the data after random number, perform reduceByKey and other aggregation operations to perform local aggregation, so the local aggregation result will become (1_hello, 2) (2_hello, 2) (3_hello, 1). (Hello, 2) (Hello, 2) (hello, 1), and then perform global aggregation again to get the final result, such as (hello, 5).

def antiSkew(): RDD[(String, Int)] = {
    val SPLIT = "-"
    val prefix = new Random().nextInt(10)
    pairs.map(t => ( prefix + SPLIT + t._1, 1))
        .reduceByKey((v1, v2) => v1 + v2)
        .map(t => (t._1.split(SPLIT)(1), t2._2))
        .reduceByKey((v1, v2) => v1 + v2)
}Copy the code


However, the performance of two MapReduce runs is slightly worse than one run.

Data skew in Hadoop

Hadoop is directly close to the user using Mapreduce programs and Hive programs, although Hive is finally using MR to execute (at least currently Hive memory computing is not popular), but after all, write content logic is very different, one is a program, one is Sql, so here is a slight distinction.

The data skew in Hadoop is stuck at 99.99% during the Ruduce phase and never ends at 99.99%.

Here, if you look at the log or monitor interface in detail, you will find:

  • There is one more reduce stuck

  • Error OOM for various Containers

  • A large amount of data is read and written, at least much more than other normal Reduce operations

  • With data skew, tasks are killed and other weird behaviors occur.

Experience: Hive data skew usually occurs in Sql groups and On, and is deeply bound to data logic.

An optimization method

Here is a list of some methods and ideas, specific parameters and usage in the official website to see the line.

  1. Map the join way

  2. Count Distinct operations are changed to group and then count

  3. Parameter tuning

    set hive.map.aggr=true

    set hive.groupby.skewindata=true

  4. Use of left Semi Jion

  5. Set map-side output and intermediate result compression. (Not completely to solve the problem of data skew, but reduce IO read and write and network transmission, can improve a lot of efficiency)

instructions

Hive.map. aggr=true: Partial aggregation is performed in a map, which is more efficient but requires more memory.

Hive.groupby. skewinData =true: load balancing indata skew mode. If this parameter is set to true, the generated query plan contains two MRJobs. In the first MRJob, the output result set of Map is randomly distributed to Reduce tasks. Each Reduce task performs partial aggregation operations and outputs the result. In this way, the same GroupBy Key may be distributed to different Reduce tasks to achieve load balancing. The second MRJob is then distributed to Reduce by GroupBy Key based on the preprocessed data (this process ensures that the same GroupBy Key is distributed to the same Reduce), and the final aggregation operation is completed.

8. Refer to the article

  1. Spark performance optimization: Solve N Data Skew poses of Spark

  2. Data Skew (Pure Dry Goods)

  3. Resolve the data skew problem in Spark

For more content, please pay attention to vivo Internet technology wechat public account

Note: To reprint the article, please contact our wechat account: LABs2020.