background

Spark Structrued Streaming is used in the project, which is the new version of Spark 2.0. According to the official document, the performance and real-time performance are better than the previous Dstreaming, but the relevant information is much less than Dstreaming. Much of the current tuning phase will refer to Dstreaming’s articles and experience.

Here is a note on the tuning of Structured Streaming and DStreaming in different directions.

Our overall goals are:

  1. The Processing Time of each Batch interval is reduced
  2. Set batch Size (Data size of each Batch Interval)

To this end, here are the optimization measures for different directions:

1. Parallelism of data reception

1.1 Creating Multiple Streams (Split Topic)

When Spark receives data (such as Kafka or Flume) through MQ, the data is deserialized and stored in Spark’s memory. If data reception is called a bottleneck of the system, parallelization of data reception can be considered. Each input Stream launches a Receiver on a Worker’s Executor that receives a data Stream. Therefore, you can achieve the effect of receiving multiple data streams by creating multiple input streams and configuring them to receive partitioned data from different data sources.

For example, an input Stream that receives two Kafka topics can be split into two input streams, each receiving data for one Topic. This creates two receivers to receive data in parallel, increasing throughput.

Multiple streams can be aggregated using the union operator to form a single Stream. The subsequent transformation operators will then apply to the same aggregated Stream.

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) { kafkaStreams.add(KafkaUtils.createStream(...) ); } JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
Copy the code

1.2 Adjusting the Number of Block Intervals/Partitions

The default block interval is 200ms. Most receivers split the received data into blocks before saving them to Spark’s BlockManager. The number of blocks in each Batch determines the number of partitions in the RDD corresponding to the batch and the number of tasks created during transformation to the RDD. The number of tasks corresponding to each batch is estimated, that is, batch interval/block interval.

For example, if the batch interval is 2s and the block interval is 200ms, 10 tasks will be created. If you consider the batch number of tasks to be too small, i.e. less than the number of CPU cores per machine, then the batch number of tasks is not sufficient because all CPU resources cannot be fully utilized. To increase the number of blocks for Batch, decrease the block interval. However, the recommended minimum block interval is 50ms, below which the startup time of a large number of tasks can become a performance overhead.

  • Dstreaming can pass parameters spark. Streaming. BlockInterval Settings

  • Structured Streaming requires explicit repartitioning of the input data stream. Using inputStreamDataset. Repartition (< number of partitions >). In this way, the received batch can be distributed to a specified number of machines before further operations are performed.

Perform all tasks on a stage.

  • Scheduler Delay: Indicates the time spent by Spark to allocate tasks
  • Executor Computing Time: The amount of Time an Executor spends executing a task
  • Getting Result Time: the Time it takes to get the Result of a task execution
  • Result Serialization Time: Serialization Time of task execution results
  • Task Deserialization Time: Task Deserialization Time
  • Shuffle Write Time: indicates the Shuffle data Write Time
  • Shuffle Read Time: Shuffle Indicates the Time taken to Read data

The change of partition quantity will affect the change of the above indicators. When we tune, a lot of times we look at the changes in the metrics above. When the partition changes, the changes of the above indicators are as follows:

  • A partition is too small [Prone to data skew]
    • Scheduler Delay: There is no significant change
    • Executor Computing Time: Variable, large and small, but large on average
    • 3. Getting Result Time: unstable, large or small, but large on average
    • Result Serialization Time: it is unstable. There are some big ones and some small ones, but on average, it is relatively large
    • Task Deserialization Time: It is not stable. Some are large or small, but on average, it is relatively large
    • Shuffle Write Time: It is unstable. Some characters are large or small, but the average value is large
    • Shuffle Read Time: Unstable, large or small, but large on average
  • Partition is too large
    • Scheduler Delay: There is no significant change
    • Executor Computing Time: Relatively stable, small on average
    • Getting Result Time: relatively stable, small on average
    • Result Serialization Time: relatively stable, relatively small on average
    • Task Deserialization Time: Relatively stable, relatively small on average
    • Shuffle Write Time: stable and small on average
    • Shuffle Read Time: relatively stable and small on average

