Background

Powered by AI technology, BIGO’s video-based products and services, including BIGO Live (live streaming) and Likee (short videos), are widely popular, with users in more than 150 countries and regions. BIGO Live is available in more than 150 countries, and Likee has more than 100 million users and is popular among Gen Z.

With the rapid growth of the business, the volume of data carried by the BIGO message queue platform has doubled again and again. Downstream services such as online model training, online recommendation, real-time data analysis, and the real-time data warehouse place ever higher demands on message latency and stability.

The BIGO message queue platform was built on open-source Kafka. However, as business data volume keeps doubling and the requirements for message latency and system stability keep rising, the cost of maintaining multiple Kafka clusters has grown higher and higher, mainly in the following ways:

  • Data storage is bound to the message queue service, so cluster expansion or partition rebalancing requires copying large amounts of data, which degrades cluster performance
  • When a partition replica is not in the ISR (in-sync) state, a broker failure may cause data loss or leave the partition unable to serve reads and writes
  • Manual intervention is required when a Kafka broker’s disk fails or its usage is too high
  • Cross-region cluster synchronization uses Kafka MirrorMaker (KMM), but its performance and stability fall short of expectations
  • In catch-up read scenarios, PageCache pollution is likely to occur, resulting in read and write performance deterioration
  • Kafka’s topic partitions are written sequentially, but when there are hundreds or thousands of topic partitions on the broker, it becomes random from a disk perspective, and disk read/write performance decreases as the number of topic partitions increases. Therefore, the number of topic partitions stored on Kafka Broker is limited
  • As Kafka clusters grow, their operation and maintenance costs rise rapidly and demand a lot of manpower. At BIGO, adding a machine to a Kafka cluster and rebalancing partitions takes 0.5 person-days, and scaling down one machine takes 1 person-day

In order to improve the real-time performance, stability, and reliability of our message queues and reduce operation and maintenance costs, we re-examined the shortcomings of Kafka’s architecture and investigated whether a different architecture could solve these problems and meet our current business requirements.

Next generation message flow platform: Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight function computing. Pulsar was open-sourced by Yahoo in 2016, donated to the Apache Software Foundation for incubation, and became a top-level Apache project in 2018.

Pulsar adopts a layered architecture that separates computing and storage, supports multi-tenancy, persistent storage, and multi-datacenter cross-region data replication, and offers the high consistency, high throughput, and low latency needed for scalable streaming data storage.

The main features that attracted us to Pulsar are as follows:

  • Linear scaling: Seamless scaling to hundreds or thousands of nodes
  • High throughput: Already tested in Yahoo’s production environment with pub-sub support for millions of messages per second
  • Low latency: Low latency (less than 5 ms) under large message volumes
  • Persistence mechanism: Pulsar’s persistence mechanism is built on Apache BookKeeper and provides read/write separation
  • Read/write separation: BookKeeper’s read/write separation IO model maximizes sequential disk write performance and is relatively friendly to mechanical hard disks. There is no limit to the number of topics supported by a single Bookie node

Apache Pulsar is designed to address many of the issues we encountered with Kafka, and provides many great features, such as multi-tenancy, a consumption model that supports both queuing and streaming, and strong consistency.

In order to further understand Apache Pulsar and verify whether Pulsar could truly meet the large-scale message pub-sub needs of our production environment, we began a series of pressure tests in December 2019. Since we used mechanical hard disks rather than SSDs, we ran into a series of performance problems during pressure testing. We are very thankful for the help of the StreamNative team, and in particular to Sijie, Zhai Jia, and Penghui for their patient guidance and discussion. Through a series of performance tuning steps, Pulsar’s throughput and stability improved continuously.

After three to four months of pressure testing and tuning, we put the Pulsar cluster into production in April 2020 and gradually replaced the Kafka clusters by co-locating bookie and broker on the same nodes. As of now, the production Pulsar cluster consists of about a dozen nodes and processes tens of billions of messages per day, and we are gradually expanding it and migrating Kafka traffic onto it.

Problems encountered during pressure testing and using Pulsar

During pressure testing and while using Pulsar, we encountered the following problems:

  1. Pulsar Broker node load is unbalanced.
  2. The Pulsar broker side has a low Cache hit ratio, resulting in a large number of read requests entering bookie and poor read performance.
  3. Broker memory overflow (OOM) occurs frequently during pressure testing.
  4. The bookie process hangs due to direct memory OOM.
  5. Bookie node load is unbalanced and jitters frequently.
  6. When the Journal disk is an HDD, the bookie add entry 99th percentile latency is still high and write performance is poor, even with fsync turned off.
  7. When there are a large number of read requests in the bookie, write backpressure occurs and add entry latency increases.
  8. The Pulsar client often displays a Lookup Timeout Exception.
  9. High read/write latency of ZooKeeper causes instability of the Pulsar cluster.
  10. Consuming Pulsar topics with the Reader API (e.g. the Pulsar Flink connector) was slow (Pulsar versions before 2.5.2).

Problems 4, 5, 6, and 7 are particularly serious when the Journal/Ledger disks are mechanical hard disks (HDDs). If the Journal/Ledger disks were fast enough, messages would not accumulate in direct memory and we would not see this series of OOMs.

Our production message queue system needs to store a large amount of data (at the TB to PB level), and using SSDs for both Journal and Ledger disks would be very expensive. Could we tune some parameters or policies in Pulsar/BookKeeper so that HDDs deliver better performance?

During pressure testing and while using Pulsar, we encountered a number of performance issues at both the Pulsar broker level and the BookKeeper level. For these issues, BIGO tuned the performance of the Pulsar broker and BookKeeper in both SSD and HDD scenarios to obtain better performance.

Due to space constraints, this performance tuning series is divided into two parts. The first part focuses on the performance tuning of the Pulsar broker, and the second part focuses on the performance tuning of BookKeeper in conjunction with Pulsar.

The rest of this article focuses on the performance-related aspects of Pulsar/BookKeeper and suggests some performance tuning solutions that have been running well and paying off in BIGO production systems.

Environment deployment and monitoring


Because BookKeeper and Pulsar brokers rely heavily on ZooKeeper, low ZooKeeper read/write latency is required to ensure Pulsar stability. In addition, BookKeeper is IO intensive, so the Journal and Ledger directories should be kept on separate disks to avoid IO interference. In summary:

  • The Bookie Journal/Ledger directory is on a separate disk
  • If the Journal/Ledger directories use HDDs, do not store the ZooKeeper dataDir/dataLogDir on the same disk as the Journal/Ledger directories

Both BookKeeper and the Pulsar broker rely on direct memory, and BookKeeper also relies on PageCache to accelerate data reads and writes, so a proper memory allocation strategy is also critical. Sijie from the Pulsar community recommends the following memory allocation strategy:

  • OS: 1 ~ 2 GB
  • JVM: 1/2
    • heap: 1/3
    • direct memory: 2/3
  • PageCache: 1/2

Assuming that the machine physical memory is 128GB and bookie and broker are intermixed, the memory allocation is as follows:

  • OS: 2GB
  • Broker: 31GB
    • heap: 10GB
    • direct memory: 21GB
  • Bookie: 32GB
    • heap: 10GB
    • direct memory: 22GB
  • PageCache: 63GB

Monitoring: performance tuning starts with monitoring

In order to identify system performance bottlenecks intuitively, we need to build a complete monitoring system for Pulsar/BookKeeper and ensure that metrics are reported at every stage. When anomalies occur (including but not limited to performance problems), we can then quickly locate the bottleneck through the relevant monitoring metrics and work out a solution.

Both Pulsar and BookKeeper expose Prometheus endpoints, so the relevant statistics can be scraped over HTTP and fed directly into Prometheus/Grafana. Interested readers can follow the Pulsar Manager instructions to install it: github.com/streamnativ…

The indicators that need to be focused on are as follows:

  1. Pulsar Broker

    • jvm heap/gc
    • bytes in per broker
    • message in per broker
    • loadbalance
    • Broker Cache hit ratio
    • bookie client quarantine ratio
    • bookie client request queue
  2. BookKeeper

    • bookie request queue size
    • bookie request queue wait time
    • add entry 99th latency
    • read entry 99th latency
    • journal create log latency
    • ledger write cache flush latency
    • entry read throttle
  3. ZooKeeper

    • local/global ZooKeeper read/write request latency

Some metrics are not included in the Grafana templates of the repo above; you can add PromQL queries to configure them yourself.

Pulsar Broker side performance tuning

The performance tuning of Pulsar broker can be divided into the following aspects:

  1. Load balancing
    • Load balancing between brokers
    • Load balancing between Bookie nodes
  2. Rate limiting
    • The broker needs flow control when receiving messages to prevent broker direct memory OOM under sudden traffic floods.
    • The broker needs flow control when dispatching messages to consumers/readers to prevent frequent GC caused by pushing too many messages at once.
  3. Improve Cache hit ratio
  4. Ensure low latency for reading and writing ZooKeeper
  5. Disable Auto Bundle split to ensure system stability

Load balancing

Load balancing between brokers

Load balancing improves the utilization of broker nodes, improves the broker cache hit ratio, and reduces the probability of broker OOM. This part is about Pulsar bundle rebalancing.

The structure of a namespace bundle is as follows. Each namespace consists of a certain number of bundles, and every topic under the namespace is mapped to exactly one bundle by hashing the topic name. Bundles are then assigned to, or removed from, the serving brokers through load/unload operations.
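To make the mapping concrete, here is a simplified Java sketch of the idea of hashing a topic name into a bundle’s hash range. The hash function and range handling here are illustrative only and do not reproduce Pulsar’s actual implementation:

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Simplified illustration: hash the topic name and find the bundle whose
// hash range contains the hash value.
public class BundleMapping {
    public static int selectBundle(String topicName, long[] bundleBoundaries) {
        CRC32 crc = new CRC32();
        crc.update(topicName.getBytes(StandardCharsets.UTF_8));
        long hash = crc.getValue(); // value in [0, 2^32)

        // bundleBoundaries is a sorted list of range borders,
        // e.g. [0x00000000, 0x80000000, 0xFFFFFFFF] for two bundles.
        for (int i = 0; i < bundleBoundaries.length - 1; i++) {
            if (hash >= bundleBoundaries[i] && hash < bundleBoundaries[i + 1]) {
                return i;
            }
        }
        return bundleBoundaries.length - 2; // last bundle includes the upper bound
    }
}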

If one broker serves no bundles, or fewer bundles than the other brokers, its traffic will be lower than that of the other brokers.

The existing/default bundle rebalance policy (OverloadShedder) works as follows: every minute, check whether the maximum of CPU, memory, direct memory, bandwidth in, and bandwidth out usage on each broker in the cluster exceeds a threshold (85% by default). If the threshold is exceeded, a certain number of high-traffic bundles are unloaded from that broker, and the leader then reassigns the unloaded bundles to the broker with the lowest load.

The problem with this strategy is:

  1. The default threshold is hard to reach, so most of the cluster’s traffic can easily end up concentrated on a few brokers.
  2. The threshold is hard to tune and is strongly affected by other factors, especially when other services are deployed on the same node.
  3. After a broker restarts, no traffic is rebalanced to it for a long time, because no other broker has reached the bundle unload threshold.

To address this, we developed a load balancing strategy based on the cluster mean that supports configurable weights for CPU, memory, direct memory, bandwidth in, and bandwidth out, as described in PR-6772.
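The following Java sketch illustrates our understanding of the idea behind this strategy: compute a weighted usage score per broker and shed bundles only from brokers whose score exceeds the cluster average by more than a configured percentage. It is illustrative only and is not the actual ThresholdShedder code:

// Illustrative sketch only, not the actual ThresholdShedder implementation.
public class MeanBasedShedding {

    // A broker's usage score: the maximum of its weighted resource usages (in %).
    public static double brokerUsage(double cpu, double memory, double directMemory,
                                     double bandwidthIn, double bandwidthOut,
                                     double cpuWeight, double memoryWeight,
                                     double directMemoryWeight,
                                     double bandwidthInWeight, double bandwidthOutWeight) {
        return Math.max(cpu * cpuWeight,
               Math.max(memory * memoryWeight,
               Math.max(directMemory * directMemoryWeight,
               Math.max(bandwidthIn * bandwidthInWeight,
                        bandwidthOut * bandwidthOutWeight))));
    }

    // Shed bundles from a broker only when its usage exceeds the cluster
    // average by more than the configured percentage (default 10).
    public static boolean shouldShed(double brokerUsage, double clusterAverageUsage,
                                     double thresholdShedderPercentage) {
        return brokerUsage > clusterAverageUsage + thresholdShedderPercentage;
    }
}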

This policy is supported in Pulsar 2.6.0 and is disabled by default. You can enable it by modifying the following parameters in broker.conf:

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder

We can precisely control the weight of each collected metric with the following parameters:

# The broker resource usage threshold.
# When the broker resource usage is greater than the pulsar cluster average resource usage,
# the threshold shedder will be triggered to offload bundles from the broker.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBrokerThresholdShedderPercentage=10

# The proportion that historical usage accounts for when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerHistoryResourcePercentage=0.9

# The bandwidth-in usage weight when calculating the new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithInResourceWeight=1.0

# The bandwidth-out usage weight when calculating the new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithOutResourceWeight=1.0

# The CPU usage weight when calculating the new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0

# The heap memory usage weight when calculating the new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerMemoryResourceWeight=1.0

# The direct memory usage weight when calculating the new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerDirectMemoryResourceWeight=1.0

# Bundle unload minimum throughput threshold (MB), to avoid unloading bundles too frequently.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBundleUnloadMinThroughputThreshold=10

Balance the load between bookie nodes

The Bookie node load monitor is shown in the following figure, and we can see:

  1. The load is not uniform between the Bookie nodes, and the difference between the highest and lowest traffic nodes may be several hundred MB/s
  2. Under high load conditions, the load on some nodes may rise and fall periodically, with a period of 30 minutes

The impact of these issues is that the Bookie load is unbalanced, resulting in low utilization of the BookKeeper cluster and a tendency to jitter.

The root cause of this problem is that the Bookie client’s circuit-breaker (quarantine) policy for bookie write failures is too crude.

Let’s review how the Pulsar broker writes to bookies:

When the broker receives a message from a producer, it first stores the message in the broker’s direct memory. It then calls the bookie client, which sends the message to the bookies in a pipelined fashion according to the configured (EnsembleSize, WriteQuorum, AckQuorum) strategy.
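For reference, the (EnsembleSize, WriteQuorum, AckQuorum) triple corresponds to the parameters of BookKeeper’s ledger creation API. Pulsar’s managed ledger calls this internally; the standalone sketch below (with a placeholder ZooKeeper address) only illustrates what the three values mean:

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;

public class QuorumExample {
    public static void main(String[] args) throws Exception {
        // Placeholder ZooKeeper address for the BookKeeper cluster.
        BookKeeper bk = new BookKeeper("zk1:2181");

        // EnsembleSize=3, WriteQuorum=3, AckQuorum=2: every entry is written to
        // 3 bookies, and the write is acknowledged once 2 of them confirm it.
        LedgerHandle lh = bk.createLedger(3, 3, 2,
                BookKeeper.DigestType.CRC32, "password".getBytes());

        lh.addEntry("hello".getBytes());

        lh.close();
        bk.close();
    }
}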

The bookie client collects statistics on the write failure rate (including write timeouts) of each bookie every minute. By default, when the failure rate exceeds 5 failures per minute, the bookie is quarantined for 30 minutes, so that messages are not continuously written to the failing bookie and writes keep succeeding.

The problem with this circuit-breaker strategy is that when a bookie is under heavy load (traffic), all writes to it may slow down at once, and all bookie clients may see write exceptions (such as write timeouts) at the same time. As a result, all clients quarantine that bookie simultaneously for 30 minutes and then put it back on the writable list simultaneously 30 minutes later. This causes the bookie’s load to rise and fall periodically.

To solve this problem, we introduced a probability-based quarantine mechanism: when a bookie client fails to write a message, instead of always quarantining the bookie, it decides whether to quarantine it based on a probability.

This quarantine policy prevents all bookie clients from quarantining the same bookie at the same time and avoids the resulting traffic jitter. For details, see BookKeeper PR-2327. Since the code has not yet been merged and released in an official BookKeeper version, you need to compile it yourself: github.com/apache/book…
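A minimal sketch of the idea follows. It is illustrative only; the quarantine probability here is an assumed value, and this is not the code from PR-2327:

import java.util.concurrent.ThreadLocalRandom;

public class ProbabilisticQuarantine {
    // Assumed probability for illustration; the real behaviour is defined by the PR.
    private static final double QUARANTINE_PROBABILITY = 0.33;

    // Instead of always quarantining a bookie once its failure count crosses the
    // threshold, each client quarantines it only with some probability, so that
    // different clients make independent decisions and do not all isolate the
    // same bookie at the same moment.
    public static boolean shouldQuarantine(int failuresInLastMinute, int failureThreshold) {
        if (failuresInLastMinute < failureThreshold) {
            return false;
        }
        return ThreadLocalRandom.current().nextDouble() < QUARANTINE_PROBABILITY;
    }
}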

In BIGO practice tests, this feature reduced the standard deviation of incoming traffic between Bookie nodes from 75 MB/s to 40 MB/s.

Rate limiting

Broker direct memory OOM

In high-throughput production scenarios, we often encountered broker direct memory OOM, which caused the broker process to hang. A likely reason is that the underlying bookie writes slow down, causing a large amount of data to accumulate in broker direct memory. The following figure shows how messages sent by producers are processed within the broker:

In a production environment, we cannot guarantee that the underlying bookies will always have very low write latency, so we need to apply rate limiting at the broker layer. Penghui from the Pulsar community developed this rate-limiting feature; its logic is shown in the figure below:

It was released in Pulsar 2.5.1; see PR-6178.

Consumer consumes a lot of memory

When producers send messages in batch mode, consumers tend to use too much memory, leading to frequent GC. Monitoring shows that the topic’s load spikes when the consumer starts and then gradually returns to normal.

To understand the cause, we need to look at how the consumer side consumes messages.

When a consumer calls the receive interface to consume a message, it first requests a message from the local receiverQueue. If the receiverQueue still has messages available, a message is returned to the consumer directly, and when availablePermit < receiverQueueSize/2, the Pulsar client sends availablePermit to the broker to tell it how many messages to push. If the receiverQueue has no messages available, the call waits (or returns a failure) until the receiverQueue receives messages pushed by the broker and the consumer is woken up.
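For reference, receiverQueueSize can be set on the consumer builder of the Java client. A minimal sketch with placeholder service URL, topic, and subscription names:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class ConsumerExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder address
                .build();

        // receiverQueueSize controls the local queue described above;
        // permits are sent to the broker when it drops below half.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/my-topic") // placeholder topic
                .subscriptionName("my-sub")
                .receiverQueueSize(1000)
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        consumer.acknowledge(msg);

        consumer.close();
        client.close();
    }
}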

After the broker receives the availablePermit, it reads max(availablePermit, batchSize) entries from the broker cache/bookie and sends them to the consumer. The processing logic is shown in the figure below:

The problem here is that when the producer sends in batch mode, one entry contains multiple messages, but the broker treats each unit of availablePermit as one entry. As a result, the broker sends far more messages to the consumer at once than availablePermit allows (availablePermit * batchSize instead of availablePermit), which makes memory consumption spike, causes frequent GC, and degrades consumption performance.

To solve the memory inflation on the consumer side, we track the average number of messages per entry on the broker side (avgMessageSizePerEntry). When the broker receives the availablePermit requested by the consumer, it converts it into the number of entries that should be sent, then pulls that many entries from the broker cache/bookie and sends them to the consumer. The processing logic is shown in the figure below:
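A rough sketch of this conversion follows; the names here are illustrative and do not correspond to the actual Pulsar broker code:

// Hypothetical sketch of the entry-count conversion described above.
public class PreciseFlowControl {
    // Convert message-level permits from the consumer into the number of
    // entries the broker should read, based on the observed average number
    // of messages packed into each (possibly batched) entry.
    public static int entriesToRead(int availablePermits, double avgMessagesPerEntry) {
        if (avgMessagesPerEntry <= 1.0) {
            return availablePermits; // non-batched producers: one message per entry
        }
        return (int) Math.max(1, Math.ceil(availablePermits / avgMessagesPerEntry));
    }
}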

This feature was released in Pulsar 2.6.0 and is disabled by default. You can enable it by using the following switch:

# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=true

Improve Cache hit ratio

Pulsar has multi-tier Cache to improve message read performance, including:

  • Broker Cache
  • Bookie write Cache(Memtable)
  • Bookie read Cache
  • OS PageCache

This section mainly covers how the broker cache works and how to tune it; bookie-side cache tuning will be covered in the next part.

When the broker receives a message that a producer sent to a topic, it checks whether the topic has an Active Cursor. If so, it writes the received message into the topic’s cache; otherwise it does not. The process is shown in the figure below:

To determine whether there is an Active Cursor, the following two conditions must be met:

  1. The cursor is a durable cursor
  2. The cursor’s lag is within the managedLedgerCursorBackloggedThreshold range

Because readers consume with non-durable cursors, messages written by producers do not enter the broker cache, so a large number of read requests fall through to the bookies, hurting performance.

streamnative/pulsar-flink-connector consumes with the Reader API, so it also suffered from this low consumption performance.

Zhao Rongsheng from our BIGO message queue team fixed the problem by removing the durable-cursor requirement from the Active Cursor check. For details, see PR-6769. The fix was released in Pulsar 2.5.2; if you hit this performance problem, please upgrade Pulsar to 2.5.2 or above.
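A simplified sketch of the effect of that change (illustrative only, not the actual Pulsar code):

public class ActiveCursorCheck {
    static class Cursor {
        boolean durable;
        long backlogEntries;
    }

    // Before: only durable cursors within the backlog threshold counted.
    static boolean hasActiveCursorBefore(Iterable<Cursor> cursors, long threshold) {
        for (Cursor c : cursors) {
            if (c.durable && c.backlogEntries <= threshold) {
                return true;
            }
        }
        return false;
    }

    // After: non-durable cursors (e.g. readers) also count, so reader-only
    // topics benefit from the broker cache as well.
    static boolean hasActiveCursorAfter(Iterable<Cursor> cursors, long threshold) {
        for (Cursor c : cursors) {
            if (c.backlogEntries <= threshold) {
                return true;
            }
        }
        return false;
    }
}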

In addition, we added Cache hit ratio monitoring for each Subscription in our topic to facilitate the location of consumption performance issues, which will be contributed to the community later.

Tailing Read

How can we increase the cache hit ratio and reduce the probability of having to read data from bookies? The idea is to serve as many reads as possible from the broker cache. To achieve this, we optimize in two places:

  1. Control the maximum lag range within which a cursor is still considered an Active Cursor. The default is 1000 entries, controlled by the following parameter:
# Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'
# and thus should be set as inactive.
managedLedgerCursorBackloggedThreshold=1000

The determination of Active Cursor is shown in the figure below.

  2. Control the eviction policy of the broker cache. Pulsar currently supports only the default eviction policy, which is controlled by the following parameters:
# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker. By default, uses 1/5th of available direct memory
managedLedgerCacheSizeMB=

# Whether we should make a copy of the entry payloads when inserting in cache
managedLedgerCacheCopyEntries=false

# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9

# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
managedLedgerCacheEvictionFrequency=100.0

# All entries that have stayed in the cache for more than the configured time, will be evicted
managedLedgerCacheEvictionTimeThresholdMillis=1000

Catchup Read

In a catch-up read scenario, the broker cache is likely to miss, and all read requests fall on the bookies. Is there any way to improve bookie read performance?

The maximum batch size of a read request that the broker sends to a bookie is controlled by dispatcherMaxReadBatchSize; the default is 100 entries.

# Max number of entries to read from bookkeeper. By default it is 100 entries.
dispatcherMaxReadBatchSize=100

The larger the batch size of a single read, the more efficiently the underlying bookie reads from disk and the lower the amortized read latency per entry. However, if the batch is too large, the latency of the whole batch read also increases, because the underlying bookie reads entries one at a time and synchronously.

This part of reading tuning is described in Apache Pulsar performance Tuning in BIGO (Part 2).

Ensure low latency for reading and writing ZooKeeper

Since both Pulsar and BookKeeper are heavily dependent on ZooKeeper, an increase in ZooKeeper read/write latency can result in Pulsar service instability. Therefore, it is necessary to ensure low latency of ZooKeeper read and write operations. The suggestions are as follows:

  1. If HDDs are used, do not place the ZooKeeper dataDir/dataLogDir on the same disk as other I/O-intensive services (such as the bookie Journal/Ledger directories); SSDs are an exception.
  2. Store the ZooKeeper dataDir and dataLogDir on two separate disks (again, unless they are SSDs).
  3. Monitor broker/ Bookie network card utilization to prevent loss of connection to ZooKeeper due to full network cards.

Disable Auto Bundle split to ensure system stability

Pulsar bundle split is a resource-intensive operation that causes all producers/consumers/readers connected to the bundle to be disconnected and reconnected. In general, auto bundle split is triggered when a bundle is under heavy pressure; the bundle then needs to be split into two bundles, with traffic spread to other brokers to relieve the pressure. The parameters controlling auto bundle split are as follows:

# enable/disable namespace bundle auto split
loadBalancerAutoBundleSplitEnabled=true

# enable/disable automatic unloading of split bundles
loadBalancerAutoUnloadSplitBundlesEnabled=true

# maximum topics in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxTopics=1000

# maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxSessions=1000

# maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxMsgRate=30000

# maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxBandwidthMbytes=100

When an auto bundle split is triggered, the broker is already under load, so closing the producers/consumers/readers on the bundle and performing the split takes longer. As a result, producer/consumer/reader clients easily hit connection timeouts, which triggers automatic client reconnection and causes instability for Pulsar and the Pulsar clients.

For production environments, our recommendation is to pre-allocate the number of bundles per namespace and turn off auto bundle split. If a bundle is found to be under too much pressure during operation, you can manually split it during off-peak hours to minimize the impact on clients.
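A minimal sketch of pre-allocating bundles when creating a namespace with the Java admin client. The admin URL, tenant, namespace, and bundle count are placeholders; the same can also be done with the pulsar-admin CLI (pulsar-admin namespaces create with the --bundles option):

import org.apache.pulsar.client.admin.PulsarAdmin;

public class PreAllocateBundles {
    public static void main(String[] args) throws Exception {
        // Placeholder admin service URL.
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build();

        // Create the namespace with a pre-allocated number of bundles (here 64)
        // instead of relying on auto bundle split later.
        admin.namespaces().createNamespace("my-tenant/my-namespace", 64);

        admin.close();
    }
}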

The number of pre-allocated bundles should not be too large. Too many bundles will cause great pressure on ZooKeeper, because each bundle must regularly report its own statistics to ZooKeeper.

Conclusion

This article has presented, from a performance tuning perspective, the optimizations BIGO applies to Pulsar in practice, covering environment deployment, load balancing, rate limiting, cache hit ratio improvements, and Pulsar stability, along with the BIGO message queue team’s experience of running Pulsar in production.

This article mainly addresses problems 1, 2, 5, 7, 8, and 9 from the list at the beginning. For problem 3 we offer a mitigation, but the root cause of Pulsar broker OOM is not resolved here; it needs to be tackled from the BookKeeper side, as do the remaining problems.

Because Pulsar uses a layered storage architecture, the underlying BookKeeper still needs a series of adjustments to match the high throughput and low latency of the Pulsar layer above it. The next part will cover BIGO’s hands-on experience from a BookKeeper performance tuning perspective.

I would like to thank the StreamNative team for their careful guidance and selfless help, which enabled Pulsar to take a solid step forward at BIGO. Apache Pulsar’s high throughput, low latency, and high reliability have greatly improved BIGO’s message processing capacity, reduced the operation and maintenance costs of our message queues, and saved nearly half of our hardware costs.

At the same time, we are actively involved in the Pulsar community and contribute back. We have done a lot of work on Pulsar broker load balancing, broker cache hit ratio optimization, broker-related monitoring, BookKeeper read/write performance optimization, BookKeeper disk IO optimization, and the integration of Pulsar with Flink and Flink SQL, helping the community further optimize and improve Pulsar.

About the author

Hang Chen is the leader of BIGO’s big data messaging platform team, responsible for the creation and development of a centralized publish-subscribe messaging platform for large-scale services and applications. He introduced Apache Pulsar to the BIGO messaging platform and connected with upstream and downstream systems such as Flink, ClickHouse, and other real-time recommendation and analytics systems. He is currently focusing on Pulsar performance tuning, new feature development and Pulsar ecosystem integration.