Spark Tuning: RDD Operator Tuning

Cut the crap and get right to the point!

1. RDD reuse

When applying operators to an RDD, avoid recomputing the same RDD with the same operator and computation logic, as shown in the following figure:

Modifying the RDD computing architecture shown above yields the optimized result shown in the figure below:

2. Filter as early as possible

After obtaining the initial RDD, filter out unnecessary data as soon as possible to reduce memory usage and improve the efficiency of Spark jobs.
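A minimal sketch of filtering right after the initial read; the path and the ERROR-level condition are placeholders, not from the original article:

val rawRDD = sc.textFile("dir/*.log")
// Keep only the records that are actually needed before any further transformation
val neededRDD = rawRDD.filter(_.contains("ERROR"))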


3. Read many small files with wholeTextFiles

When we read a text file as an RDD, each line of input becomes an element of the RDD.

It is also possible to read multiple complete text files at once into a pairRDD, where the key is the file name and the value is the file content.

val input: RDD[String] = sc.textFile("dir/*.log")

If you pass a directory, all files in the directory are read as RDD. Wildcard characters are supported for file paths.

However, reading a large number of small files this way is not efficient; wholeTextFiles should be used instead. It returns an RDD[(String, String)], where the key is the file name and the value is the file content.

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Using wholeTextFiles to read small files:

val filesRDD: RDD[(String, String)] =
  sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

4. mapPartitions and foreachPartition

  • mapPartitions

map(_...): the function operates on each element of the RDD

mapPartitions(_...): the function operates on an iterator over all the data in each partition

The normal map operator operates on each element in the RDD, while the mapPartitions operator operates on each partition of the RDD.

With the normal map operator, if a partition contains 10,000 records, the map function is executed 10,000 times, once per element.

With the mapPartitions operator, the function is executed only once per partition, because it processes a whole RDD partition: the function receives all of the partition's data at once, which is more efficient.

For example, if you want to write all the data in the RDD to a database via JDBC, using the map operator would create a database connection for every element in the RDD, which consumes a lot of resources, while the mapPartitions operator creates only one database connection per partition.

The mapPartitions operator also has a drawback. With a normal map operation, records are processed one at a time; if memory runs low after 2,000 records have been processed, the already-processed records can be garbage collected. With mapPartitions, however, the function receives an entire partition's data at once; if the data volume is very large and memory is insufficient, that data cannot be reclaimed, and an OOM (out-of-memory) error may occur.

Therefore, the mapPartitions operator is suitable when the data volume is not particularly large; in that case it yields a good performance improvement. (When the data volume is very large, using mapPartitions can easily cause OOM.)

In your project, first estimate the amount of data in the RDD, the amount of data per partition, and the memory resources allocated to each Executor; if resources allow, consider using the mapPartitions operator instead of map, as sketched below.
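A minimal sketch contrasting map and mapPartitions, using a simple illustrative RDD of integers (the data and partition count are placeholders):

val numbersRDD = sc.parallelize(1 to 10000, 4)

// map: the function runs once per element
val doubled = numbersRDD.map(_ * 2)

// mapPartitions: the function runs once per partition and receives an iterator over that partition's data
val doubledByPartition = numbersRDD.mapPartitions { iter =>
  // any expensive per-partition setup (such as a database connection) would be created here, only once
  iter.map(_ * 2)
}

Returning iter.map(...) keeps the processing lazy, so the whole partition does not have to be materialized in memory at once.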

  • foreachPartition

rdd.foreach(_...): the function operates on each element

rdd.foreachPartition(_...): the function operates on an iterator over all the data in each partition

In production, the foreachPartition operator is typically used to write data to a database, and it can noticeably improve database write performance.

If the foreach operator is used for database writes, then because foreach processes the RDD one record at a time, a database connection is established for every record, which is a huge waste of resources. Therefore, we should use the foreachPartition operator for database writes.

Similar to the mapPartitions operator, foreachPartition traverses the RDD one partition at a time, so when database operations are involved only one database connection needs to be created per partition, as shown below:

After foreachPartition is used, the following performance improvements can be achieved:

  1. The function processes the data of an entire partition at a time;
  2. Only one database connection is created for the data of a partition;
  3. Only one SQL statement, with multiple sets of parameters, needs to be sent to the database;

In a production environment, database operations should all be performed with the foreachPartition operator. Note that, similar to mapPartitions, foreachPartition has a potential problem: if a partition contains a very large amount of data, it may cause an OOM (memory overflow).
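A minimal sketch of the database-write scenario described above, using foreachPartition; the table name, connection URL, and the wordCountRDD input (an RDD[(String, Int)]) are placeholders, not part of the original article:

import java.sql.DriverManager

wordCountRDD.foreachPartition { iter =>
  // One connection and one prepared statement per partition, not per record
  val conn = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pwd")
  conn.setAutoCommit(false)
  val stmt = conn.prepareStatement("INSERT INTO word_counts (word, cnt) VALUES (?, ?)")
  iter.foreach { case (word, cnt) =>
    stmt.setString(1, word)
    stmt.setInt(2, cnt)
    stmt.addBatch()        // accumulate a batch instead of one round trip per record
  }
  stmt.executeBatch()      // send one batched statement with multiple parameter sets
  conn.commit()
  stmt.close()
  conn.close()
}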

5. filter + coalesce/repartition

In Spark tasks, the filter operator is often used to filter data in RDD. In the initial stage of the task, the amount of data loaded from each partition is similar, but once the filter is applied, the amount of data in each partition may vary greatly, as shown in the following figure:

According to the figure above, we can find two problems:

  1. The amount of data in each partition becomes smaller. If the data is still processed with one task per partition, as before, each task's computing resources are wasted.

  2. The amount of data in each partition is different, so each task processes a different amount of data, which may cause data skew.

As shown in the figure above, the second partition has 100 records left after filtering while the third partition still has 800. Under the same processing logic, the task for the third partition handles eight times as much data as the task for the second partition, so their running times may also differ by several times. This is the data skew problem.

Let's analyze these two problems in turn:

  1. For the first problem, since the amount of data per partition has become smaller, we want to redistribute the data, for example merging the data of the original four partitions into two partitions, so that only two tasks are needed for processing and no resources are wasted.

  2. The solution to the second problem is very similar: redistribute the partition data so that the amount of data in each partition is roughly equal, which avoids the data skew problem.

So how do we implement this? We need the coalesce operator.

Both repartition and coalesce can be used for repartitioning. repartition is simply coalesce with shuffle set to true; coalesce does not shuffle by default, but shuffling can be enabled via its second parameter.

Suppose we want to change the original number of partitions, A, into B. There are the following cases:

  1. A > B (merging many partitions into fewer partitions)

    • A and B do not differ much

      In this case, you can use coalesce without enabling shuffle.

    • A and B differ greatly

      In this case, using coalesce without shuffle means a small number of tasks each have to process the data of many original partitions, and performance deteriorates. Therefore, it is recommended to set the second parameter of coalesce to true, that is, to enable shuffle.

  2. A < B (splitting fewer partitions into more partitions)

In this case, use repartition. If you use coalesce, you must set shuffle to true; otherwise coalesce has no effect.

After the filter operation, the coalesce operator can be used to compact the number of partitions so that the data volume of each partition is as uniform and compact as possible for the subsequent computation, which can improve performance to some extent; see the sketch below.
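A minimal sketch of the filter-then-coalesce pattern, assuming a hypothetical log RDD loaded into 4 partitions where the filter discards most of the data:

val rawRDD = sc.textFile("dir/*.log")                 // assume this yields 4 partitions
val errorRDD = rawRDD.filter(_.contains("ERROR"))     // most lines are filtered out

// Merge 4 sparse partitions into 2 fuller ones; no shuffle is needed when merging a few partitions
val compactedRDD = errorRDD.coalesce(2)
compactedRDD.count()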

Note: local mode simulates cluster execution inside a single process and already optimizes parallelism and partition count internally, so there is no need to set them in local mode.

6. Parallelism setting

The parallelism in Spark refers to the number of tasks at each stage.

If the parallelism is too low, resources are wasted. For example, suppose 20 Executors are allocated with 3 CPU cores each, and the Spark job has 40 tasks: each Executor gets only 2 tasks, leaving one CPU core idle on every Executor and wasting resources.

Ideally, the parallelism should be set to match the allocated resources. In short, set the parallelism as high as the resources allow so that cluster resources are fully used. Proper parallelism improves the performance and running speed of Spark jobs.

Spark officially recommends setting the number of tasks to 2 to 3 times the total number of CPU cores of the job. The number of tasks is not set exactly equal to the number of cores because tasks take different amounts of time: some finish quickly and some slowly. If the number of tasks equals the number of cores, the cores whose tasks finish early sit idle. With 2 to 3 times as many tasks as cores, a core picks up the next task as soon as it finishes one, reducing wasted resources and improving the efficiency of the Spark job.

The Spark job parallelism is set as follows:

val conf = new SparkConf().set("spark.default.parallelism", "500")

Principle: make full use of the CPU cores. If there are 100 cores, the parallelism can be set to 200 to 300.

7. repartition/coalesce to adjust parallelism

We know that Spark lets us adjust parallelism, but the parallelism setting does not take effect for Spark SQL: the user-configured parallelism applies only to stages that do not contain Spark SQL.

The parallelism of Spark SQL cannot be specified by the user. Spark SQL sets the parallelism of its stage automatically, based on the number of splits of the HDFS files behind the Hive table. The spark.default.parallelism parameter set by the user only takes effect in stages that contain no Spark SQL.

Because the parallelism of a Spark SQL stage cannot be set manually, if the data volume is large and the subsequent transformations in that stage have complex business logic, Spark SQL may create only a few tasks. Each task then has to process a large amount of data and run very complex logic, so the first stage, the one containing Spark SQL, can be very slow while the later stages without Spark SQL run very fast.

To solve the problem that Spark SQL cannot set the parallelism and number of tasks, you can use the repartition operator.

The comparison figure before and after repartition operator is as follows:

You cannot change the parallelism and the number of tasks of the Spark SQL stage itself, but you can use the repartition operator to repartition the RDD produced by the Spark SQL query into more partitions. Since the operations after repartition no longer involve Spark SQL, the parallelism of those stages equals the value you set manually. This prevents the stage containing Spark SQL from having to process a large amount of data with only a few tasks while running complex algorithmic logic. The before-and-after comparison of the repartition operator is shown in the figure above, and a code sketch follows.
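A minimal sketch of this pattern, assuming a SparkSession named spark and a hypothetical Hive table db.logs (both are placeholders):

// The parallelism of this query's stage is determined by the number of HDFS splits
val sqlRDD = spark.sql("SELECT * FROM db.logs").rdd

// Repartition so that the subsequent stages run with the desired number of tasks
val repartitionedRDD = sqlRDD.repartition(200)

// Transformations from here on run in stages with 200 tasks
repartitionedRDD.map(row => row.getString(0)).count()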

8. reduceByKey local pre-aggregation

A distinguishing feature of reduceByKey, compared with ordinary shuffle operations, is that it performs local aggregation on the map side: the map side first combines local records with the same key, applying the reduceByKey function to the values of each key, and only then writes the data into the files to be pulled by the tasks of the next stage.

The execution process of reduceByKey operator is shown in the figure below:

The performance improvements from using reduceByKey are as follows:

  1. After local aggregation, the amount of data on the map side decreases, reducing disk I/O and disk space usage.
  2. After local aggregation, the amount of data pulled by the next stage decreases, reducing the amount of data transmitted over the network.
  3. After local aggregation, caching data on the reduce side consumes less memory.
  4. After local aggregation, the amount of data to aggregate on the reduce side decreases.

Based on the local aggregation characteristics of reduceByKey, we should consider using reduceByKey instead of other shuffle operators, such as groupByKey.

The operating principle of groupByKey and reduceByKey is shown in Figure 1 and Figure 2 below:

As shown in the figures above, groupByKey shuffles all the map-side data to the reduce side and aggregates it there, with no map-side aggregation, whereas reduceByKey aggregates on the map side, so less data is transmitted over the network and it is noticeably more efficient than groupByKey; see the word-count sketch below.
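A minimal word-count sketch contrasting the two operators, assuming wordsRDD is an RDD[String] of words as in section 3:

// groupByKey: every (word, 1) pair is shuffled, and summing happens only on the reduce side
val countsWithGroup = wordsRDD.map((_, 1)).groupByKey().mapValues(_.sum)

// reduceByKey: pairs with the same key are pre-aggregated on the map side before the shuffle
val countsWithReduce = wordsRDD.map((_, 1)).reduceByKey(_ + _)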

9. Use persistence + checkpoint

Spark persistence works well in most cases, but cached data can sometimes be lost. If data is lost, the lost partitions have to be recomputed, cached again, and then used. To guarantee that data is not lost, you can checkpoint the RDD, that is, persist the data to a fault-tolerant file system (such as HDFS).

After an RDD is checkpointed, if its cache is lost, the system first checks whether checkpoint data exists; if it does, the checkpoint data is used instead of recomputing. In other words, the checkpoint acts as a safeguard for the cache: when the cache fails, the checkpoint data is used.

The advantage of checkpointing is that the reliability of the Spark job is improved: if the cache is lost, the data does not need to be recomputed. The disadvantage is that the data has to be written to a file system such as HDFS, which carries a performance cost.

The persistence and checkpoint settings are as follows:

sc.setCheckpointDir("hdfs://...")   // checkpoint directory on HDFS
rdd.cache()                         // or rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.checkpoint()
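A slightly fuller sketch under assumptions (the checkpoint path and rawRDD are placeholders). Persisting before checkpointing avoids computing the RDD twice, because the checkpoint data is written by a separate job after the first action and would otherwise recompute the lineage:

import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs://...")                  // placeholder HDFS directory
val cleanedRDD = rawRDD.filter(_.nonEmpty)
cleanedRDD.persist(StorageLevel.MEMORY_AND_DISK)
cleanedRDD.checkpoint()                            // materialized to HDFS once an action runs
cleanedRDD.count()                                 // first action: computes, caches, and checkpoints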

10. Use broadcast variables

By default, if an external variable is used inside an operator in a task, every task gets its own copy of the variable, which consumes a lot of memory. On the one hand, if the RDD is persisted later, its data may no longer fit in memory and can only be written to disk, which severely hurts performance. On the other hand, when tasks create objects, the heap may not have room for the newly created objects, leading to frequent GC; GC pauses the worker threads, so Spark stops working for a while, which seriously affects performance.

Suppose the current job is configured with 20 Executors and 500 tasks, and a 20 MB variable is shared by all tasks. Without broadcasting, 500 copies are created, one per task, consuming about 10 GB of cluster memory. With a broadcast variable, each Executor holds just one copy, for a total of about 400 MB, roughly a 25-fold reduction in memory consumption.

The broadcast variable holds a copy for each Executor and is shared by all of that Executor’s tasks, reducing the number of copies produced by the variable.

In the initial phase, the broadcast variable has only one copy, in the Driver. When a task runs and wants to use the broadcast data, it first tries to get the variable from its local Executor's BlockManager; if it is not there yet, the BlockManager remotely pulls a copy from the Driver's BlockManager or from the BlockManager of another node and keeps it locally. All subsequent tasks of this Executor then read the variable directly from the local BlockManager.

Data that might be shared by multiple tasks can be broadcast to each Executor:

val broadcastVar = sc.broadcast(/* the variable that tasks need, i.e. the variable to broadcast */)
broadcastVar.value   // read the value of the broadcast variable
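A minimal concrete sketch, assuming a small lookup Map of user id to user name and an idRDD of type RDD[Int] (both are placeholders):

val userNames: Map[Int, String] = Map(1 -> "alice", 2 -> "bob")   // small lookup data on the Driver
val userNamesBC = sc.broadcast(userNames)

// Each Executor keeps one copy of userNames; tasks read it through .value
val namedRDD = idRDD.map(id => userNamesBC.value.getOrElse(id, "unknown"))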

11. Serialize using Kryo

By default, Spark uses Java's serialization mechanism. Java serialization is easy to use and needs no extra configuration, as long as the variables used in operators implement the Serializable interface. However, Java serialization is not efficient: serialization is slow, and the serialized data is still relatively large.

Spark officially states that Kryo serialization is about 10 times faster than Java serialization. Spark does not use Kryo as the default serialization library because it does not support serializing all types of objects, and it requires users to register the classes to be serialized in order to get the best performance. However, starting with Spark 2.0.0, shuffling RDDs of simple types, arrays of simple types, and strings already uses Kryo serialization by default.

The code for Kryo serialization registration is as follows:

public class MyKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(StartupReportLogs.class);
    }
}

The code to configure Kryo serialization is as follows:

// Create a SparkConf object
val conf = new SparkConf().setMaster(...).setAppName(...)
// Use the Kryo serialization library
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Register the custom class collection with the Kryo serialization library
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator")