Note:

  1. Num_partition =(batch_interval/block_interval) * num_receiver is recommended

  2. When data is landed, ensure that the number of partitions is not too large to prevent the loading of too many small files. Before landing, call RDd. coalesce(num_partition) to reduce the number of partitions

2. Tune task startup

If too many tasks are started per second, such as 50 tasks per second, the performance cost of sending these tasks to the Executor on the Worker node will be high and the millisecond delay will be difficult to achieve. You can reduce the performance overhead in this area by doing the following:

  1. Task serialization: Using the Kryo serialization mechanism to serialize tasks reduces the size of the tasks, thereby reducing the time it takes to send those tasks to executors on individual Worker nodes.
  2. Execution mode: Run Spark in Standalone mode to achieve less task startup time.

The above approach may reduce the processing time of each batch by 100 ms. So you go from seconds to milliseconds.

3. Data processing parallelism tuning

If the number of parallel tasks used in any stage of computation is not high enough, the cluster resources will not be fully utilized. For distributed, for example, reduce operation, such as reduceByKey and reduceByKeyAndWindow, the default is the number of parallel task by spark. The default. The parallelism parameters. You can be in operation, such as reduceByKey was introduced into the second parameter, manually specify the parallelism of the operation, also can adjust the spark global. Default. The parallelism parameters.

If parallel Task is insufficient, core is underutilized. To accelerate the spark by increasing parallelism by default. The default. The parallelism, task number also shoulds not be too much, too much, the task of serialization and deserialization time-consuming is also higher, the opposite effect. The suggestion is #executors * #core_per_executor * 4

4. Data serialization tuning

4.1

The overhead caused by data serialization can be reduced by optimizing the serialization format. In the case of streaming computing, there are two types of data that need to be serialized.

Input data: By default, received input data is stored in Executor memory at a persistence level of Storagelevel.memory_and_disk_ser_2. This means that the data is serialized into bytes to reduce GC overhead and is copied for executor failure tolerance. As a result, data is stored in memory first, and then overwritten to disk when memory runs low to hold all the data needed for streaming computing. Serialization here has significant performance overhead — the Receiver must deserialize the data received from the network and then serialize the data using Spark’s serialization format.

2. Persistent RDD generated by streaming operations: Persistent RDD generated by streaming operations may be persisted to memory. For example, windowing operations persist data in memory by default, because the data may later be used in multiple Windows and processed multiple times. However, unlike Spark Core’s default persistence level, storagelevel.memory_only, the default persistence level for RDD generated by streaming computation operations is storagelevel.memory_only_ser, which reduces GC overhead by default.

4.2 Data serialization Tuning

In the above scenario, using the Kryo serialization class library reduces the CPU and memory performance overhead. Using Kryo, must consider to register the custom classes, and disable the corresponding reference tracking (spark. Kryo. ReferenceTracking).

In some special scenarios, such as when the amount of data that needs to be kept for streaming applications is not very large, it may be possible to persist the data in a non-serialized manner to reduce the CPU overhead of serialization and deserialization without the expensive GC overhead. For example, if you run batch intervals for several seconds and do not use Windows, you can consider explicitly setting the persistence level to prevent serialization of data during persistence. This reduces the CPU performance overhead for serialization and deserialization without incurring too much GC overhead.

5. Batch Interval tuning

If a Spark Streaming application running on a cluster is to be stable, it must process the received data as quickly as possible. In other words, batch should be processed as quickly as possible after it is created. To determine whether this is a problem for an application, observe the Batch processing time on the Spark UI. The batch processing time must be shorter than the Batch Interval.

Due to the nature of streaming computing, batch interval has a huge impact on the data reception rate that can be maintained by an application with fixed cluster resources. For example, in the WordCount example, for a particular data reception rate, the application business can guarantee that the WordCount will be printed every 2 seconds instead of every 500ms. Therefore, the Batch interval needs to be set so that the expected data reception rate can be maintained in the production environment.

