This article is an excerpt from the Bytedance Infrastructure Practices series.

The “Bytedance Infrastructure Practices” series is a collection of articles by the technical teams and experts of Bytedance's infrastructure department. They share the team's practical experience and lessons from developing and evolving the infrastructure, in the hope of exchanging ideas and growing together with fellow engineers.

As the distributed storage system with the largest storage capacity and cluster scale inside Bytedance, HDFS has grown rapidly alongside the company's key services. This article starts with how HDFS developed at Bytedance and then introduces the major challenges met along the way and how they were solved.

HDFS overview

Since HDFS has existed for a long time and its application scenarios are mature, we introduce it only briefly.

HDFS is the most widely used open source distributed file system in the industry. Its principles and architecture are essentially the same as those of Google's GFS. Its main features are as follows:

  • The same directory tree view as a local file system
  • Append-only writes (random writes are not supported)
  • Sequential and random reads
  • Very large data scale
  • Easy to scale out, with high fault tolerance

HDFS at Bytedance

Bytedance has been using HDFS for a long time. After 7 years of development, it directly supports more than a dozen data platforms and indirectly supports hundreds of services. In terms of cluster size and data volume, HDFS has grown into a large company-wide platform with tens of thousands of servers, holding data at the EB level.

Before diving into the technical details, let’s take a look at Bytedance’s HDFS architecture.

Architecture overview

Access layer

The access layer is the biggest difference from the community version, which does not define one. At Bytedance, the clusters contain so many nodes that we need many Name Nodes, organized through the Federation mechanism, to serve the data of different upper-layer businesses. Once the number of Name Nodes grows large, however, unified access for user requests and unified view management become very difficult. To solve the problem of scattered user access points, we need an independent access layer that receives, forwards, and routes user requests in one place. Combined with the services behind it, this layer also provides user permission and traffic control capabilities. In addition, it presents a unified view of the directory tree to the outside.

In terms of deployment, the access layer relies on external components such as Redis and MySQL, and consists of a group of stateless NNProxy instances that provide request routing, quota limiting, tracing, and traffic limiting.

Metadata layer

The main modules in this layer are the Name Node, ZKFC, and BookKeeper (compared with QJM, BookKeeper is more reliable for large-scale, multi-node data synchronization).

The Name Node stores the metadata of the entire HDFS cluster and is the brain of the whole system. If it fails, the entire cluster becomes unavailable. The Name Node therefore uses a ZKFC-based active/standby hot-standby scheme for high availability.

The Name Node also faces a scalability problem, because the capacity of a single node is always limited. HDFS therefore introduced the Federation mechanism: multiple groups of Name Nodes can be deployed in one cluster, each maintaining its own metadata independently while sharing the storage resources of the Data Nodes. In this way, an HDFS cluster can in principle keep scaling out. With Federation, however, each group of Name Nodes is isolated from the others, so additional solutions are needed for the whole Federation cluster to present a single, complete directory tree.

Data layer

In contrast to the metadata layer, the main node type in the data layer is the Data Node, which stores data and serves reads. User files are split into blocks, and each block is replicated into multiple copies that are stored on different Data Nodes for fault tolerance and disaster recovery. Each copy is stored as a file on the Data Node, and its meta information is loaded into memory at startup.

Each Data Node periodically sends heartbeats to the Name Node and reports the copies it stores. This is done independently for each cluster in the Federation. The heartbeat response carries the Name Node's commands to the Data Node, for example to copy a replica to another Data Node or to delete a replica.

Main workloads

The main workloads carried by HDFS at Bytedance are:

  • Data storage for Hive, HBase, the log service, and Kafka
  • Platform data for the YARN and Flink computing frameworks
  • Computation-related data storage for Spark and MapReduce

Stages of development

At Bytedance, the data volume and cluster size of HDFS expanded rapidly as the business grew. The HDFS clusters grew from a few hundred machines to thousands and then tens of thousands. Along the way we fell into plenty of pitfalls. Broadly, the journey can be divided into the following stages.

The first stage

In the early stage of business growth, the cluster size grew steeply, and a single cluster soon hit a bottleneck on the metadata server, the Name Node. We introduced the Federation mechanism to scale the cluster horizontally.

Federation in turn raises the problem of a unified namespace, so a unified view is needed to give services a single point of access. To solve this problem, we introduced the Name Node Proxy component, which implements functions such as the unified view and multi-tenant management. It is described in the NNProxy section below.

The second stage

As the data volume continued to grow, directory tree management under Federation also hit bottlenecks: Java GC became more frequent, migrating nodes across subtrees cost too much, and node startup took too long. We therefore tackled GC, lock optimization, startup acceleration, and related problems through a rewrite, further improving the service capacity of the original Name Node so that it can hold more metadata. The result is DanceNN, a Bytedance-specific component that is compatible with all the functionality of the original Java NameNode while greatly improving stability and performance. Details are provided in the DanceNN section below.

The third stage

Once the data volume exceeded the EB level and the cluster scale grew to tens of thousands of machines, the problems of slow nodes, finer-grained service tiering, cost, and metadata bottlenecks became more prominent. Architecturally, we are evolving further toward a better multi-tenant architecture, a rebuilt Data Node, and hierarchical metadata. This work is still in progress. Because there are many optimization points, this article covers only our practice in slow node optimization.

Key improvements

During the evolution of the architecture we did a lot of exploration and experimentation. Following the challenges and problems described above, this section introduces the two key components, NNProxy (Name Node Proxy) and DanceNN (Dance Name Node), as well as our optimizations for slow nodes.

NNProxy (Name Node Proxy)

As the metadata access endpoint of the system, NNProxy provides a unified metadata view in Federation mode and solves the problems of uniformly forwarding user requests and uniformly managing and controlling service traffic.

Let's first look at the upstream and downstream systems around NNProxy.

Next, let's look at what NNProxy does.

Routing management

As mentioned in the introduction to Federation above, each cluster maintains its own directory tree and none of them provides a complete view. Route management in NNProxy solves this problem. It stores a mount table, which records the mappings between paths and clusters.

For example, /user -> hdfs://namenodeB. This mapping means that /user and its subdirectories live in the namenodeB cluster. Every access to /user or its subdirectories is forwarded by NNProxy to namenodeB, and the result is returned to the client.

A more specific mapping can coexist with it, for example /user/tiger/dump -> hdfs://namenodeC. In that case /user/tiger/dump and its subdirectories are on namenodeC, while the other subdirectories under /user remain on namenodeB. The accompanying figure illustrates this mapping.
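
The mount-table lookup described above amounts to a longest-prefix match on path components. The following is only a minimal sketch under that assumption: the function name `resolve` and the dictionary layout are hypothetical and are not taken from NNProxy itself.

```python
from typing import Dict, Optional

# Hypothetical mount table built from the example mappings in the text.
MOUNT_TABLE: Dict[str, str] = {
    "/user": "hdfs://namenodeB",
    "/user/tiger/dump": "hdfs://namenodeC",
}


def resolve(path: str, table: Dict[str, str] = MOUNT_TABLE) -> Optional[str]:
    """Return the cluster that owns `path`, preferring the deepest mount point."""
    parts = [p for p in path.split("/") if p]
    # Try the full path first, then each shorter ancestor in turn.
    for depth in range(len(parts), 0, -1):
        candidate = "/" + "/".join(parts[:depth])
        if candidate in table:
            return table[candidate]
    # Fall back to a root mapping if one exists; otherwise there is no route.
    return table.get("/")
```

Matching on whole path components rather than raw string prefixes keeps a path such as /userdata from being routed by the /user entry.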

Quota limits

Anyone who has used HDFS will know the concept of a quota. Each set of directories is assigned a fixed amount of space, and writes are prohibited once usage exceeds this threshold. This is enforced by NNProxy. NNProxy monitors the system in real time to obtain the latest quota usage. When a user performs a metadata operation, NNProxy approves or rejects it based on the quota status.

Trace support

ByteTrace is a tracing system that records and tracks calls between users and systems and between systems, for analysis and for operations and maintenance. The trace information is attached to the RPC requests sent to NNProxy. From it, NNProxy learns the upstream module, user, and application ID of the current request. NNProxy sends this information to Kafka for offline analysis, and also aggregates it into metrics in real time to track online traffic.

Traffic limiting

Although NNProxy is very lightweight and can sustain a high QPS, the capacity of the back-end Name Node is limited. When a sudden large job sends read and write requests at high QPS and all of them are forwarded to the Name Node, the Name Node becomes overloaded, latency rises, and it may even run out of memory (OOM), which affects every user of the cluster. Another important task of NNProxy is therefore to limit traffic in order to protect the back-end Name Nodes. Currently, limits are set along two dimensions: path + RPC and user + RPC. For example, we can limit create requests on /user/tiger/warhouse to 100 QPS, or delete requests from a given user to 5 QPS. Once the request rate exceeds the threshold, NNProxy returns a retry exception, and the client retries after receiving it. A restricted path or user therefore experiences slower HDFS access, but requests do not fail.
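
The two limiting dimensions can be sketched with a per-second counter keyed by (dimension, key, RPC). This is only an illustration under several assumptions: the class name `QpsLimiter`, the exception `RetryLater`, the fixed one-second window, and the rule that a path limit also covers everything beneath that path are all hypothetical, since the article does not describe NNProxy's actual algorithm.

```python
import time


class RetryLater(Exception):
    """Hypothetical stand-in for the retry exception returned to the client."""


class QpsLimiter:
    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._limits = {}   # (dimension, key, rpc) -> allowed requests per second
        self._windows = {}  # (dimension, key, rpc) -> (window second, count so far)

    def set_limit(self, dimension, key, rpc, qps):
        self._limits[(dimension, key, rpc)] = qps

    @staticmethod
    def _applies(rule, user, path, rpc):
        dimension, key, rule_rpc = rule
        if rule_rpc != rpc:
            return False
        if dimension == "user":
            return key == user
        # Assumption: a path rule covers the path itself and its descendants.
        return path == key or path.startswith(key.rstrip("/") + "/")

    def check(self, user, path, rpc):
        """Admit one request, or raise RetryLater if any matching rule is exhausted."""
        now = int(self._clock())
        rules = [r for r in self._limits if self._applies(r, user, path, rpc)]
        # Reject before counting, so a refused request consumes no budget.
        for rule in rules:
            second, count = self._windows.get(rule, (now, 0))
            if second == now and count >= self._limits[rule]:
                raise RetryLater(f"{rule} is limited to {self._limits[rule]} QPS")
        for rule in rules:
            second, count = self._windows.get(rule, (now, 0))
            self._windows[rule] = (now, count + 1 if second == now else 1)
```

A client that catches `RetryLater` and retries after a short pause sees the behaviour described above: access becomes slower, but requests eventually succeed.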

DanceNN (Dance Name Node)

Problems addressed

As mentioned above, once the data volume reached the EB level, the original Java Name Node exhibited many problems in production. The following summarizes some of the problems we encountered in practice:

  • The original Name Node is written in Java. Once the number of INodes exceeds 100 million, serious GC problems are inevitable.
  • The Java Name Node keeps all INode meta information in memory. One billion INodes occupy about 800 GB of memory (including the native memory used by the JVM itself), which further aggravates GC.
  • At our current cluster scale, a Name Node takes 6 hours to restart and restore service. If the active and standby nodes fail at the same time, upper-layer services are seriously affected.
  • The Java Name Node uses a global read/write lock. Any modification to the directory tree blocks all other read and write operations, so concurrency is low.

As the above shows, at this data scale we urgently needed a Name Node with a new architecture to carry our massive metadata. Besides rewriting it in C++ to avoid the Java GC problems, we made targeted optimizations for several scenarios.

Directory tree lock design

HDFS is a distributed cluster internally but presents a unified file system externally, so users expect to operate on files and directories as they would on a local Linux file system. This requires HDFS to provide atomicity, consistency, isolation, and durability, like the ACID properties of a database system. DanceNN must therefore ensure that these properties hold when multiple users operate on the same file or directory at the same time, which means operations need to be protected by locks.

Unlike traditional KV stores and database tables, DanceNN maintains a tree-shaped data structure, so simple key locks or row locks do not apply. Placing a single lock on the whole tree, as a database table lock or the native NN does, would seriously hurt overall throughput and latency. DanceNN therefore redesigned the tree lock structure so that, while preserving ACID, read throughput reaches up to 80,000 and write throughput up to 20,000, more than 10 times the performance of the native NN.

To do this, we reclassified the RPCs. RPCs such as createFile, getFileInfo, and setXAttr simply perform a CRUD operation on a single INode. An RPC such as delete may remove either a file or a directory, and in the latter case it affects every file under the subtree. More complex operations, such as rename, may involve multiple INodes, or even all the INodes under multiple subtrees.

DanceNN startup optimization

Because DanceNN's underlying metadata is managed in a local directory tree structure, our startup optimizations are built around this design.

Multithreaded scanning and filling of the BlockMap

During startup, the first step is to read the information stored in the directory tree and fill the BlockMap, similar to how the Java NN reads the FSImage. In our implementation, multiple threads scan the static directory tree structure in parallel and put the scan results into a lock-protected buffer. When the number of elements in the buffer reaches a specified size, a new buffer is created to receive further results, and a thread is started to fill the BlockMap with the data from the old buffer.
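
The buffer hand-off described above can be sketched as follows. This is an illustrative Python model, not DanceNN's C++ implementation: the names `BlockMapLoader` and `load_block_map`, the `block_id -> inode_id` layout of the BlockMap, and the representation of each subtree as a list of pairs are all assumptions made for the example.

```python
import threading


class BlockMapLoader:
    def __init__(self, flush_size):
        self.flush_size = flush_size
        self.block_map = {}          # assumed layout: block_id -> inode_id
        self._buffer = []
        self._buffer_lock = threading.Lock()
        self._map_lock = threading.Lock()
        self._fillers = []

    def add(self, block_id, inode_id):
        """Called by scanner threads for every block they find."""
        filler = None
        with self._buffer_lock:
            self._buffer.append((block_id, inode_id))
            if len(self._buffer) >= self.flush_size:
                # Swap in a fresh buffer and hand the full one to a filler thread.
                full, self._buffer = self._buffer, []
                filler = threading.Thread(target=self._fill, args=(full,))
                self._fillers.append(filler)
        if filler is not None:
            filler.start()  # started outside the buffer lock

    def _fill(self, entries):
        with self._map_lock:
            self.block_map.update(entries)

    def finish(self):
        """Flush the last partial buffer and wait for all filler threads."""
        with self._buffer_lock:
            rest, self._buffer = self._buffer, []
        self._fill(rest)
        for filler in self._fillers:
            filler.join()


def load_block_map(subtrees, flush_size=1000):
    """Scan each subtree in its own thread and return the populated BlockMap."""
    loader = BlockMapLoader(flush_size)

    def scan(subtree):
        for block_id, inode_id in subtree:
            loader.add(block_id, inode_id)

    scanners = [threading.Thread(target=scan, args=(s,)) for s in subtrees]
    for scanner in scanners:
        scanner.start()
    for scanner in scanners:
        scanner.join()
    loader.finish()
    return loader.block_map
```

Scanner threads hold the buffer lock only long enough to append or swap, so the comparatively expensive map insertion happens off the scanning path.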

Optimized block report reception

After DanceNN starts, it enters safe mode, in which it receives block reports from all Data Nodes and completes the information stored in the BlockMap. When the proportion of Data Nodes that have reported reaches a certain level, the system exits safe mode and begins to accept client requests. The speed of receiving block reports therefore also affects the startup time of the Name Node. DanceNN optimizes this step by dispatching requests to different threads according to BlockID. Each thread is responsible for a fixed slice and threads do not compete with one another, which greatly speeds up block report reception. The accompanying figure illustrates this.
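
The idea of giving each thread a fixed slice of the BlockID space can be sketched like this. It is a simplified model with assumed details: the class name `ShardedBlockReportProcessor`, the `block_id % num_shards` slicing rule, and the per-shard dictionaries are illustrative choices, not DanceNN's actual design.

```python
import queue
import threading


class ShardedBlockReportProcessor:
    def __init__(self, num_shards):
        # Each worker owns exactly one dict, so no lock is needed on the dicts.
        self.shards = [dict() for _ in range(num_shards)]
        self._queues = [queue.Queue() for _ in range(num_shards)]
        self._workers = [
            threading.Thread(target=self._run, args=(i,)) for i in range(num_shards)
        ]
        for worker in self._workers:
            worker.start()

    def submit_report(self, datanode, block_ids):
        """Split one Data Node's report across workers by BlockID."""
        n = len(self._queues)
        for block_id in block_ids:
            self._queues[block_id % n].put((block_id, datanode))

    def _run(self, index):
        shard, inbox = self.shards[index], self._queues[index]
        while True:
            item = inbox.get()
            if item is None:  # shutdown marker
                return
            block_id, datanode = item
            shard.setdefault(block_id, set()).add(datanode)

    def close(self):
        """Stop the workers after they have drained their queues."""
        for inbox in self._queues:
            inbox.put(None)
        for worker in self._workers:
            worker.join()

    def locations(self, block_id):
        return self.shards[block_id % len(self.shards)].get(block_id, set())
```

Because a given BlockID always maps to the same worker, two reports that mention the same block are serialized through one queue rather than contending on a shared lock.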

Slow node optimization

The slow node problem exists in many distributed systems. It is usually caused by hotspots in upper-layer services or by faults in lower-layer resources. Hotspots in upper-layer services cause some data to be accessed intensively within a short period. When an underlying resource is faulty, for example a slow or damaged disk, more requests are concentrated on one replica node, which turns it into a slow node.

Generally speaking, how much slow nodes matter depends heavily on the demand from upper-layer services and on the amount of underlying resources. In the extreme case where upper-layer requests are few and underlying resources are abundant, slow nodes are rare. In the opposite case the problem becomes severe. In Bytedance's HDFS clusters the slow node problem is very serious, especially when disk usage is very high. The root cause is that resource balancing lags behind: the disk usage of many machines has reached the red line, which degrades writes, and new hot data is concentrated on a small number of machines. In this situation, when the request rate of upper-layer services rises, big data analysis and query services with strict P999 latency requirements tend to have a large batch of data access requests (more than 10,000) stuck behind a single slow request.

Our optimization work is divided into two parts: slow nodes on the read path and slow nodes on the write path.

Slow node optimization for reads

We went through several stages:

  • At first we used the community version's Switch Read, which measures the time taken to read one packet. When reading a packet takes longer than a threshold, that packet read is considered timed out, and if too many packets time out within a time window, the current node is considered slow. The problem is that using the packet as the statistical unit makes the algorithm insensitive: a slow node is only detected after enough timed-out packets have accumulated, so small-IO scenarios (some Bytedance services issue large numbers of small random reads) suffer every time they read from a slow node.
  • Next, we developed the Hedged Read optimization. Hedged Read sets a timeout for each read. If the read times out, another thread sends a read request to a second copy, and whichever of the two responses returns first is used as the result. When slow nodes are concentrated, however, this amplifies read traffic, and in severe cases bandwidth in a small part of the cluster may become unavailable for a short time.
  • Building on that experience, we enabled a further optimization, Fast Switch Read, which uses throughput as the criterion for slow nodes: when throughput within a time window falls below a threshold, the current node is considered slow. The length of the time window and the throughput threshold are adjusted dynamically according to the current read status. The following table shows the values measured in a test with an online service at the time:

Host: X.X.X.X | 3-copy Switch Read | 2-copy Hedged Read | Hedged Read, 3-copy | Fast Switch Read (optimized algorithm)
Read duration P999 | 977 ms | 549 ms | 192 ms | 128 ms
Maximum read time | 300 s | 125 s | 60 s | 15.5 s
Occurrences of long tail (> 500 ms) | 238 times/day | 75 times/day | 15 times/day | 3 times/day
Occurrences of long tail (> 1000 ms) | 196 times/day | 64 times/day | 6 times/day | 3 times/day
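
The throughput criterion behind Fast Switch Read can be sketched with a sliding window over read samples. This is a minimal sketch with a fixed window and a fixed threshold. The dynamic adjustment of both values is not detailed in the article and is left out, and the class name `ThroughputSlowNodeDetector` and the warm-up rule are assumptions.

```python
from collections import deque


class ThroughputSlowNodeDetector:
    def __init__(self, window_seconds, min_bytes_per_second, started_at):
        self.window = window_seconds
        self.threshold = min_bytes_per_second
        self.started_at = started_at
        self._samples = deque()  # (timestamp, bytes_read) in arrival order

    def record(self, timestamp, bytes_read):
        """Register bytes received from the node at `timestamp`."""
        self._samples.append((timestamp, bytes_read))
        self._evict(timestamp)

    def _evict(self, now):
        # Drop samples that fell out of the window (now - window, now].
        while self._samples and self._samples[0][0] <= now - self.window:
            self._samples.popleft()

    def is_slow(self, now):
        # Assumption: do not judge a node before one full window has elapsed.
        if now - self.started_at < self.window:
            return False
        self._evict(now)
        total = sum(size for _, size in self._samples)
        return total / self.window < self.threshold
```

A reader would call `record` as data arrives and switch to another copy once `is_slow` returns true. Unlike the packet-timeout approach, a node that stalls completely is flagged after one window even if no packet ever finishes.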

Further relevant test data:

Slow node optimization for writes

Slow node optimization for writes is relatively simple. It mainly addresses the case where an intermediate node in the write pipeline is slow. For this problem we developed the Fast Failover and Fast Failover+ algorithms.

Fast Failover

When the number of timed-out ACK packets exceeds a threshold, the system ends the current block and asks the Name Node for a new block in which to continue writing.

The problem with Fast Failover is that ending the current block early increases the number of small blocks in the system, which hurts read speed and the Name Node's metadata maintenance. Fast Failover therefore maintains a switchover threshold: the block is switched only if the amount of data already written (the block size) exceeds the threshold.

Waiting until this threshold is reached, however, often causes a delay that users find hard to accept, so an additional optimization is needed for the case where the amount of data is below the threshold.

Fast Failover+

To solve the problem above, we introduced a new optimization, Fast Failover+, for the case where the amount of data written (the block size) is below the threshold. The algorithm first identifies the slow Data Node in the pipeline, removes it from the current pipeline, and enters the pipeline recovery stage. Pipeline recovery asks the Name Node for a new Data Node, which forms a new pipeline together with the remaining Data Nodes. The data already written is then synchronized to the new Data Node (this step is called transfer block). Because little data has been written so far, transfer block does not take long: its P999 duration averages only 150 ms, so the extra cost of pipeline recovery is acceptable.
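
The choice between the two algorithms can be summarized as a small decision function. This is a sketch under stated assumptions: the names `choose_action` and `rebuild_pipeline` are hypothetical, the case where the written size exactly equals the threshold is assigned to Fast Failover+ arbitrarily, and picking the slow node by highest ACK latency is an assumed criterion, since the article does not say how the slow Data Node is identified.

```python
from enum import Enum


class Action(Enum):
    CONTINUE = "continue"                  # pipeline is healthy enough
    SWITCH_BLOCK = "switch_block"          # Fast Failover: finish block, request a new one
    REPLACE_SLOW_NODE = "replace_node"     # Fast Failover+: pipeline recovery


def choose_action(timeout_acks, ack_timeout_limit, bytes_written, switch_threshold):
    """Decide how a writer reacts to timed-out ACKs on the current block."""
    if timeout_acks <= ack_timeout_limit:
        return Action.CONTINUE
    if bytes_written > switch_threshold:
        # Enough data was written that ending the block does not create a tiny block.
        return Action.SWITCH_BLOCK
    # Little data so far: replacing the slow node and re-syncing is cheap.
    return Action.REPLACE_SLOW_NODE


def rebuild_pipeline(pipeline, ack_latency_ms, replacement):
    """Drop the slowest node (assumed: highest ACK latency) and append a new one."""
    slow = max(pipeline, key=lambda datanode: ack_latency_ms[datanode])
    new_pipeline = [dn for dn in pipeline if dn != slow] + [replacement]
    return new_pipeline, slow
```

In this model the replacement node would come from the Name Node, and the transfer block step would copy the bytes already written to it before writing resumes.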

The following table is the value of the online business test at that time:

Host: X.X.X.X | Fast Failover p99 | Fast Failover+ p99 (optimized algorithm) | Fast Failover p95 | Fast Failover+ p95 (optimized algorithm)
Mean Flush duration | 1.49 s | 1.23 s | 182 ms | 147 ms
Maximum Flush time | 80 s | 66 s | 9.7 s | 6.5 s
Occurrences of long tail (p99 > 10 s, p95 > 1 s) | 63 times/day | 38 times/day | 94 times/day | 55 times/day
Occurrences of long tail (p99 > 5 s, p95 > 0.5 s) | 133 times/day | 101 times/day | 173 times/day | 156 times/day

Some further practical comparisons:

Conclusion

HDFS has a long history at Bytedance. Over 7 years it has developed from initial clusters of a few hundred machines holding PB-level data into today's multi-cluster platform of tens of thousands of machines holding EB-level data. As the business grew rapidly, our team went through a stage of explosive, unstructured growth, a stage of development at scale, and a stage of platform operation. Along the way we fell into many pitfalls, but we also accumulated a great deal of experience. Most importantly, the company continues to grow at high speed, and we remain true to our original purpose, holding to “DAY ONE” as we continue on the road.



Bytedance infrastructure team

Bytedance's infrastructure team supports the smooth operation of several products at the scale of 100 million users, including Douyin, Jinri Toutiao, Watermelon Video, and Huoshan Mini Video, and provides the guarantee and the driving force for the rapid and stable development of Bytedance and its businesses.

Within the company, the infrastructure team is mainly responsible for building Bytedance's private cloud, managing clusters of tens of thousands of servers, running compute/storage and online/offline mixed deployments across tens of thousands of machines, and stably storing several EB of data.

Culturally, the team actively embraces open source and innovative software and hardware architecture. For details, please refer to job.bytedance.com. If you are interested, please contact [email protected].

