Introduction: Since the launch of Remote Shuffle Service (RSS) in 2020, Aliyun EMR has helped many customers solve the performance and stability problems of their Spark jobs and implement a storage-compute separated architecture. Meanwhile, RSS has kept evolving through the cooperation with Xiaomi. This article introduces the latest RSS architecture, the practice at Xiaomi, and the open source release.

Authors | Yichui, Mingji, Zijin. Source | Alibaba Tech official account.


I. A Review of the Problem

Shuffle is the most important operator in big data computing. First, its coverage is high: more than 50% of jobs contain at least one Shuffle[2]. Second, its resource consumption is large: on Alibaba's internal platform, Shuffle accounts for more than 20% of CPU usage; on LinkedIn's internal platform, the resource waste caused by Shuffle Read is as high as 15%[1]; and a single Shuffle can exceed 100 TB of data[2]. Third, it is unstable: in terms of hardware stability, CPU > memory > disk ≈ network, while Shuffle's resource consumption follows exactly the reverse order. OutOfMemory and Fetch Failure are probably the two most common Spark job errors; the former can usually be resolved by tuning parameters, while the latter requires a systematic redesign of Shuffle.

Traditional Shuffle is shown in the figure below: the Mapper sorts its Shuffle data by PartitionId, writes it to local disk, and hands the files over to the External Shuffle Service (ESS) for management. Each Reducer then reads its own Blocks from every Mapper's Output.

The traditional Shuffle has the following problems.

  • Dependence on local disks limits storage-compute separation. Storage-compute separation is an architecture that has emerged in recent years: it decouples compute from storage, making machine-type design more flexible. Compute nodes have strong CPUs and weak disks, while storage nodes have strong disks and weak CPUs. Compute nodes are stateless and can be scaled elastically with the load; on the storage side, with the maturing of object storage (OSS, S3) + data lake formats (Delta, Iceberg, Hudi) + local/nearby caches, storage can be treated as an effectively unlimited service. Customers save costs through compute elasticity plus pay-per-volume storage. However, Shuffle's dependence on local disks limits storage-compute separation.
  • Write amplification. When a Mapper's Output exceeds memory, a spill to disk is triggered, which introduces additional disk I/O.
  • Massive random reads. Each Mapper's Output holds only a small amount of data for any given Reducer. For example, if the Output is 128 MB and the Reducer concurrency is 2000, each Reducer reads only about 64 KB from it, producing a large number of small random reads (see the sketch after this list). On HDDs, random-read performance is very poor; on SSDs, it rapidly wears out the drives.
  • A high number of network connections, which makes thread pools consume too much CPU and leads to performance and stability problems.
  • Shuffle data has only a single copy. In large-scale clusters, bad disks and bad nodes are common, and Stage recomputation triggered by Shuffle data loss causes performance and stability problems.
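To make the random-read granularity concrete, here is a tiny back-of-the-envelope sketch in Scala using the example numbers from the list above (purely illustrative):

```scala
object ShuffleReadGranularity {
  def main(args: Array[String]): Unit = {
    val mapperOutputBytes  = 128L * 1024 * 1024 // one Mapper's Shuffle Output: 128 MB
    val reducerConcurrency = 2000L              // number of Reducers

    // Each Reducer owns roughly 1/reducerConcurrency of every Mapper's Output,
    // so each read request is tiny and effectively random on HDDs.
    val bytesPerRead = mapperOutputBytes / reducerConcurrency
    println(f"Per-Reducer read from one Mapper: ${bytesPerRead / 1024.0}%.1f KB") // roughly the "64 KB" cited above
  }
}
```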

Ii. Development history of RSS

To address the problems of Shuffle, the industry has tried various approaches, which in recent years have gradually converged on Push Shuffle.

1 Sailfish

Sailfish[3] was the first to propose Push Shuffle with Partition-level data aggregation, improving the performance of large jobs by 20% to 5x. Sailfish heavily modified the distributed file system KFS[4] and does not support multiple replicas.

2 Dataflow

Google BigQuery and Cloud Dataflow[5] decouple Shuffle from compute and use multi-tier storage (memory + disk). No further technical details have been disclosed.

3 Riffle

Facebook Riffle[2] merges on the Mapper side: a Riffle service deployed on each physical node merges the Shuffle data on that node by PartitionId, thereby turning small random reads into larger ones to some extent.

4 Cosco

Facebook Cosco[6][7] adopted and redesigned Sailfish's approach, keeping the core idea of Push Shuffle with Partition data aggregation, but running it as an independent service. The server side uses a master-worker architecture, keeps two replicas in memory, and persists data to DFS. Cosco essentially defined the standard RSS architecture, but its performance is constrained by DFS and does not improve significantly.

5 Zeus

Uber Zeus[8][9] also uses a decentralized service architecture, but it has no etcd-like role to maintain Worker state, which makes state management difficult. Zeus implements multiple replicas through Client double-push and uses local storage.

6 RPMP

Intel RPMP[10] relies on new hardware (RDMA and PMem) to accelerate Shuffle and does not perform data aggregation.

7 Magnet

LinkedIn Magnet[1] combines local Shuffle with Push Shuffle, and its design philosophy is "best effort": after the Mapper's Output is written locally, a Push thread pushes the data to a remote ESS for aggregation, with no guarantee that all data gets aggregated. Benefiting from the local Shuffle, Magnet does better on fault tolerance and AE support (it can fall back directly to traditional Shuffle). Magnet's limitations are that its dependence on local disks prevents storage-compute separation; that data merging relies on ESS, placing additional pressure on the NodeManager; and that Shuffle Write writes data both locally and remotely at the same time, so performance is not optimal. The Magnet solution has been adopted by Apache Spark as the default open source solution.

8 FireStorm

FireStorm[11] is a hybrid of Cosco and Zeus: the server side uses a master-worker architecture and achieves multiple replicas through Client multi-write. FireStorm uses multi-tier storage of local disks plus object storage and uses larger PushBlocks (3 MB by default). FireStorm keeps PushBlock metadata on the storage side, recorded in an index file. The Client-side cache memory in FireStorm is managed by Spark's MemoryManager and is allocated at a fine granularity (3 KB by default) to avoid memory waste.

As the descriptions above show, current solutions have basically converged on Push Shuffle, but they differ in several key design choices, mainly:

  1. Whether to integrate into Spark or run as an independent service.
  2. The RSS server-side architecture; options include master-worker, decentralization with lightweight state management, and full decentralization.
  3. Where Shuffle data is stored; options include memory, local disks, DFS, and object storage.
  4. How multiple replicas are implemented; options include Client multi-push and server-side Replication.

Aliyun RSS[12] was launched in 2020. Its core design draws on Sailfish and Cosco, with improvements in architecture and implementation that are described in detail below.

III. Core Architecture of Aliyun RSS

For the key design choices listed in the previous section, Aliyun RSS makes the following decisions:

  1. Independent service. Integrating RSS into Spark cannot meet the requirements of a storage-compute separated architecture, so Aliyun RSS provides Shuffle as an independent service.
  2. Master-Worker architecture. Service state needs to be managed through a Master node; state management based on etcd alone is too limited.
  3. Multiple storage backends. Local disks and DFS are currently supported, with local disks as the primary option; tiered storage is the planned direction.
  4. Server-side Replication. Client multi-push consumes extra network and compute resources on the compute nodes, which is unfriendly for independent deployments or for offering Shuffle as a service.

The following figure shows the key architecture of Aliyun RSS, which includes the Client (RSS Client, MetaService), the Master (Resource Manager), and the Workers. The Shuffle process is as follows (a simplified client-side sketch follows the list):

  1. When a Mapper pushes data for the first time, it asks the Master to allocate Worker resources, and each Worker records the list of Partitions it needs to serve.
  2. The Mapper caches Shuffle data in memory and triggers a Push when the data exceeds a threshold.
  3. Data belonging to the same Partition is pushed to the same Worker for merging. The master Worker replicates to the slave Worker immediately after receiving the data; once both copies are in memory, the master Worker sends an ACK to the Client.
  4. After the Mapper Stage finishes, MetaService sends a CommitFiles request to the Workers, which flush the data remaining in memory and return the file lists.
  5. Reducers read Shuffle data from the corresponding file lists.
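A minimal sketch of the client-side flow in steps 1–3, assuming hypothetical names (`WorkerRef`, `requestWorker`, `pushAndAwaitAck`); this is not the actual RSS client API:

```scala
import scala.collection.mutable

// Illustrative sketch of the Push Shuffle client flow described above.
case class WorkerRef(host: String, port: Int)

class SketchRssClient(master: String) {
  // PartitionId -> primary Worker, assigned by the Master on first push (step 1).
  private val partitionLocations = mutable.Map.empty[Int, WorkerRef]
  private val buffers = mutable.Map.empty[Int, mutable.ArrayBuffer[Byte]]
  private val pushThreshold = 64 * 1024 // flush a partition buffer at 64 KB (illustrative value)

  def write(partitionId: Int, bytes: Array[Byte]): Unit = {
    val buf = buffers.getOrElseUpdate(partitionId, mutable.ArrayBuffer.empty[Byte])
    buf ++= bytes
    if (buf.length >= pushThreshold) { // step 2: push once the threshold is exceeded
      val worker = partitionLocations.getOrElseUpdate(partitionId, requestWorker(partitionId))
      // Step 3: the master Worker replicates to its slave and ACKs only after
      // both in-memory copies exist.
      pushAndAwaitAck(worker, partitionId, buf.toArray)
      buf.clear()
    }
  }

  private def requestWorker(partitionId: Int): WorkerRef =
    WorkerRef(s"worker-for-$partitionId", 9097) // placeholder for a Master RPC

  private def pushAndAwaitAck(w: WorkerRef, pid: Int, data: Array[Byte]): Unit =
    println(s"push ${data.length} bytes of partition $pid to ${w.host}") // placeholder for network I/O
}
```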

See [13] for an introduction to the core architecture and fault tolerance of Aliyun RSS. The rest of this section describes how the Aliyun RSS architecture has evolved over the past year and how it differs from other systems.

1 State sinking

RSS uses a Master-Worker architecture. In the initial design, the Master was responsible for both cluster state management and Shuffle lifecycle management. Cluster state includes Worker health and load; the Shuffle lifecycle includes which Workers serve each Shuffle, the Partition list served by each Worker, the Shuffle's state (Shuffle Write, CommitFiles, Shuffle Read), whether data has been lost, and so on. Maintaining the Shuffle lifecycle requires a large amount of data and complex data structures, which makes Master HA hard to implement. At the same time, the large number of lifecycle-management calls makes the Master prone to becoming a performance bottleneck, limiting the scalability of RSS.

To relieve the pressure on the Master, we sink lifecycle state management down to the Driver: each Application manages its own Shuffle, and the Master maintains only the state of the RSS cluster itself. This optimization greatly reduces the Master's load and allowed Master HA to be implemented smoothly.
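A rough sketch of how the state could be split after this change; the data structures and field names below are illustrative assumptions, not the real implementation:

```scala
import scala.collection.mutable

// The Master now only tracks the RSS cluster itself (health and load, via heartbeats).
case class WorkerState(host: String, aliveDisks: Int, load: Double)
class MasterState {
  val workers = mutable.Map.empty[String, WorkerState]
}

// Each Application's Driver tracks the lifecycle of its own Shuffles.
sealed trait ShufflePhase
case object ShuffleWrite extends ShufflePhase
case object CommitFiles  extends ShufflePhase
case object ShuffleRead  extends ShufflePhase

class DriverShuffleState {
  // shuffleId -> (partitionId -> host of the serving Worker)
  val partitionLocations = mutable.Map.empty[Int, mutable.Map[Int, String]]
  val phase              = mutable.Map.empty[Int, ShufflePhase]
  val dataLost           = mutable.Map.empty[Int, Boolean]
}
```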

2 Adaptive Pusher

In the initial design, Aliyun RSS used a Hash-based Pusher like other systems: the Client maintains one (or more[11]) memory buffer per Partition, and a Push is triggered when a buffer exceeds its threshold. This design works fine at moderate concurrency but causes OOM at high concurrency. For example, with a Reducer concurrency of 50,000, a system with small Buffers[13] (64 KB) consumes up to 64 KB × 50,000 ≈ 3 GB of memory in the extreme case, while a system with large Buffers[11] (3 MB) consumes up to 3 MB × 50,000 ≈ 146 GB. That is unacceptable. To solve this problem, we developed a Sort-based Pusher, which does not distinguish Partitions while caching data; when the total amount of data exceeds a threshold (64 MB by default), the buffered data is sorted by PartitionId and then pushed to Workers in batches, avoiding the excessive memory consumption.

The Sort-based Pusher introduces an extra sort, so its performance is slightly worse than that of the Hash-based Pusher. During ShuffleWriter initialization, RSS automatically selects the appropriate Pusher based on the Reducer concurrency.
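The selection could look roughly like this; the threshold and names are assumptions, not the actual implementation:

```scala
sealed trait Pusher
case object HashBasedPusher extends Pusher // one buffer per Partition: faster, but memory grows with Reducer count
case object SortBasedPusher extends Pusher // one shared buffer, sorted by PartitionId and pushed in batches

object PusherSelector {
  // Hypothetical threshold: with a 64 KB per-partition buffer, N Reducers cost
  // roughly N * 64 KB of client memory, so cap the Hash-based Pusher at a few thousand partitions.
  private val MaxPartitionsForHash = 4096

  def select(numPartitions: Int): Pusher =
    if (numPartitions <= MaxPartitionsForHash) HashBasedPusher else SortBasedPusher
}
```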

3 Disk fault tolerance

For performance, Aliyun RSS recommends local-disk storage, so handling bad and slow disks is a prerequisite for service reliability. The DeviceMonitor thread on each Worker node periodically checks the disks, covering IO hangs, usage, and read/write exceptions. In addition, the Worker captures and reports exceptions on all disk operations (file creation, flushing). IO hangs and read/write exceptions are treated as Critical errors: the disk is isolated and the storage service on it is terminated. If a disk is slow or its usage exceeds the alarm threshold, the disk is isolated and accepts no new Partition storage requests, but existing Partitions continue to be served normally. After a disk is isolated, the Worker's capacity and load change, and this information is reported to the Master via heartbeat.
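A simplified sketch of the classify-and-isolate logic described above; the error categories follow the text, but the threshold values and names are assumptions:

```scala
sealed trait DiskStatus
case object Healthy          extends DiskStatus
case object IsolatedSoft     extends DiskStatus // slow or nearly full: refuse new Partitions, keep serving existing ones
case object IsolatedCritical extends DiskStatus // IO hang or read/write errors: stop the storage service on this disk

case class DiskObservation(ioHang: Boolean, readWriteError: Boolean, slow: Boolean, usageRatio: Double)

object DeviceMonitorSketch {
  private val UsageAlarmThreshold = 0.9 // hypothetical alarm threshold

  def classify(o: DiskObservation): DiskStatus =
    if (o.ioHang || o.readWriteError) IsolatedCritical
    else if (o.slow || o.usageRatio > UsageAlarmThreshold) IsolatedSoft
    else Healthy
}
```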

4 Rolling Upgrade

RSS is a long-running service that must not stop, yet the system itself keeps evolving, so rolling upgrades are a mandatory feature. This could be worked around with sub-cluster deployment, i.e., deploying multiple sub-clusters, upgrading them one at a time, and draining the sub-cluster being upgraded, but that approach requires the scheduling system to be aware of the grayscale clusters and to modify job configurations dynamically. We believe rolling upgrades should be handled within RSS itself; the core design is shown below. The Client sends a rolling-upgrade request to the Leader of the Master nodes (Master HA is described above) and uploads the update package to the Leader. The Leader switches the cluster state to "rolling upgrade" through the Raft protocol and starts the first phase: upgrading the Master nodes. The Leader first upgrades all Followers, then replaces its local package and restarts. Even though the Leader changes during this process, the upgrade is neither interrupted nor subject to errors.

After the Master nodes are upgraded, the Worker nodes are upgraded. RSS uses a sliding window: Workers inside the window go offline as gracefully as possible, i.e., they reject new Partition requests and wait for the Shuffles they serve to finish. To avoid waiting too long, a timeout is set. In addition, Worker selection for the window tries to avoid putting both the master and slave copies of the same data in one window, to reduce the probability of data loss.
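The window-selection constraint in the Worker phase could be sketched as follows; the window size, timeout, and the notion of a "replica partner" are assumptions based on the description above:

```scala
import scala.concurrent.duration._

// Illustrative sketch of the sliding-window Worker upgrade described above.
case class Worker(id: String, replicaPartner: Option[String]) // id of the Worker holding the peer copy

object RollingUpgradeSketch {
  val WindowSize      = 2
  val GracefulTimeout = 10.minutes // stop waiting for in-flight Shuffles after this (hypothetical value)

  // Pick the next window, avoiding putting a Worker and its replica partner
  // into the same window, so both copies are never offline at once.
  def nextWindow(pending: Seq[Worker]): Seq[Worker] = {
    val window = scala.collection.mutable.ArrayBuffer.empty[Worker]
    for (w <- pending if window.size < WindowSize)
      if (!window.exists(x => w.replicaPartner.contains(x.id) || x.replicaPartner.contains(w.id)))
        window += w
    window.toSeq
  }
}
```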

5 Chaos testing framework

For a service, unit tests, integration tests, and E2E tests alone cannot guarantee reliability, because they cannot cover the complexity of the production environment: bad disks, CPU overload, network overload, machine crashes, and so on. To simulate the production environment, we developed a chaos testing framework that injects such failures into the test environment while preserving the minimum environment RSS needs to run, i.e., at least 3 Master nodes and 2 Worker nodes available, with at least one healthy disk on each Worker node. We continuously run this kind of stress test against RSS.

The architecture of the chaos testing framework is shown in the figure below. First, a test Plan is defined, describing the types of events, the order in which they are triggered, and how long each lasts. Event types include node failures, disk failures, IO errors, CPU overload, and so on. The client submits the Plan to the Scheduler, which sends concrete Operations to the Runner on each node according to the Plan. The Runner executes the operations and reports the status of its node. Before triggering an Operation, the Scheduler infers the consequences of the event and rejects it if it would break the minimum runnable RSS environment.
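A Plan and the Scheduler's admission check might look roughly like this; the event types mirror the description above, while the API shape is a hypothetical sketch:

```scala
import scala.concurrent.duration._

sealed trait ChaosEvent
case class NodeDown(node: String)                  extends ChaosEvent
case class DiskFailure(node: String, disk: String) extends ChaosEvent
case class CpuOverload(node: String)               extends ChaosEvent

case class PlannedEvent(event: ChaosEvent, startAfter: FiniteDuration, duration: FiniteDuration)
case class Plan(events: Seq[PlannedEvent])

case class ClusterView(aliveMasters: Int, aliveWorkers: Int, minDisksPerWorker: Int)

object SchedulerSketch {
  // Reject an event if applying it would break the minimum runnable RSS environment:
  // >= 3 Masters, >= 2 Workers, and >= 1 healthy disk per Worker.
  def admissible(after: ClusterView): Boolean =
    after.aliveMasters >= 3 && after.aliveWorkers >= 2 && after.minDisksPerWorker >= 1

  val examplePlan = Plan(Seq(
    PlannedEvent(DiskFailure("worker-1", "/mnt/disk1"), startAfter = 1.minute,  duration = 10.minutes),
    PlannedEvent(CpuOverload("worker-2"),               startAfter = 5.minutes, duration = 3.minutes)
  ))
}
```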

We believe the idea behind this chaos testing framework is a general design that can be extended to testing other services.

6 Multi-engine support

Shuffle is a universal operation that is not bound to any engine, so we have been exploring multi-engine support. We currently support Hive + RSS and are investigating combinations with a streaming engine (Flink) and an MPP engine (Presto). Although Hive and Spark are both batch engines, their Shuffle behavior differs; the biggest difference is that Hive sorts on the Mapper side, while Spark sorts on the Reducer side. Since RSS currently does not perform computation on the server side, Tez had to be modified so that the Reducer does the sorting. In addition, Spark has a clean Shuffle plug-in interface, so RSS only needs to extend it from the outside, whereas Tez has no similar abstraction and the integration there is somewhat intrusive.
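For reference, Spark's plug-in point is the `spark.shuffle.manager` configuration key; the class name below is a placeholder for an RSS ShuffleManager implementation, not the actual class shipped by the project:

```scala
import org.apache.spark.SparkConf

object RssShuffleConfExample {
  // Point Spark at a pluggable ShuffleManager implementation.
  // "com.example.rss.RssShuffleManager" is a hypothetical class name.
  val conf: SparkConf = new SparkConf()
    .set("spark.shuffle.manager", "com.example.rss.RssShuffleManager")
}
```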

Most engines today do not have a Shuffle plug-in abstraction, so some engine modification is required. In addition, streaming and MPP engines both push data from upstream to downstream, while RSS uses an upstream-push, downstream-pull model; how to combine the two still needs to be explored.

7 Tests

We compared Aliyun RSS, Magnet, and an open source system X. Since each system is still evolving, the results reflect only their current state.

The test environment:

  • Header: 1 × ecs.g6e.4xlarge (16 vCPU, 2.5 GHz/3.2 GHz, 64 GiB memory, 10 Gbps network)
  • Worker: 3 × ecs.g6e.8xlarge (32 vCPU, 2.5 GHz/3.2 GHz, 128 GiB memory, 10 Gbps network)

Aliyun RSS vs. Magnet

The 5 TB Terasort performance test is shown in the figure below. As described above, Magnet's Shuffle Write has additional overhead compared with both RSS and traditional Shuffle. Magnet's Shuffle Read improves, but not as much as RSS's. In this benchmark, RSS is significantly better than the other two, and Magnet's end-to-end time is slightly better than traditional Shuffle's.

Aliyun RSS vs. open source system X

The performance comparison between RSS and open source system X on TPC-DS 3 TB is shown below: RSS is 20% faster in total runtime.

Stability

In terms of stability, we tested scenarios with very high Reducer concurrency: Magnet could run but was several times slower than RSS, and system X reported errors in the Shuffle Write stage.

IV. Aliyun RSS in Practice at Xiaomi

1 Current situation and pain points

Xiaomi's offline clusters use Yarn and HDFS as the main clusters, with NodeManager and DataNode co-deployed. Spark is the main offline engine and supports core computing tasks. Spark's biggest pain points are the poor stability and performance caused by Shuffle, and the constraint it places on a storage-compute separated architecture. After resource assurance and job tuning, job failures are mainly attributed to Fetch Failure, as shown in the figure below. Since most clusters use HDDs, the heavy random reads and the large number of network connections in traditional Shuffle lead to poor performance, and Stage recomputation caused by the low stability further aggravates the performance regression. In addition, Xiaomi has been trying to use the compute elasticity of a storage-compute separated architecture to reduce costs, but Shuffle's dependence on local disks has stood in the way.

2 Rolling out RSS at Xiaomi

Xiaomi had been following Shuffle-optimization technologies and established a co-development relationship with the Aliyun EMR team on the RSS project in January 2021. In March the first production cluster went live and began onboarding jobs; in June the first HA cluster went live at a scale of 100+ nodes; and in September the first 300+ node cluster went live. Future plans will further expand the grayscale rollout of RSS.

During the rollout, Xiaomi led the development of disk fault tolerance, which greatly improved the service stability of RSS; the technical details are described above. In addition, while RSS was not yet fully stable in the early stage, Xiaomi added fault tolerance at several points in the RSS job path. On the scheduling side, if a Spark job with RSS enabled fails because of Shuffle, the next Yarn retry falls back to ESS. In the ShuffleWriter initialization stage, Xiaomi led the development of an adaptive fallback mechanism that automatically chooses RSS or ESS according to the current RSS cluster load and job characteristics (such as whether the Reducer concurrency is too high), further improving stability.
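The fallback decision could be sketched as follows; the signals follow the description above, but the thresholds and names are illustrative assumptions:

```scala
// Illustrative sketch of the adaptive fallback at ShuffleWriter initialization:
// prefer RSS, but fall back to ESS when the RSS cluster is overloaded or the
// job's characteristics (e.g. Reducer concurrency) are unfavorable.
sealed trait ShuffleBackend
case object UseRss extends ShuffleBackend
case object UseEss extends ShuffleBackend

object FallbackSketch {
  private val MaxClusterLoad  = 0.85   // hypothetical load threshold
  private val MaxReducerCount = 100000 // hypothetical partition-count threshold

  def choose(rssClusterLoad: Double, numReducers: Int, rssAvailable: Boolean): ShuffleBackend =
    if (!rssAvailable || rssClusterLoad > MaxClusterLoad || numReducers > MaxReducerCount) UseEss
    else UseRss
}
```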

3 Results

After switching to RSS, the stability and performance of Spark jobs improved significantly. Jobs that previously failed due to Fetch Failure almost never fail now, and performance improved by 20% on average. The following figures compare job stability before and after RSS was adopted.

ESS:

RSS:

The following figure shows a comparison of job run times before and after RSS is added.

ESS:

RSS:

In terms of storage-compute separation, an overseas Xiaomi cluster connected to RSS successfully launched an elastic cluster with 1600+ cores, and jobs ran stably on it.

With the joint efforts of the Aliyun EMR team and the Xiaomi Spark team, the stability and performance improvements brought by RSS have been fully verified. Going forward, Xiaomi will continue to expand the scale of its RSS clusters and jobs, and RSS will play a greater role in elastic resource scaling scenarios.

V. Open Source

The important thing deserves saying three times: "Aliyun RSS is open source!" × 3

Git address: github.com/alibaba/Rem…

The open source code contains the core functionality and fault-tolerance features needed to meet production requirements.

Key features on the roadmap:

  1. AE (Adaptive Execution) support
  2. Support for multiple Spark versions
  3. Better flow control
  4. Better monitoring
  5. Better HA
  6. Multi-engine support

All developers are welcome to join us in building it!

VI. References

[1] Min Shen, Ye Zhou, Chandni Singh. Magnet: Push-based Shuffle Service for Large-scale Data Processing. VLDB 2020.
[2] Haoyu Zhang, Brian Cho, Ergin Seyfe, Avery Ching, Michael J. Freedman. Riffle: Optimized Shuffle Service for Large-Scale Data Analytics. EuroSys 2018.
[3] Sriram Rao, Raghu Ramakrishnan, Adam Silberstein. Sailfish: A Framework For Large Scale Data Processing. SoCC 2012.
[4] KFS. code.google.com/p/kosmosfs/
[5] Google Dataflow Shuffle. cloud.google.com/blog/produc…
[6] Cosco: An Efficient Facebook-Scale Shuffle Service. databricks.com/session/cos…
[7] Flash for Apache Spark Shuffle with Cosco. databricks.com/session_na…
[8] Uber Zeus. databricks.com/session_na…
[9] Uber Zeus. github.com/uber/Remote…
[10] Intel RPMP. databricks.com/session_na…
[11] Tencent FireStorm. github.com/Tencent/Fir…
[12] Aliyun RSS in practice at Qutoutiao. developer.aliyun.com/article/779…
[13] Aliyun RSS architecture. developer.aliyun.com/article/772…


This article is original content from Aliyun and may not be reproduced without permission.