A good way to calculate the correct batch size for your application is to test at a very conservative Batch interval, such as 5 to 10 seconds, at a very slow data reception rate. To check whether the application keeps up with the data rate, you can check the latency of each batch’s processing time. If the processing time matches the Batch interval, the application is stable. Otherwise, if the batch scheduling delay continues to grow, it means that the application cannot keep up with the rate, which is unstable. So if you want a stable configuration, try increasing the speed of data processing or increasing the Batch interval. Remember, a temporary delay in growth due to a temporary increase in data can be justified, as long as the delay can be recovered in a short time.

6. Memory tuning

6.1 Evaluating memory Usage

The cluster memory resources required by the Spark Streaming application are determined by the operation type of Transformation. For example, if you want to use a window operation with a window length of 10 minutes, the cluster must have enough memory to hold data for 10 minutes. If you want to use updateStateByKey to maintain the state of many keys, your memory resources must be large enough. Conversely, if you want to do a simple map-filter-store operation, you need very little memory.

Generally, data received through a Receiver is stored at the storagelevel.memory_and_disk_ser_2 persistence level, so data that cannot be stored in memory is overwritten to disk. Overwriting data to disks reduces application performance. Therefore, it is generally recommended to provide the application with as much memory as it needs. It is recommended to test and evaluate memory usage in a small-scale scenario.

6.2 the GC

Another aspect of memory tuning is garbage collection. For streaming applications, you don’t want to have long delays due to JVM garbage collection if you want low latency. There are a number of parameters that help reduce memory usage and GC overhead:

  1. Persistence of Stream: As mentioned in the data serialization tuning section, intermediate RDD produced by input data and some operations are serialized to bytes by default when persisted. This reduces memory and GC overhead compared to the non-serialized approach. Using the Kryo serialization mechanism can further reduce memory usage and GC overhead.

    To further reduce memory usage, data can be compressed, controlled by the spark.rdd.press parameter (default false), but CPU time increases.

  2. Clean up old data: By default, all input data and persistent RDD generated through the DStream Transformation operation are automatically cleaned up. Spark Streaming will decide when to clean up this data, depending on the type of transformation operation. For example, if you use a window whose window length is less than 10 minutes, Spark will keep the data of less than 10 minutes and clean up the old data after the time expires. However, in some special scenarios, such as Spark SQL and Spark Streaming integration, Spark SQL is used to query batch RDD in asynchronously started threads. You need to let Spark hold the data for a longer time until the Spark SQL query ends. You can use streamingContext. Remember () method to implement.

  3. CMS garbage collector: Uses a parallel Mark-sweep garbage collection mechanism, which is recommended to keep GC overhead low. Although parallel GC reduces throughput, it is recommended to use it to reduce batch processing time (reduce GC overhead during processing). If it is used, it must be enabled on both the driver and Executor sides. Use –driver-java-options in spark-submit; Use spark. Executor. ExtraJavaOptions parameter Settings – XX: + UseConcMarkSweepGC.

  4. RDDs are persisted in out-of-heap memory, and there is no out-of-heap GC

7. Long time operation guarantee

Fault tolerance

  • Example Increase the number of AM & Spark Driver retries

    spark.yarn.maxAppAttempts=4 spark.yarn.am.attemptFailuresValidityInterval=1h

  • Increased the maximum number of Executor failures tolerated

    spark.yarn.max.executor.failures={8*num_executors} spark.yarn.executor.failuresValidityInterval=1h

  • Increased the maximum number of Task failures allowed

    spark.task.maxFailures=8

  • Increase the network waiting time

    spark.rpc.askTimeout=600s spark.network.timeout=600s

7.2 Gracefully stop the Streaming program and Checkpoint Settings

Ensure that data is not lost during abnormal exit or active kill

spark.streaming.stopGracefullyOnShutdown=true spark.sql.streaming.checkpointLocation=hdfs://hdfsCluster/spark_checkpoint/” + appName

7.3 the Performance

  • Enable speculative execution and eliminate tasks that execute slowly (actions are idempotent)

    spark.speculation=true

  • Controls the rate of message-oriented middleware

    spark.streaming.backpressure.enable=true spark.streaming.receiver.maxRate=XXX

Ref

  1. Blog.csdn.net/kwu_ganymed…
  2. www.jianshu.com/p/6d576e818…
  3. Spark Best Practices
  4. Litaotao. Making. IO/boost – spark…