
Background

Shuffle is the data redistribution process that distributed computing frameworks use to connect upstream and downstream tasks. In distributed computing, any process that exchanges data between upstream and downstream tasks can be referred to as Shuffle. Different distributed frameworks implement Shuffle in different ways:

  1. File-based pull Shuffle, used mostly by MR-like frameworks such as MapReduce and Spark. It offers high fault tolerance and suits large-scale batch jobs: because Shuffle data is persisted to files, only the failed task or stage needs to be rerun rather than the entire job.
  2. Pipeline-based push Shuffle, used mainly by streaming frameworks such as Flink and Storm, and by some MPP frameworks such as Presto and Greenplum. It offers low latency and high performance, but its biggest problem is that Shuffle data is not persisted, so a task failure forces the entire job to be rerun.

Shuffle is one of the most critical stages in a distributed framework, and its performance and stability directly affect the performance and stability of the framework as a whole. Improving the Shuffle framework is therefore necessary.

Business pain points

Challenges of Spark in the cloud native scenario

The local-disk-based Shuffle mode limits Spark's use in cloud-native environments, storage-compute separated deployments, and online-offline co-located clusters:

  1. In a cloud-native environment, serverless deployment is a common goal, but with elastic scaling and preemption it is normal for nodes or containers to be reclaimed and executors to be killed. The existing Shuffle implementation prevents computation from becoming serverless: when nodes or containers are preempted, Shuffle data often has to be recomputed, which is costly.
  2. Online clusters usually have few local disks but many CPU cores, so compute and I/O capacity are unbalanced, and it is very easy to fill the disks when jobs are scheduled on such clusters based on compute capacity alone.
  3. More and more data center architectures adopt a storage-compute separated deployment. In this mode, local-disk-based Shuffle may be unable to store Shuffle data because the local disks are too small. Block storage (e.g., RBD) can be used to work around the local-storage problem, but with an I/O pattern like Shuffle it incurs huge network overhead and performance problems.

Challenges of Spark in a production environment

Most batch jobs on the distributed computing platform are Spark jobs, with a small number of MR jobs. Compared with MR jobs, Spark jobs are less stable, and at least half of the stability problems are caused by Shuffle failures.

A Shuffle failure causes tasks to be retried, which seriously slows down the job. If a Shuffle fetch fails, the Map tasks must be rerun to regenerate the Shuffle data, after which the Reduce tasks are rerun. If the Reduce tasks fail repeatedly, the Map tasks must be rerun repeatedly as well. Under high cluster pressure, these reruns are expensive and seriously affect the job.

Shao Zheng left a comment on SPARK-1529 at the following address:

issues.apache.org/jira/browse…

Jobs with very large amounts of Shuffle data (at the TB level and above) are very difficult to run smoothly. The problems are as follows:

  1. Shuffle data easily fills the disks. This can only be avoided through repeated tuning and retries that spread executors across as many nodes as possible (anti-affinity).
  2. A large number of Shuffle partitions results in a large number of Shuffle connections, making the Shuffle framework extremely prone to timeouts and to problems caused by very heavy random-access I/O.

Local-disk-based Shuffle also suffers from serious write amplification and random I/O. When the number of tasks reaches the tens of thousands or exceeds 100K, the random I/O becomes severe and seriously affects the performance and stability of the cluster.

Therefore, it is particularly important to implement a better Shuffle framework that can solve the above business pain points.

Industry trends

The industry has been exploring Shuffle [1] for many years, and companies have built corresponding capabilities around their own business scenarios. Below is a summary of the work mainstream companies have done on Shuffle.

Baidu DCE Shuffle

Baidu DCE Shuffle is one of the earlier remote Shuffle service solutions to be put into practice and widely used in the industry. It was designed to solve two problems: supporting online-offline co-location, and improving the stability and processing scale of MR jobs. Baidu's internal MR jobs have used DCE Shuffle for many years, and Spark batch jobs have also been adapted to use DCE Shuffle as their Shuffle engine.

Facebook Cosco Shuffle[2]

The original motivation for Facebook Cosco Shuffle is very similar to Baidu's. Facebook's data centers are built with storage and compute separated, so traditional local-file-based Shuffle is costly. Meanwhile, the largest Shuffle at Facebook reaches 100 TB, which poses great challenges, so Facebook implemented a Remote Shuffle Service, Cosco Shuffle, on top of HDFS.

Google Dataflow Shuffle[3]

Google Dataflow Shuffle is Google's Shuffle service on Google Cloud. Given the elastic and volatile nature of the cloud environment, Google built the Dataflow Shuffle service for its big data offerings on Google Cloud. Dataflow Shuffle is also a remote Shuffle service: it moves Shuffle storage out of the VMs, giving compute greater flexibility.

Uber Zeus[4]

To solve the Shuffle pain points described above, Uber implemented Zeus, a remote Shuffle service that has been open-sourced. According to its design documents and implementation, multiple Shuffle Servers are deployed to receive and aggregate Shuffle data, and SSDs are used as the storage medium to improve Shuffle performance.

Ali ESS [5]

Alibaba’s EMR Remote Shuffle Service (ESS) is designed to solve the computing and storage separation problems faced by Spark on Kubernetes and enable Spark to adapt to the cloud native environment.

Business value

Implementing a Remote Shuffle Service brings the following business value:

  • **Cloud-native architecture support:** Existing distributed computing frameworks (e.g., Spark) rely on local disks to store Shuffle data, which greatly limits cloud-native deployment. The Remote Shuffle Service effectively reduces the dependence on local disks and supports multiple cluster deployment modes, improving resource utilization and facilitating a cloud-native architecture.
  • **Improved stability of the Spark Shuffle phase:** When the Shuffle data volume reaches 1 TB or even 10 TB, such jobs put great pressure on disk space; the large data volume also stresses the network, leading to a high failure rate. The Remote Shuffle Service addresses these problems and enables the business to run smoothly.

Firestorm introduction

Goals

Millions of Spark tasks run inside Tencent every day, and Shuffle problems like those described above are common. Meanwhile, to better utilize hardware resources, the storage-compute separated deployment mode is gradually being rolled out. We therefore developed Firestorm with the following goals:

  • Support large Shuffle jobs (e.g., TeraSort 40 TB+)
  • Support cloud-native deployment modes (e.g., storage-compute separated deployment)
  • Support multiple storage backends (LocalFile, HDFS, COS, etc.)
  • Support data integrity verification
  • Deliver performance close to the compute engine's native Shuffle

Architectural Design Scheme

The architecture of the Remote Shuffle Service is as follows:

The functions of each component are as follows:

  • Coordinator: manages Shuffle Servers via a heartbeat mechanism, stores metadata such as each Shuffle Server's resource usage, and is responsible for assignment. Based on Shuffle Server load, it allocates suitable Shuffle Servers to a Spark application to handle its different partitions.
  • Shuffle Server: receives Shuffle data, aggregates it, and writes it to storage. Depending on the storage mode (for example, LocalFile), it can also serve reads of Shuffle data.
  • Shuffle Client: communicates with the Coordinator and Shuffle Servers, sends read and write requests for Shuffle data, and maintains the heartbeat between the application and the Coordinator.
  • Storage Handler: decouples the Shuffle Server from the underlying storage. It can flexibly access different storage backends to meet varying storage requirements.
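
To make the Coordinator's role more concrete, here is a minimal sketch of load-based Shuffle Server assignment. The class and method names (`SimpleCoordinator`, `ServerStatus`, `assignServers`) are hypothetical and do not reflect Firestorm's actual API; they only illustrate the heartbeat-plus-assignment idea described above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of load-based Shuffle Server assignment; not Firestorm's actual Coordinator API.
public class SimpleCoordinator {

    // Minimal view of the metadata a Coordinator might keep from each heartbeat.
    static class ServerStatus {
        final String host;
        final int activePartitions; // rough load indicator reported via heartbeat

        ServerStatus(String host, int activePartitions) {
            this.host = host;
            this.activePartitions = activePartitions;
        }
    }

    private final List<ServerStatus> liveServers = new ArrayList<>();

    // Heartbeat handler: refresh the stored status for this server.
    void onHeartbeat(ServerStatus status) {
        liveServers.removeIf(s -> s.host.equals(status.host));
        liveServers.add(status);
    }

    // Pick the N least-loaded Shuffle Servers for a new application's partitions.
    List<String> assignServers(int required) {
        return liveServers.stream()
                .sorted(Comparator.comparingInt((ServerStatus s) -> s.activePartitions))
                .limit(required)
                .map(s -> s.host)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        SimpleCoordinator coordinator = new SimpleCoordinator();
        coordinator.onHeartbeat(new ServerStatus("shuffle-server-1", 120));
        coordinator.onHeartbeat(new ServerStatus("shuffle-server-2", 40));
        System.out.println(coordinator.assignServers(1)); // prints [shuffle-server-2]
    }
}
```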

Architectural design differences

Compared with other solutions in the industry, Firestorm has the following distinguishing features:

  • In terms of architecture, a Coordinator component is introduced to better manage the Shuffle Servers and to allocate Shuffle tasks appropriately based on the status of each Shuffle Server node. The cluster also supports flexible horizontal scaling to meet production requirements
  • In terms of technology, the storage module is decoupled: supporting a new Shuffle data storage backend only requires implementing the relevant interfaces. For data verification, the most critical part of the whole system, Firestorm adds read/write consistency checks on top of CRC checks and data deduplication, making data transmission more secure and reliable.
  • In terms of operations, Firestorm exposes a variety of operational metrics and integrates with the internal monitoring platform, making it possible to observe the overall status of the cluster, understand performance bottlenecks, and receive alerts when exceptions occur.

Overall process

The Shuffle process based on Firestorm is as follows:

  1. The Driver obtains assignment information from the Coordinator
  2. The Driver registers the Shuffle information with the Shuffle Servers
  3. Based on the assignment information, Executors send Shuffle data to the Shuffle Servers as Blocks
  4. The Shuffle Server writes the data to storage
  5. After a write task completes, the Executor reports the result to the Driver
  6. Read tasks obtain the list of successfully written tasks from the Driver
  7. Read tasks obtain Shuffle metadata (e.g., all BlockIds) from the Shuffle Server
  8. Based on the storage mode, read tasks read the Shuffle data from the storage
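
The sketch below expresses these eight steps as a client-facing interface; the method names and signatures are illustrative assumptions only, not Firestorm's actual client or RPC API.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical interface mirroring the eight steps above; names are illustrative,
// not Firestorm's actual client or RPC API.
public interface ShuffleFlowSketch {

    // Step 1: the Driver asks the Coordinator which Shuffle Server handles each partition.
    Map<Integer, String> getPartitionAssignment(String appId, int shuffleId, int partitionNum);

    // Step 2: the Driver registers the shuffle with the assigned Shuffle Servers.
    void registerShuffle(String appId, int shuffleId, Map<Integer, String> assignment);

    // Steps 3-4: Executors push Blocks to the assigned Shuffle Server, which persists them.
    void sendShuffleBlocks(String appId, int shuffleId, int partitionId, List<byte[]> blocks);

    // Steps 5-6: the Driver tracks which write tasks completed successfully.
    Set<Long> getSuccessfulTaskIds(String appId, int shuffleId);

    // Step 7: a read task fetches Block-level metadata (e.g., all BlockIds) from the Shuffle Server.
    Set<Long> getShuffleBlockIds(String appId, int shuffleId, int partitionId);

    // Step 8: the read task reads the Blocks back from the backing storage.
    Iterable<byte[]> readShuffleData(String appId, int shuffleId, int partitionId, Set<Long> expectedBlockIds);
}
```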

Writing process

When writing Shuffle data, the design takes memory usage, asynchronous file writing, and Shuffle data merging into account. The process is as follows:

  1. A Task sends each record to the corresponding Buffer based on its PartitionId
  2. When a Buffer reaches its threshold, the Buffer's data is sent to the data queue
  3. Data is continuously fetched from the data queue and submitted to the sending threads
  4. A sending thread requests memory space from the Shuffle Server and then sends the data to the Shuffle Server's buffer
  5. When the Shuffle Server's buffer reaches its threshold, the Shuffle data is sent to the write queue
  6. Data is continuously fetched from the write queue and submitted to the write threads
  7. A write thread derives the storage path from the Shuffle data's metadata (ApplicationId, ShuffleId, and PartitionId) and writes the Shuffle data to an Index file and a Data file
  8. After a Task finishes writing, it notifies the Shuffle Server that it has completed and obtains the number of completed tasks. If the number of completed tasks has not reached the expected value, it proceeds to the next step; if it has reached the expected value, it asks the Shuffle Server to flush the remaining buffered data to storage and waits for the flush to succeed before proceeding
  9. After the Task completes, its TaskId is recorded in MapStatus and sent to the Driver. This step is used to support Spark's speculative execution
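
As an illustration of steps 1-3 on the client side, the sketch below groups records into per-partition buffers and flushes a buffer to the send queue once it crosses a threshold. The class and field names and the 64 KB threshold are assumptions made for this example, not Firestorm's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of client-side buffering (steps 1-3 above); names and the
// threshold value are hypothetical.
public class WriteBufferSketch {

    private static final int BUFFER_FLUSH_BYTES = 64 * 1024; // assumed per-partition threshold

    private final Map<Integer, List<byte[]>> buffers = new HashMap<>();
    private final Map<Integer, Integer> bufferedBytes = new HashMap<>();
    private final List<PendingBlock> sendQueue = new ArrayList<>();

    static class PendingBlock {
        final int partitionId;
        final List<byte[]> records;

        PendingBlock(int partitionId, List<byte[]> records) {
            this.partitionId = partitionId;
            this.records = records;
        }
    }

    // Step 1: route each serialized record to the buffer of its partition.
    void addRecord(int partitionId, byte[] serializedRecord) {
        buffers.computeIfAbsent(partitionId, p -> new ArrayList<>()).add(serializedRecord);
        int size = bufferedBytes.merge(partitionId, serializedRecord.length, Integer::sum);
        // Step 2: once the buffer reaches its threshold, hand its data to the send queue.
        if (size >= BUFFER_FLUSH_BYTES) {
            flush(partitionId);
        }
    }

    // Step 3 would be a sender thread draining this queue and pushing blocks to the Shuffle Server.
    void flush(int partitionId) {
        List<byte[]> records = buffers.remove(partitionId);
        bufferedBytes.remove(partitionId);
        if (records != null && !records.isEmpty()) {
            sendQueue.add(new PendingBlock(partitionId, records));
        }
    }
}
```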

Reading process

When reading Shuffle data, the main consideration is data integrity. The process is as follows:

  1. Obtain from the Driver the TaskIds of all tasks that succeeded in the write phase
  2. Read the Shuffle data: first read the Index file and check whether the expected BlockIds exist, then read the Data file based on the Offset information in the Index file to obtain the Shuffle data. If the storage is HDFS, the data is read directly from the storage; if the storage is a local file, the data is read through the Shuffle Server
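
A minimal sketch of this read path, assuming a fixed index record layout built from the fields introduced in the next section (BlockId, Offset, Crc, CompressLength, UnCompressLength, TaskId); the on-disk format details here are illustrative, not Firestorm's actual file format.

```java
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative read path: scan the Index file, keep only Blocks written by successful
// tasks, then read each Block from the Data file at the recorded offset.
public class ShuffleReaderSketch {

    static class IndexEntry {
        long blockId;
        long offset;
        long crc;
        int compressedLength;
        int uncompressedLength;
        long taskId;
    }

    // Step 2 (first half): read the Index file and filter by successful TaskIds.
    static List<IndexEntry> readIndex(String indexPath, Set<Long> successfulTaskIds) throws IOException {
        List<IndexEntry> entries = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new FileInputStream(indexPath))) {
            while (in.available() > 0) {
                IndexEntry e = new IndexEntry();
                e.blockId = in.readLong();
                e.offset = in.readLong();
                e.crc = in.readLong();
                e.compressedLength = in.readInt();
                e.uncompressedLength = in.readInt();
                e.taskId = in.readLong();
                // Drop Blocks written by failed or speculative task attempts.
                if (successfulTaskIds.contains(e.taskId)) {
                    entries.add(e);
                }
            }
        }
        return entries;
    }

    // Step 2 (second half): read one Block from the Data file using the Index offset.
    static byte[] readBlock(String dataPath, IndexEntry entry) throws IOException {
        try (RandomAccessFile data = new RandomAccessFile(dataPath, "r")) {
            byte[] buffer = new byte[entry.compressedLength];
            data.seek(entry.offset);
            data.readFully(buffer);
            return buffer;
        }
    }
}
```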

Shuffle files

Shuffle data is stored as an Index file and a Data file. The Data file stores the Shuffle data as Blocks, and the Index file stores the metadata of each Block:

  • BlockId: the unique identifier of each Block, of type long, with the first 19 bits being an auto-incrementing sequence number, the middle 20 bits the PartitionId, and the last 24 bits the TaskId
  • Offset: The Offset of a Block in a Data file
  • Crc: The Crc check value of a Block. This value is calculated when a Block is generated and is eventually stored in the Index file, which is used to verify data integrity when a Block is read
  • CompressLength: the length of the Block's data after compression
  • UnCompressLength: the length of the Block's uncompressed data, used to improve decompression efficiency during reads
  • TaskId: Used to filter invalid block data
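
A small sketch of how the 63-bit BlockId layout described above could be packed and unpacked (19-bit sequence number in the high bits, 20-bit PartitionId in the middle, 24-bit TaskId in the low bits). The constants and method names are illustrative, not Firestorm's actual utility class.

```java
// Illustrative packing/unpacking of the 19/20/24-bit BlockId layout described above.
public class BlockIdSketch {

    private static final int PARTITION_ID_BITS = 20;
    private static final int TASK_ID_BITS = 24;

    // Pack sequenceNo (high 19 bits), partitionId (middle 20 bits), taskId (low 24 bits).
    static long encode(long sequenceNo, long partitionId, long taskId) {
        return (sequenceNo << (PARTITION_ID_BITS + TASK_ID_BITS))
                | (partitionId << TASK_ID_BITS)
                | taskId;
    }

    static long sequenceNo(long blockId) {
        return blockId >>> (PARTITION_ID_BITS + TASK_ID_BITS);
    }

    static long partitionId(long blockId) {
        return (blockId >>> TASK_ID_BITS) & ((1L << PARTITION_ID_BITS) - 1);
    }

    static long taskId(long blockId) {
        return blockId & ((1L << TASK_ID_BITS) - 1);
    }

    public static void main(String[] args) {
        long blockId = encode(7, 1234, 56789);
        System.out.println(sequenceNo(blockId));  // 7
        System.out.println(partitionId(blockId)); // 1234
        System.out.println(taskId(blockId));      // 56789
    }
}
```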

Data validation

Data correctness is critical to Shuffle. Firestorm ensures data correctness as follows:

  1. The write task calculates the CRC check value for each Block, and the read task performs the CRC check on each Block to avoid data inconsistency
  2. Each BlockId is stored on the Shuffle Server side, and when data is read, the reader verifies that all BlockIds have been processed, to avoid data loss
  3. The Driver records information about successful Tasks and filters out redundant Blocks, avoiding data inconsistency caused by speculative execution
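
As an illustration of point 1, the sketch below shows a per-Block CRC check of the kind described: the writer stores a checksum for each Block, and the reader recomputes and compares it before handing the data to the engine. It uses java.util.zip.CRC32 for simplicity; Firestorm's actual checksum utilities may differ.

```java
import java.util.zip.CRC32;

// Illustrative per-Block CRC check; not Firestorm's actual checksum utility.
public class BlockCrcSketch {

    // Writer side: compute the checksum stored alongside the Block in the Index file.
    static long checksum(byte[] blockData) {
        CRC32 crc = new CRC32();
        crc.update(blockData, 0, blockData.length);
        return crc.getValue();
    }

    // Reader side: compare the recomputed checksum with the value stored in the Index file.
    static void verify(byte[] blockData, long expectedCrc) {
        long actual = checksum(blockData);
        if (actual != expectedCrc) {
            throw new IllegalStateException(
                "Corrupted shuffle block: expected crc " + expectedCrc + " but got " + actual);
        }
    }
}
```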

Support for multiple storage backends

Since there are many storage options, such as LocalFile, HDFS, Ozone, and COS, the design decouples the storage layer and abstracts the read and write interfaces to make it easy to integrate different storage systems. To use a storage system as the backend for Shuffle data, you only need to implement the relevant interfaces, as in the sketch below.
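
Below is a hedged sketch of what such an abstracted read/write interface could look like; the interface and method names are illustrative assumptions, not Firestorm's actual storage handler API.

```java
import java.io.IOException;
import java.util.List;
import java.util.Set;

// Hypothetical storage abstraction: implement one such interface per backend
// (LocalFile, HDFS, COS, ...) and the Shuffle Server stays unchanged.
public interface ShuffleStorageSketch {

    // Append a batch of Blocks for one partition; the backend decides the physical layout.
    void write(String appId, int shuffleId, int partitionId, List<byte[]> blocks) throws IOException;

    // Make buffered data durable (e.g., flush and close files, finalize an object upload).
    void commit(String appId, int shuffleId, int partitionId) throws IOException;

    // Read back the Blocks for a partition, restricted to the expected BlockIds.
    Iterable<byte[]> read(String appId, int shuffleId, int partitionId, Set<Long> expectedBlockIds) throws IOException;
}
```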

Firestorm benefits

Support for cloud-native deployment modes

At present, Firestorm has been deployed in Tencent's online-offline co-located clusters of nearly 10,000 nodes. It supports nearly 50,000 distributed computing jobs per day, with a daily Shuffle data volume close to 2 PB. The task failure rate has dropped from 14% to 9%, meeting the first-stage goal set at the outset and helping distributed computing move to the cloud.

Improved stability and performance of the Shuffle phase

Using a 1 TB TPC-DS dataset, we ran a performance comparison between native Spark Shuffle and Firestorm. The test environment was as follows:

  • Three servers serve as compute nodes, each with 80 cores, 256 GB of memory, and HDDs
  • Three servers serve as Shuffle Servers storing Shuffle data, each with 112 cores, 128 GB of memory, and HDDs

The complexity of TPC-DS queries varies. For simple queries, the original Spark Shuffle performs better because the Shuffle data volume is small, but the advantage is not significant. For complex queries, where the Shuffle involves a large number of partitions, Firestorm is more stable and performs much better. The two scenarios are described below.

**Scenario 1: simple SQL.** Take Query43 as an example. The stage diagram of Query43 is shown below; it consists of two stages and the Shuffle data volume is very small. With the original Spark Shuffle, the whole query takes about 12 seconds; with the Remote Shuffle Service, it takes about 15 seconds.

So where does the extra time go? The figure below shows the timings of the first stage. In the Write Time column, the original Spark Shuffle has the advantage, with times at the millisecond level, whereas Firestorm adds RPC communication during the Shuffle write, which increases the time. Because the tasks have to run in multiple batches, each batch adds a difference of several hundred milliseconds, so the original Spark Shuffle ends up about 3 seconds faster on this query.

As query execution time grows, this performance advantage diminishes and becomes almost negligible. Queries in this category include Query1, Query3, and so on, which are not listed one by one here.

**Scenario 2: complex SQL.** Take Query17 as an example. The figure below shows the stage diagrams under the different Shuffle modes. This query has many stages and a large amount of Shuffle data: with the original Spark Shuffle it takes about 8 minutes to execute, while the Remote Shuffle Service takes only about 3 minutes.

The original Spark Shuffle has to pull data from every Executor, which incurs heavy network overhead and random disk I/O; the read time is very long, sometimes up to 2 minutes. The Remote Shuffle Service, by contrast, reduces the network overhead and reads the aggregated Shuffle data as a whole, so its time consumption is low and stable.

The original Spark Shuffle also shows long and unstable times here, mainly because the compute node handles Shuffle Read and Shuffle Write at the same time, with frequent local-disk access and a large data volume, which greatly increases the time taken. The Remote Shuffle Service's read and write mechanisms avoid these problems, so its overall performance is much better and more stable.

Queries in this category also include Query25, Query29, and so on, which are not listed one by one here.

In addition to the above two scenarios, there are some queries that cannot run properly using the original Spark Shuffle due to the large amount of shuffle data, but can run smoothly using the Remote Shuffle Service, such as Query64 and Query67.

In general, when the Shuffle data volume is small, the Remote Shuffle Service has no advantage over the original Spark Shuffle: performance is basically flat or slightly lower, by 5%-10%. When the Shuffle data volume is large, the Remote Shuffle Service has clear advantages; on some TPC-DS queries, performance improves by 50%-100%.

Conclusion

This article has described the problems with the existing Spark Shuffle implementation and the solutions explored in the industry, and, drawing on the experience of running Spark jobs at Tencent, introduced the architecture, design, performance, and application of Firestorm. We hope Firestorm can better support distributed computing engines in cloud-native scenarios.

Open-source repository:

github.com/Tencent/Fir…

You are welcome to follow and star the project, and excellent developers are welcome to join the Tencent Big Data R&D team.

Appendix

[1] issues.apache.org/jira/browse…

[2] www.slideshare.net/databricks/…

[3] cloud.google.com/dataflow/do…

[4] github.com/uber/Remote…

[5] developer.aliyun.com/article/772…

[6] www.sohu.com/a/447193430…