Contents of this article:

1. Overview of tuning
2. The phenomenon when data skew occurs
3. The principle of data skew
4. How to locate the code that causes data skew
5. When a task is executed very slowly
6. A task inexplicably runs out of memory
7. View the data distribution of keys that cause data skew
8. Data skew solutions:

  • Solution 1: Use Hive ETL to preprocess data
  • Solution 2: Filter out the few keys that cause skew
  • Solution 3: Increase the parallelism of shuffle operations
  • Solution 4: Two-stage aggregation (local aggregation + global aggregation)
  • Solution 5: Change reduce Join to Map Join
  • Solution 6: Sample skewed keys and split the join operation
  • Solution 7: Use random prefixes and RDD expansion to join
  • Solution 8: Use multiple solutions together

1. Overview of tuning

Sometimes we run into one of the thorniest problems in big data computing: data skew. When it occurs, Spark job performance is far worse than expected. Data skew tuning applies a range of technical solutions to the different kinds of data skew problems in order to keep Spark job performance acceptable.

2. The phenomenon when data skew occurs

  • Most tasks execute very fast, but a few execute extremely slowly. For example, out of 1,000 tasks, 997 finish within a minute, while the remaining two or three take an hour or two. This situation is very common.
  • A Spark job that used to run normally suddenly reports an OOM (out-of-memory) exception one day; inspecting the exception stack shows it originates in our own business code. This situation is relatively rare.

3. The principle of data skew

The principle of data skew is simple: during a shuffle, the data with the same key on every node must be pulled to the same task on one node for processing, for example to aggregate or join by key. If a particular key has a very large amount of data, data skew occurs. For example, if most keys correspond to 10 records each, but an individual key corresponds to 1 million records, then most tasks are assigned only 10 records and finish in about a second, while an individual task is assigned 1 million records and runs for an hour or two. The progress of the entire Spark job is therefore determined by the task that takes the longest to run.

Therefore, when data skew occurs, a Spark job may run very slowly or even run out of memory because of the huge amount of data processed by a single task.

The following figure is a clear example: the key hello corresponds to a total of 7 records across the three nodes, all of which are pulled to the same task for processing, while the keys world and you each correspond to only one record, so the other two tasks only need to process one record each. In this case, the first task may run 7 times longer than the other two, and the running speed of the whole stage is determined by its slowest task.

4. How to locate the code that causes data skew

Data skew only occurs during a shuffle. The following are commonly used operators that may trigger a shuffle: distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, and repartition. When data skew occurs, it may well be caused by one of these operators in your code.

5. When a task is executed very slowly

The first thing to determine is which stage the data skew occurs in. If the job is submitted in yarn-client mode, you can view the log directly on the local machine and find in the log which stage is currently running. (For skew on the Hive side, see the previously written article on locating, troubleshooting, and resolving Hive data skew.)

If yarn-cluster mode is used, you can use the Spark Web UI to check the current stage.

In addition, whether in yarn-client or yarn-cluster mode, you can look at the amount of data allocated to each task of the current stage in the Spark Web UI to determine whether uneven data allocation across tasks is causing data skew.

For example, in the figure below, the third-to-last column shows the elapsed time of each task. Clearly, some tasks finish in just a few seconds while others take several minutes; from the running times alone we can already conclude that data skew has occurred. In addition, the last column shows the amount of data processed by each task. Clearly, the very short tasks process only a few hundred KB of data while the very long tasks process several thousand KB, a roughly tenfold difference. At this point it is even more certain that data skew has occurred.

Once we know which stage the data skew occurs in, we then need to work out, based on the stage-division principle, which part of the code corresponds to that stage. There is bound to be a shuffle operator in that part of the code.

Accurately working out the correspondence between stages and code requires an in-depth understanding of Spark's source code. Here we introduce a relatively simple and practical rule of thumb: wherever a shuffle operator appears in Spark code, or a statement that causes a shuffle (such as group by) appears in a Spark SQL query, you can take that point as the boundary between two stages. Below we use Spark's most basic introductory program, word counting, as an example of how to roughly determine which code a stage corresponds to in the simplest way. In the whole program, only reduceByKey is an operator that triggers a shuffle, so the two stages can be considered to be divided at that operator, as shown in the list and code below.

  • Stage 0 mainly performs the operations from textFile to map, plus the shuffle write. The shuffle write can be roughly understood as partitioning the data in the pairs RDD: within each task, records with the same key are written to the same disk file.
  • Stage 1 mainly performs the operations from reduceByKey to collect. As soon as each task of stage 1 starts running, it first performs a shuffle read, pulling the keys it is responsible for from the disk files written by the stage 0 tasks, and then performs a global aggregation or join on records with the same key; in this case, the values of each key are summed. After executing the reduceByKey operator, stage 1 has computed the final wordCounts RDD; it then executes the collect operator, pulling all the data to the Driver so we can iterate over it and print it.
val conf = new SparkConf()
val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.collect().foreach(println(_))

By analyzing the word counting program, we hope to make clear the basic principle of stage division and how the shuffle operation is performed at the boundary between two stages, and then to understand how to quickly locate which part of the code corresponds to the stage where data skew occurs. For example, if the Spark Web UI or the local log shows that some tasks of stage 1 execute very slowly, and we judge that stage 1 has data skew, then we can go back to the code and see that stage 1 mainly includes the reduceByKey shuffle operator; at this point we can basically conclude that the skew is caused by the reduceByKey operator. For example, if one word appears 1 million times while other words appear only 10 times, then one task of stage 1 has to process 1 million records, and the speed of the whole stage is dragged down by that one task.

6. A task inexplicably runs out of memory

This situation makes it easier to locate the offending code. In yarn-client mode, look at the exception stack in the local log; in yarn-cluster mode, use YARN to view the exception stack in the log. In general, the exception stack tells you which line of your code ran out of memory. Then look around that line; there is usually a shuffle operator nearby, and that operator is probably causing the data skew. Note, however, that data skew cannot be concluded from an occasional memory overflow alone. Your code may also run out of memory because of bugs or occasional data anomalies. Therefore, you still need to use the Spark Web UI to check the running time and the amount of data allocated to each task of the stage where the error occurred, in order to determine whether the memory overflow is caused by data skew.

7. View the data distribution of keys that cause data skew

Once we know where the data skew occurs, we usually need to analyze the RDD or Hive table on which the shuffle operation that causes the skew is performed, and look at the distribution of its keys. This mainly provides a basis for choosing a technical solution: different combinations of key distribution and shuffle operator may call for different solutions. There are several ways to view the key distribution, depending on how the operation is performed:

  • If the data skew is caused by a group by or join statement in Spark SQL, query the key distribution of the tables used in the SQL.
  • If the data skew is caused by a shuffle operator on a Spark RDD, you can add code to the Spark job to inspect the key distribution, for example rdd.countByKey(); then collect/take the occurrence counts of each key to the client and print them to see how the keys are distributed.

For example, for the word counting program above, if we have determined that the reduceByKey operator of stage 1 causes the data skew, then we should look at the key distribution of the RDD on which reduceByKey is performed, which in this example is the pairs RDD. In the code below, we first sample 10% of the pairs data, then count the occurrences of each key with the countByKey operator, and finally iterate over and print the occurrence count of each key in the sample on the client.

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

8. Data skew solutions

Solution 1: Use Hive ETL to preprocess data

Application scenario: data skew is caused by a Hive table whose data is uneven (for example, one key has 1 million records while other keys have only 10), and the business scenario requires frequently using Spark to analyze that Hive table. This solution is suitable in that case.

Implementation idea: evaluate whether the data can be preprocessed with Hive (that is, use Hive ETL to pre-aggregate the data by key, or to join it with the other table in advance). In the Spark job, the data source is then no longer the original Hive table but the preprocessed one. Because the data has already been aggregated or joined, the original shuffle-type operators are no longer needed in the Spark job.
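
For illustration, here is a minimal sketch of the Spark side under this solution, assuming a hypothetical intermediate table dw.user_action_preaggregated produced by the Hive ETL (the newer SparkSession API is used here; the table and column names are made up):

// The skew-prone group by / join has already been done by the Hive ETL job and its result
// written into the intermediate table, so this Spark job only scans that table and performs
// no shuffle of its own; data skew therefore cannot occur in this job.
SparkSession spark = SparkSession.builder()
        .appName("ReadPreprocessedHiveTable")
        .enableHiveSupport()
        .getOrCreate();

// Hypothetical table and columns written by the Hive ETL job.
Dataset<Row> preAggregated =
        spark.sql("SELECT user_id, action_cnt FROM dw.user_action_preaggregated");
preAggregated.show();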

Implementation principle: this approach eliminates data skew at the root for Spark, because shuffle-type operators are avoided entirely in Spark and so data skew cannot occur there. But note that it treats the symptom rather than the cause: the data itself is still unevenly distributed, so the skew still happens inside the Hive ETL when it performs shuffle operations such as group by or join, and the Hive ETL itself will be slow. We merely move the occurrence of data skew forward into the Hive ETL to avoid it in Spark.

Advantages: simple and convenient to implement, and the effect is good; data skew is avoided entirely on the Spark side, and Spark job performance improves greatly.

Disadvantages: it treats the symptom, not the root cause; data skew still occurs during the Hive ETL.

Practical experience: in some projects where a Java system is used together with Spark, Java code frequently invokes Spark jobs and those jobs have tight performance requirements, so this solution fits well. Pushing the data skew upstream into the Hive ETL means it only runs once a day, and only that run is slow; afterwards, every time Java invokes a Spark job it runs fast, giving a better user experience.

Project experience: this solution is used in Meituan-Dianping's interactive user behavior analysis system. The system mainly lets users submit data analysis and statistics tasks through a Java web system, and the backend submits Spark jobs through Java to perform the analysis. The Spark jobs are required to be fast, finishing within 10 minutes, otherwise the user experience of the interactive system suffers. Therefore, we moved the shuffle operations of some Spark jobs forward into Hive ETL so that Spark can read the preprocessed Hive intermediate tables directly, minimizing Spark-side shuffles and improving the performance of some jobs by more than a factor of 6.

Solution 2: Filter out the few keys that cause skew

Application scenario: this solution is suitable if only a few keys are found to cause the skew and their impact on the computation itself is not significant, for example when 99% of the keys correspond to 10 records each but only a few keys correspond to 1 million records, causing the skew.

Implementation idea: if we judge that the few keys with a large amount of data are not particularly important to the execution and results of the job, we can simply filter them out. For example, in Spark SQL you can filter them with a where clause, and in Spark Core you can run the filter operator on the RDD. If you need to decide dynamically, on each run, which keys have the largest amount of data and then filter them, you can first sample the RDD with the sample operator, count the number of records per key, and filter out the keys with the largest amounts of data at runtime (see the sketch below).
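
A minimal sketch of the dynamic variant, assuming the skewed RDD is a JavaPairRDD<Long, String> named rdd, a recent Spark version where countByKey() returns Map<K, Long>, and that only the single key with the most records is dropped (all illustrative assumptions):

// Sample 10% of the data and count how often each key appears in the sample.
JavaPairRDD<Long, String> sampled = rdd.sample(false, 0.1);
Map<Long, Long> sampleKeyCounts = sampled.countByKey();

// Find the key with the largest amount of data in the sample (here: just the top 1 key).
Long candidate = null;
long maxCount = 0L;
for (Map.Entry<Long, Long> entry : sampleKeyCounts.entrySet()) {
    if (entry.getValue() > maxCount) {
        maxCount = entry.getValue();
        candidate = entry.getKey();
    }
}
final Long skewedKey = candidate;

// Filter that key out before the shuffle operation, so it no longer participates in the computation.
JavaPairRDD<Long, String> filteredRdd = rdd.filter(
        new Function<Tuple2<Long, String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedKey);
            }
        });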

Implementation principle: after the keys that cause the skew are filtered out, they no longer participate in the computation, so they naturally cannot cause data skew.

Advantages: simple to implement and very effective; data skew can be avoided completely.

Disadvantages: There are few applicable scenarios. In most cases, there are many keys that cause skew, not just a few.

Practical experience: we also used this approach to solve data skew in a project. One day, a Spark job that had been running normally suddenly reported an OOM. Tracing showed that the data for one key in the Hive table was abnormal that day, causing its volume to surge. Therefore, we sample before each run, compute the keys with the largest amounts of data in the sample, and filter those keys out directly in the program.

Solution 3: Increase the parallelism of shuffle operations

Application scenario: if we must tackle the data skew head-on, we suggest trying this solution first, because it is the simplest way to deal with data skew.

Implementation idea: when executing a shuffle operator on an RDD, pass a parameter to the operator, for example reduceByKey(1000); this parameter sets the number of shuffle read tasks when that operator executes. For shuffle-type statements in Spark SQL, such as group by and join, set the parameter spark.sql.shuffle.partitions, which controls the parallelism of shuffle read tasks; its default value is 200, which is a bit too small for many scenarios.
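
A minimal sketch of both variants, assuming pairs is a JavaPairRDD<String, Long>; the value 1000 is just an example parallelism:

// Variant 1: for an RDD shuffle operator, pass the desired number of shuffle read tasks directly.
// Here reduceByKey will run with 1000 shuffle read tasks instead of the default.
JavaPairRDD<String, Long> wordCounts = pairs.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        }, 1000);

// Variant 2: for Spark SQL shuffle statements (group by, join, ...), raise
// spark.sql.shuffle.partitions, whose default of 200 is often too small.
SparkConf conf = new SparkConf()
        .set("spark.sql.shuffle.partitions", "1000");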

Implementation principle: by increasing the number of shuffle read tasks, the multiple keys originally assigned to one task can be spread over several tasks, so that each task processes less data than before. For example, if there are 5 keys, each with 10 records, and all 5 keys were assigned to the same task, that task would process 50 records. After increasing the number of shuffle read tasks to 5, each task is assigned one key, that is, each task processes 10 records, so the execution time of each task naturally becomes shorter. The specific principle is shown in the figure below.

Advantages: The scheme is simple to implement and can effectively alleviate the impact of data skew.

Disadvantages: the scheme only alleviates data skew, but does not completely eliminate the problem. According to practical experience, its effect is limited.

Practical experience: this solution usually cannot solve the data skew completely, because in extreme cases, such as one key corresponding to 1 million records, no matter how much you increase the number of tasks, the 1 million records for that key will still be assigned to a single task. So this is really just the first thing to try when you discover skew: attempt to mitigate the skew with the simplest means, or use it in combination with other solutions.

Solution 4: Two-stage aggregation (local aggregation + global aggregation)

Application scenario: this solution is applicable when you execute an aggregation-type shuffle operator such as reduceByKey on an RDD, or use a group by statement in Spark SQL for grouped aggregation.

Implementation idea: the core idea of this solution is to aggregate in two stages. In the first, local aggregation, each key is given a random prefix, for example a random number within 10, so keys that were originally identical become different: (hello, 1) (hello, 1) (hello, 1) (hello, 1) becomes, say, (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1). Then an aggregation such as reduceByKey is executed on the prefixed data to perform the local aggregation, whose result becomes (1_hello, 2) (2_hello, 2). Next, the prefixes are stripped from the keys, giving (hello, 2) (hello, 2), and a global aggregation is performed once more to obtain the final result, for example (hello, 4).

The implementation principle of the solution is as follows: The same key can be changed into multiple different keys by attaching random prefixes. In this way, the data processed by one task can be dispersed to multiple tasks for local aggregation, thus solving the problem of excessive data processed by a single task. Then remove the random prefix and do the global aggregation again to get the final result. The specific principle is shown in the figure below.

Advantages: effective for data skew caused by aggregation-type shuffle operations; the skew can usually be resolved, or at least substantially alleviated, improving Spark job performance by several times.

Disadvantages: this solution applies only to aggregation-type shuffle operations; for join-type shuffle operations, another solution is needed.

// Step 1: give each key in the RDD a random prefix.
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long, Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// Step 2: perform local aggregation on the randomly prefixed keys.
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

// Step 3: remove the random prefix from each key in the RDD.
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String, Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });

// Step 4: perform global aggregation on the RDD with the random prefixes removed.
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

Solution 5: Change reduce Join to Map Join

Application scenario: this solution is applicable when a join is used on RDDs, or a join statement is used in Spark SQL, and one of the RDDs or tables in the join is relatively small (for example, a few hundred MB or one to two GB).

Implementation idea: instead of using the join operator, implement the join with a broadcast variable plus a map-type operator, which completely avoids the shuffle and thus completely avoids data skew. Pull the data of the smaller RDD into the Driver's memory with the collect operator, then create a broadcast variable from it. Next, execute a map-type operator on the other RDD; inside the operator function, take the full data of the smaller RDD from the broadcast variable and compare it with every record of the current RDD by the join key. If the keys match, join the two records in the way you need.

Implementation principle: a normal join goes through the shuffle process, and after the shuffle, data with the same key is pulled into the same shuffle read task to be joined; this is a reduce join. However, if one RDD is small enough, you can broadcast the full data of the small RDD and use a map-type operator to achieve the same effect as the join, that is, a map join. In that case no shuffle happens and therefore no data skew can occur. The specific principle is shown in the figure below.

Advantages: very effective for data skew caused by join operations, because the shuffle never happens, so the skew never happens.

Disadvantages: applicable scenarios are limited, because this works only when joining a large table with a small table. After all, the small table has to be broadcast, which consumes memory: the Driver and every Executor hold a full copy of the small RDD. If the broadcast RDD is large, say more than 10 GB, you may run out of memory. Therefore it is not suitable when both tables are large.

// Collect the data of the RDD with the relatively small amount of data to the Driver.
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect();
// Then use Spark's broadcast feature to turn the small RDD's data into a broadcast variable,
// so that each Executor holds only one copy of the data; this saves as much memory as possible
// and reduces network transmission overhead.
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// Perform a map operation on the other RDD instead of a join operation.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long, String>, Long, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // In the operator function, obtain rdd1's data on the local Executor via the broadcast variable.
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // Convert rdd1's data into a Map to make the subsequent join easier.
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for (Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // Get the key and value of the current record.
                Long key = tuple._1;
                String value = tuple._2;
                // From the rdd1 data Map, look up the record that can be joined by this key.
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<Long, Tuple2<String, Row>>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });

// A hint:
// The approach above is only applicable when the keys in rdd1 are unique, with no duplicates.
// If rdd1 contains multiple identical keys, flatMap-type operations have to be used instead of map:
// the join cannot be a simple map lookup, all of rdd1's data has to be traversed for the join,
// and each record of rdd2 may produce multiple joined records.

Solution 6: Sample skewed keys and split the join operation

Solution Application scenario: When joining two RDD/Hive tables, if the data volume is too large to use Solution 5, check the key distribution in the two RDD/Hive tables. If data skew occurs because a few keys in one RDD/Hive table have too much data, and all keys in the other RDD/Hive table are evenly distributed, then this solution is appropriate.

Implementation idea:

  • For the RDD containing the few keys with excessive data, use the sample operator to draw a sample, then count the number of records per key to find which keys have the largest amounts of data.
  • Split the data for those keys out of the original RDD to form a separate RDD, and prefix each key with a random number within n; the majority of keys, which do not cause skew, form another RDD.
  • From the other RDD to be joined, also filter out the data corresponding to those skewed keys to form a separate RDD, expanding each record into n records, each sequentially prefixed with a number from 0 to n; the majority of its keys, which do not cause skew, likewise form another RDD.
  • Then join the separate RDD with random prefixes against the separate RDD expanded n times; the originally identical key is now broken into n parts and spread over multiple tasks for the join.
  • The other two normal RDDs are joined as usual.
  • Finally, combine the results of the two joins with the union operator to obtain the final join result.

Implementation principle: for data skew caused by a join, if only a few keys cause the skew, those keys can be split out into a separate RDD and broken into n parts with random prefixes for the join. The data for those keys is then no longer concentrated in a few tasks but spread over many tasks. The specific principle is shown in the figure below.

Advantages: for data skew caused by a join where only a few keys cause the skew, this approach breaks up just those keys for the join in the most effective way. Moreover, only the data corresponding to the few skewed keys needs to be expanded n times; the full data set does not, so excessive memory usage is avoided.

Disadvantages: If a large number of keys cause skew, for example, thousands of keys cause data skew, this method is not suitable.

// First, sample 10% of the data from rdd1, which contains the few keys that cause data skew.
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// Count the occurrences of each key in the sample and sort in descending order by count.
// Take the top 1 (or top 100) entries, i.e., the n keys with the most data.
// Here we take the single key with the largest amount of data.
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long, String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
        new PairFunction<Tuple2<Long, Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

// Split the key that causes data skew out of rdd1 to form a separate RDD.
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long, String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// Split the normal keys that do not cause data skew out of rdd1 to form a separate RDD.
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long, String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            }
        });

// rdd2 is the RDD in which all keys are relatively evenly distributed.
// Filter out the data in rdd2 corresponding to the skewed key obtained above to form a separate RDD,
// use the flatMap operator to expand each of its records into 100 records,
// and prefix those 100 records in turn with the numbers 0 through 99.
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
        new Function<Tuple2<Long, Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long, Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for (int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });

// Prefix each record of the RDD split out of rdd1 (the skewed key) with a random number within 100,
// then join it with the RDD split out of rdd2 and expanded 100 times.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long, String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedRdd2)
        .mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Row>>, Long, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Tuple2<String, Row>> call(
                    Tuple2<String, Tuple2<String, Row>> tuple)
                    throws Exception {
                long key = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
            }
        });

// Join the RDD split out of rdd1 that contains the normal keys directly with rdd2.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// Union the join result for the skewed key with the join result for the normal keys;
// this is the final join result.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

Solution 7: Use random prefixes and RDD expansion to join

Application scenario: if a large number of keys in an RDD cause data skew during a join operation, splitting out individual keys is pointless, and this last type of solution has to be used.

Implementation idea:

  • The idea is similar to Solution 6: first look at the data distribution of the RDDs/Hive tables and find the one that causes the data skew, for example one where many keys each correspond to more than 10,000 records.
  • Then prefix every record of that RDD with a random number within n.
  • At the same time, expand the other, normal RDD, turning each record into n records, each sequentially prefixed with a number from 0 to n.
  • Finally, join the two processed RDDs.

Implementation principle: by attaching random prefixes, originally identical keys become different keys, so these "different keys" can be spread over multiple tasks instead of one task processing a huge number of identical keys. The difference from Solution 6 is that Solution 6 applies special handling only to the data of a few skewed keys, so the expanded RDD stays small and does not use much memory; this solution targets cases with a large number of skewed keys where individual keys cannot be split out, so the whole RDD has to be expanded, which demands much more memory.

Advantages: basically all join-type data skew can be handled, and the effect is relatively significant, with very good performance improvement.

Disadvantages: This scheme is more about alleviating data skew than avoiding data skew completely. In addition, the entire RDD needs to be expanded, which requires high memory resources.

Practical experience: when developing a data requirement, we once found that a join caused data skew. Before optimization, the job took about 60 minutes; after applying this solution, execution time dropped to about 10 minutes, a 6x performance improvement.

// First, expand the RDD whose keys are relatively evenly distributed by a factor of 100.
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
        new PairFlatMapFunction<Tuple2<Long, Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                // Each record becomes 100 records, prefixed in turn with 0 through 99.
                for (int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });

// Then, prefix each record of the other RDD, the one with skewed keys, with a random number within 100.
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
        new PairFunction<Tuple2<Long, String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// Join the two processed RDDs.
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

Solution 8: Use multiple solutions together

In practice, most relatively simple data skew scenarios can be solved with just one of the solutions above. But for more complex scenarios you may need to combine several of them. For example, if a Spark job has several places where data skew occurs, you can first use Solutions 1 and 2 to preprocess part of the data and filter out part of it, alleviating some of the skew; then increase the parallelism of certain shuffle operations to improve their performance; and finally choose a suitable solution to optimize each remaining aggregation or join operation. You need a thorough grasp of the ideas and principles behind these solutions, and in practice apply them flexibly, in various combinations, according to your own situation to solve your data skew problems.
