This series of blog posts summarizes and shares examples drawn from real business environments and provides practical guidance on Spark business applications. Stay tuned for the rest of the series. Copyright: this series of Spark business application posts belongs to the author (Qin Kaixin).

  • Spark Business Application: In-depth analysis of Spark data skew case tests and tuning guidelines
  • Spark Business Application: In-depth analysis of Spark resource scheduling parameter tuning
  • Spark Business Application: In-depth analysis of Spark application development parameter tuning
  • Spark Business Application: In-depth analysis of Spark Shuffle process parameter optimization

1 Spark internal resource relationships

2 Optimize the configuration of Spark running resources

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3

3 Spark operator tuning suggestions

  • Program development tuning: Avoid creating duplicate RDD

      val rdd1 = sc.textFile("hdfs://master01:9000/hello.txt")
      rdd1.map(...)
      val rdd2 = sc.textFile("hdfs://master01:9000/hello.txt")
      rdd2.reduce(...)

You need to perform a map operation and a reduce operation on the HDFS file hello.txt, that is, two operator operations on the same piece of data. The wrong approach: creating multiple RDDs when performing multiple operator operations on the same data. Here the textFile method is executed twice, creating two RDDs for the same HDFS file and then performing an operator operation on each. In this case, Spark loads hello.txt from HDFS twice and creates two separate RDDs; the performance cost of loading the file and creating the RDD a second time is plainly wasted.
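The corrected version, as a minimal sketch reusing the path and placeholder operator bodies from above, creates the RDD once and runs both operators against it. Note that without persistence Spark may still recompute the RDD for each action; the persistence item below addresses that.

    // Correct approach: create the RDD once and reuse it for both operations.
    val rdd1 = sc.textFile("hdfs://master01:9000/hello.txt")
    rdd1.map(...)
    rdd1.reduce(...)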


  • Program development tuning: Reuse the same RDD whenever possible

  • The wrong approach: there is an RDD of <Long, String> pairs, RDD1. Due to business needs, a map operation is performed on RDD1 to create RDD2, where the data in RDD2 consists only of the values of RDD1; that is, RDD2 is a subset of RDD1.

    JavaPairRDD<Long, String> rdd1 = ...
    JavaRDD<String> rdd2 = rdd1.map(...)

Different operator operations are performed on RDD1 and RDD2 respectively.

    rdd1.reduceByKey(...)
    rdd2.map(...)
  • The right thing to do:

RDD2's data is a complete subset of RDD1's, yet two RDDs are created and an operator operation is performed on each. Executing a map operator on RDD1 just to create RDD2 is one more operator operation than necessary, which adds performance overhead. In this case the same RDD can be reused: RDD1 can serve both the reduceByKey and the map operations.

JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)

  • Program development tuning: Persist RDDs that are used multiple times

    // The right thing to do.
    // cache() persists all of the RDD's data in memory in non-serialized form.
    // RDD1 is computed from its source only once, when the map operator first runs.
    // When the reduce operator runs, the data is read directly from memory,
    // so the RDD is not computed a second time.
    val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
    rdd1.map(...)
    rdd1.reduce(...)

    // Better still: a serialized persistence level.
    // Serialization reduces the memory/disk footprint of persisted data,
    // avoiding memory exhaustion and frequent GC caused by persisted data.
    val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
    rdd1.map(...)
    rdd1.reduce(...)

DISK_ONLY and the replicated "_2" levels are generally not recommended: reading and writing data through disk files degrades performance, and the "_2" levels replicate every data block, incurring high network overhead.
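As a rough illustration of the usual selection order implied above (a sketch, not from the original text), prefer plain memory persistence, then serialized memory, then serialized memory-and-disk:

    import org.apache.spark.storage.StorageLevel
    // Pick ONE level per RDD; a storage level cannot be changed
    // without unpersisting the RDD first.
    rdd1.persist(StorageLevel.MEMORY_ONLY)            // fastest, if the data fits in memory
    // rdd1.persist(StorageLevel.MEMORY_ONLY_SER)     // smaller footprint, extra (de)serialization CPU
    // rdd1.persist(StorageLevel.MEMORY_AND_DISK_SER) // spills serialized data to disk when memory is tight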


  • Program development tuning: Avoid using shuffle operators

If possible, avoid shuffle-class operators: the shuffle process is the biggest performance drain in a Spark job.

During shuffle, each node first writes identical keys to local disk files, and other nodes then pull those keys from each node's disk files over the network. Moreover, when all occurrences of a key are pulled to one node for aggregation, that node may have too many keys to process, run short of memory, and spill to disk files. A shuffle can therefore involve a large number of disk file I/O operations and network data transfers; disk I/O and network transmission are the main reasons shuffle performs poorly.

Where possible, avoid shuffle operators such as reduceByKey, join, distinct, and repartition, and prefer non-shuffle map-class operators instead.

    // A traditional join triggers a shuffle:
    // records with the same key in the two RDDs must be pulled to the
    // same node over the network and joined by a task.
    val rdd3 = rdd1.join(rdd2)

    // A Broadcast + map join does not trigger a shuffle.
    // Use broadcast to ship the smaller RDD's data to every Executor
    // as a broadcast variable.
    // Note: this is only advisable when rdd2 is small (a few hundred MB,
    // or one or two GB), because every Executor holds a full copy of
    // rdd2's data in memory.
    val rdd2Data = rdd2.collect()
    val rdd2DataBroadcast = sc.broadcast(rdd2Data)
    val rdd3 = rdd1.map(rdd2DataBroadcast...)

  • Program development tuning: Shuffle operation with map-side preaggregation

If a shuffle is unavoidable and map-class operators cannot replace it, use operators that pre-aggregate on the map side, analogous to the local Combiner in MapReduce. With map-side pre-aggregation, identical keys are combined locally, so each node holds only one record per key. When other nodes then pull that key from all nodes, the amount of data to transfer is greatly reduced, cutting both disk I/O and network transmission overhead.

It is recommended to use reduceByKey or aggregateByKey in place of the groupByKey operator, as the sketch below illustrates.
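A minimal word-count sketch contrasting the two operators; the words input RDD is an assumption for this example:

    // words: RDD[String], an assumed example input.
    val pairs = words.map(word => (word, 1))
    // groupByKey shuffles every (word, 1) pair across the network,
    // then sums on the reduce side:
    val countsSlow = pairs.groupByKey().mapValues(_.sum)
    // reduceByKey combines values on the map side first, so far fewer
    // records cross the network:
    val countsFast = pairs.reduceByKey(_ + _)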


  • Program development tuning: Use high-performance operators
  • Use reduceByKey/aggregateByKey instead of groupByKey: map-side pre-aggregation
  • Use mapPartitions instead of plain map: the function runs once per partition rather than once per record
  • Use foreachPartitions instead of foreach: the same reduction in function invocations
  • Perform coalesce after filter: compact the now-sparse partitions once most data has been filtered out
  • Use repartitionAndSortWithinPartitions instead of separate repartition and sort operations (see the sketches below)
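A hedged sketch of the partition-level operators above; the lines, records, and rdd inputs and the createConnection helper are assumptions for illustration, not from the original:

    import java.text.SimpleDateFormat
    // mapPartitions: set up per-partition state once instead of per record.
    val parsed = lines.mapPartitions { iter =>
      val fmt = new SimpleDateFormat("yyyy-MM-dd") // created once per partition
      iter.map(line => fmt.parse(line))
    }

    // foreachPartition (the Scala API name): one connection per partition,
    // not one per record.
    records.foreachPartition { iter =>
      val conn = createConnection() // hypothetical connection factory
      iter.foreach(record => conn.insert(record))
      conn.close()
    }

    // filter + coalesce: shrink the partition count after dropping most data.
    val compacted = rdd.filter(_ != null).coalesce(100)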

repartitionAndSortWithinPartitions is an operator recommended on the official Spark website: if you need to sort records within partitions after repartitioning, use repartitionAndSortWithinPartitions directly, because it pushes the sort into the shuffle machinery and performs the repartitioning and sorting in a single pass.
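A minimal sketch; the pairRdd input and the partition count are assumptions for illustration:

    import org.apache.spark.HashPartitioner
    // pairRdd: RDD[(K, V)] with an ordered key type, assumed for this example.
    // Repartitions into 100 partitions and sorts by key within each partition
    // during the same shuffle, instead of repartition followed by sortByKey.
    val repartitionedAndSorted =
      pairRdd.repartitionAndSortWithinPartitions(new HashPartitioner(100))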


  • Program development tuning: broadcast large variables

If an operator function needs to use an external variable (especially a large one, such as a collection of 100 MB or more), use Spark's broadcast feature to improve performance. By default, Spark makes a copy of the variable for every task and ships the copies over the network, so each task holds its own copy. If the variable is large (say 100 MB, or even 1 GB), the cost of moving many copies across the network, plus the frequent GC caused by those copies occupying too much memory in each node's Executors, can greatly hurt performance. Once the variable is broadcast, only one copy resides in each Executor's memory, shared by all tasks in that Executor.
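A minimal sketch of the pattern; the dict lookup table, its contents, and the rdd input are assumptions for illustration:

    // Broadcast a large lookup table once per Executor instead of once per task.
    val dict: Map[String, String] = Map("a" -> "apple", "b" -> "banana") // imagine 100 MB+
    val dictBroadcast = sc.broadcast(dict)
    // rdd: RDD[String], an assumed example input.
    val result = rdd.map { key =>
      // Each task reads the shared per-Executor copy via .value.
      dictBroadcast.value.getOrElse(key, "unknown")
    }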


  • Program development tuning: Optimize serialization performance with Kryo
  • 1. When an external variable is used in an operator function, it is serialized for network transmission.
  • 2. When a custom type is used as the generic type of an RDD (e.g. JavaRDD<Student>, where Student is a custom type), all objects of that type are serialized. In this case the custom class must also implement the Serializable interface.
  • 3. With a serialized persistence level (such as MEMORY_ONLY_SER), Spark serializes each partition of the RDD into one large byte array.

Spark uses the Java serialization mechanism by default. You can switch to Kryo as the serialization library, which is more efficient than Java serialization:

// Create a SparkConf object.
val conf = new SparkConf().setMaster(...).setAppName(...)
// Set the serializer to KryoSerializer.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Register the custom types that will be serialized with Kryo.
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

  • Program development tuning: Optimize data structures

In Java, there are three types that consume memory:

  • 1. Objects. Each Java object has additional information such as object headers, references, etc., so it takes up more memory.

  • 2. Strings, each containing an array of characters and additional information such as length.

  • 3. Collection types, such as HashMap and LinkedList, because collection types often use inner classes to encapsulate their elements, such as Map.Entry.

    The official Spark recommendation is that, when writing Spark code, and especially in operator functions, you avoid the three kinds of data structures above where possible: use strings instead of objects, primitive types (such as Int and Long) instead of strings, and arrays instead of collection types. This reduces memory usage, which in turn lowers GC frequency and improves performance.
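A small illustration of these substitutions; the Point type and values are made up for the example:

    // Instead of a collection of objects...
    case class Point(x: Int, y: Int)
    val points: List[Point] = List(Point(1, 2), Point(3, 4))

    // ...prefer primitive arrays (no per-element object header or boxing):
    val xs: Array[Int] = Array(1, 3)
    val ys: Array[Int] = Array(2, 4)

    // ...and, where practical, a delimited string instead of a small object:
    val pointStr = "1,2" // parse with pointStr.split(",") when needed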

4 Summary

Because program-development tuning is a relatively mature topic, this article draws on the notes of seasoned experts, combined with my own summary, and was written in one sitting.

Qin Kaixin in Shenzhen