About the author


Tencent Cloud middleware expert engineer

Apache Pulsar PMC member and author of Apache Pulsar in Depth. He focuses on middleware and has extensive experience with message queues and microservices. He is responsible for the design and development of TDMQ, and is currently committed to building stable, efficient, and scalable basic components and services.

Introduction:

We covered the Apache Pulsar client's various message acknowledgment modes in our previous article, In-depth Analysis of Apache Pulsar: Client Message Acknowledgment. In this article, we will look at how message acknowledgments are managed on the Broker side.

The client uses message acknowledgment to inform the Broker that certain messages have been consumed and should not be delivered again. On the Broker side, a cursor stores the current consumption position of each subscription, including all of the metadata at that position, so that consumers do not have to start over from scratch after a Broker restart. Subscriptions in Pulsar are either persistent or non-persistent: the cursor of a persistent subscription is persisted and its metadata is kept in ZooKeeper, while a non-persistent cursor is kept only in the Broker's memory.

An introduction to cursors

Each subscription in Pulsar has a cursor, and if multiple consumers use the same subscription name (consumer group), they share that cursor. How the cursor is shared depends on the subscription type: in Exclusive or Failover mode, only one consumer uses the cursor at a time; in Shared or Key_Shared mode, multiple consumers use the cursor at the same time.

Every time a consumer acknowledges a message, the position of the cursor may change. Why only "may"? This relates to the acknowledgment modes described in the client article: individual acknowledgment of single messages, acknowledgment of individual messages inside a batch message, and cumulative acknowledgment. Negative acknowledgment does not change the cursor and is therefore outside the scope of this discussion.
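
For reference, here is a minimal client-side sketch of individual versus cumulative acknowledgment (the service URL, topic, and subscription names below are placeholders):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class AckModes {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder address
                .build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("my-topic")                     // placeholder topic
                .subscriptionName("my-sub")            // placeholder subscription
                .subscribe();

        // Individual ack: acknowledges exactly this message.
        Message<byte[]> msg = consumer.receive();
        consumer.acknowledge(msg);

        // Cumulative ack: acknowledges this message and every message before it,
        // so the cursor can move back several entries at once.
        Message<byte[]> later = consumer.receive();
        consumer.acknowledgeCumulative(later);

        consumer.close();
        client.close();
    }
}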

Let's first look at individual acknowledgment of a single message. With exclusive consumption, the cursor position moves back one Entry for each message acknowledged, as shown in the figure below:

With cumulative acknowledgment, only one message needs to be acknowledged for the cursor to move back multiple entries. For example, if consumer-1 cumulatively acknowledges entry-4, all entries from entry-0 onward are acknowledged, as shown in the following figure:

With shared consumption, multiple consumers consume messages at the same time, so holes may appear in the acknowledgments, as shown in the following figure:

This also explains why the markDeletePosition pointer only "may" move. As shared consumption shows, acknowledgments can leave holes, and the markDeletePosition pointer moves only when every entry up to a position has been consumed and acknowledged; it will not move past a hole. So what is the relationship between the markDeletePosition pointer and the cursor? A cursor is an object with multiple attributes, of which the markDeletePosition pointer is just one. As mentioned above, cursors also have a dedicated way of storing ack holes. If the holes were not stored separately, then after a Broker restart consumers could only resume from the markDeletePosition, leading to repeated consumption; in this example, entry-4 would be consumed again after the restart. Pulsar does, of course, store the hole information separately.

Then, let’s take a look at what metadata is recorded in the cursor. Here are just a few key attributes:

  • bookkeeper — a reference to the BookKeeper client, used to open Ledgers: for example, to read historical data from a closed Ledger, or to open a new Ledger when the current one is full
  • markDeletePosition — the mark-delete position; all entries up to this position have been acknowledged, so messages before it can be deleted
  • persistentMarkDeletePosition — markDeletePosition is persisted asynchronously, and this attribute records the markDeletePosition that has actually been persisted; when markDeletePosition is unavailable, this position is used instead. It is initialized on cursor recovery and updated whenever a persist succeeds
  • readPosition — the subscription's current read position. Even with multiple consumers, reads are strictly ordered; the messages are simply distributed among different consumers. It is initialized on cursor recovery and updated as messages are consumed
  • lastMarkDeleteEntry — the last Entry marked as deleted, i.e. the Entry that markDeletePosition points to
  • cursorLedger — the cursor stores only index information in ZooKeeper; the ack data is large and is stored in BookKeeper, and this attribute holds a reference to the corresponding Ledger
  • individualDeletedMessages — holds the ack-hole information
  • batchDeletedIndexes — holds ack information for individual messages inside batch messages

As cursorLedger shows, the cursor data is actually saved in BookKeeper. So if the data is stored in BookKeeper, what is the cursor information in ZooKeeper for? The cursor information stored in ZooKeeper is only an index, containing the following attributes:

  • The name and ID of the current cursorLedger, used to open the Ledger in BookKeeper
  • lastMarkDeleteEntry, the last Entry marked for deletion, containing a LedgerId and an EntryId
  • The cursor's last active timestamp

The cursor is saved to ZooKeeper at several points in time:

  • when the cursor is closed;
  • when a Ledger switch occurs and cursorLedger changes;
  • when persisting hole data to BookKeeper fails and the Broker attempts to persist it to ZooKeeper instead.

We can regard the cursor information in ZooKeeper as a checkpoint. When recovering, the Broker first reads the metadata from ZooKeeper to locate the BookKeeper Ledger, then restores the latest lastMarkDeleteEntry position and the hole information from that Ledger.

Since the cursor does not write its data to ZooKeeper in real time, how is the consumption position guaranteed not to be lost?

A Ledger in BookKeeper can hold many entries, so BookKeeper takes care of the high-frequency save operations, while ZooKeeper stores only low-frequency index updates.

Management of message holes

The cursor object uses a container named individualDeletedMessages to store all of the hole information. Thanks to Java's rich library ecosystem, the Broker directly uses Guava's Range to implement hole storage. For example, suppose ledger-1 has the following holes:

Then the hole information is stored as follows, i.e. each run of consecutively acknowledged entries is represented as an interval:

[(1:-1, 1:2], (1:3, 1:6]]

The advantage of using intervals is that a small number of them can describe the hole state of an entire Ledger, instead of recording every single Entry. When the gap between two ranges is consumed and acknowledged, the two ranges merge into one, which Guava's RangeSet supports automatically. And if the entries from the current markDeletePosition up to some later Entry form one contiguous acknowledged interval, the markDeletePosition can be moved backwards.
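
Here is a small sketch of this merging behavior using Guava directly, simplified to bare entryIds within a single ledger (Pulsar actually tracks (ledgerId, entryId) positions):

import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;

public class AckRanges {
    public static void main(String[] args) {
        RangeSet<Long> acked = TreeRangeSet.create();
        acked.add(Range.openClosed(-1L, 2L)); // entries 0..2 acked
        acked.add(Range.openClosed(3L, 6L));  // entries 4..6 acked, entry-3 is a hole
        System.out.println(acked);            // two disjoint ranges: (-1, 2] and (3, 6]

        acked.add(Range.openClosed(2L, 3L));  // entry-3 is acked: the hole closes
        System.out.println(acked);            // a single merged range: (-1, 6]
    }
}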

Once these holes are recorded, how are they used to avoid redelivering messages? When the Broker reads messages out of a Ledger, they pass through a filtering stage (the same stage that filters out, for example, delayed messages). Here the Broker iterates over the messages and checks whether each one falls inside an acknowledged Range; if it does, the message has already been acknowledged, so it is filtered out and not pushed to the client. Guava's Range provides a contains interface for quickly checking whether a position falls within a range. This filtering scenario basically only occurs right after a Broker restart, when the cursor has just been restored; once readPosition moves past the holes, no already-acknowledged messages are read, and there is nothing to filter.
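
The filtering idea can be sketched like this, using the same simplified long-based range set; filterAcked is a hypothetical helper for illustration, not Pulsar's actual dispatch code:

import com.google.common.collect.RangeSet;
import java.util.List;
import java.util.stream.Collectors;

public class AckHoleFilter {
    // Keep only entries that are NOT already acknowledged.
    static List<Long> filterAcked(List<Long> entryIds, RangeSet<Long> acked) {
        return entryIds.stream()
                .filter(id -> !acked.contains(id)) // RangeSet#contains is a fast lookup
                .collect(Collectors.toList());
    }
}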

Next, let's look at how the individualDeletedMessages container is implemented. Its type is LongPairRangeSet; the default implementation, DefaultRangeSet, is a wrapper around Google Guava's RangeSet. Pulsar also has its own optimized implementation, ConcurrentOpenLongPairRangeSet. The optimized RangeSet stores data slightly differently from the Guava RangeSet: it still exposes a Range-based interface externally, but internally it uses bitsets to record whether each Entry has been acknowledged.

The optimized RangeSet is more memory-friendly when there are many holes. Consider a scenario where 1 million messages are pulled but only 500,000 are acknowledged, every other one, leaving 500,000 holes. In this case the intervals lose their advantage: there would be 500,000 Range objects, as shown in the figure below, whereas the optimized RangeSet needs only one bit per acknowledgment.
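
A rough illustration of the bitset idea (my own sketch, not the actual ConcurrentOpenLongPairRangeSet code):

import java.util.BitSet;

public class BitsetAcks {
    public static void main(String[] args) {
        // Ack every other message out of 1,000,000: 500,000 holes remain.
        BitSet acked = new BitSet(1_000_000);
        for (int entryId = 0; entryId < 1_000_000; entryId += 2) {
            acked.set(entryId);
        }
        // One bit per entry: ~1,000,000 bits = ~122 KB in total,
        // versus ~500,000 Range objects in the interval representation.
        System.out.println("acked bits: " + acked.cardinality());
    }
}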

We can set the configuration item managedLedgerUnackedRangesOpenCacheSetEnabled=true in broker.conf to use the optimized RangeSet.

Therefore, in Pulsar, the MetadataStore only stores the cursor's index information, i.e. which Ledger the cursor data is stored in. The real cursor data is persisted to BookKeeper through the cursorLedger described above. The entire cursor object is written as a single Entry, whose Protobuf definition is as follows:

message PositionInfo {
    required int64 ledgerId = 1;
    required int64 entryId = 2;
    repeated MessageRange individualDeletedMessages = 3;
    repeated LongProperty properties = 4;
    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
}

batchDeletedIndexes is a ConcurrentSkipListMap. Its key is a Position object containing a LedgerId and an EntryId, and its value is a BitSet that records which messages within the batch have been acknowledged. The batchDeletedIndexes are stored in the same object (PositionInfo) as the single-message holes and persisted to BookKeeper together.
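
A minimal sketch of this structure, with Position simplified to a plain long entryId (Pulsar's real key is a (ledgerId, entryId) position):

import java.util.BitSet;
import java.util.concurrent.ConcurrentSkipListMap;

public class BatchAckIndex {
    public static void main(String[] args) {
        ConcurrentSkipListMap<Long, BitSet> batchDeletedIndexes = new ConcurrentSkipListMap<>();

        // Ack messages 0 and 3 of the batch stored at entry 42.
        BitSet acked = batchDeletedIndexes.computeIfAbsent(42L, k -> new BitSet());
        acked.set(0);
        acked.set(3); // messages 1 and 2 of the batch are still unacknowledged

        System.out.println("entry 42 acked indexes: " + acked);
    }
}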

If writing the hole data to BookKeeper fails, Pulsar currently tries to save it to ZooKeeper along with the index information. However, ZooKeeper does not save all of the data, only a small portion, to reduce repeated consumption on the client. How much data is persisted to ZooKeeper at most is decided by a configuration item in broker.conf: managedLedgerMaxUnackedRangesToPersistInZooKeeper, whose default value is 1000.

Optimization of message hole management

This hole-storage scheme seems perfect, but it becomes problematic when there are a large number of unacknowledged messages. First, a large number of subscriptions makes the number of cursors explode, causing the Broker to consume too much memory. Second, many holes never actually change, yet the full set of holes is saved every time. Finally, although the RangeSet uses bitsets in memory, the MessageRange data actually stored in BookKeeper is a collection of ledgerId/entryId pairs, each occupying 16 bytes. With a large number of holes the total size can exceed 5 MB, and BookKeeper currently limits a single Entry to 5 MB, beyond which the holes fail to persist.
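
A rough back-of-the-envelope check (my own arithmetic, assuming each MessageRange stores two 16-byte positions, a lower and an upper endpoint):

    5 MB / 32 B per range = (5 × 1024 × 1024) / 32 ≈ 163,840 ranges

So on the order of 160,000 stored ranges is already enough to hit BookKeeper's single-Entry limit.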

A dedicated PIP exists to solve this problem. At the time of writing, the PIP's code has been submitted and is in the review stage, so what follows may differ somewhat from the final code.

The new scheme mainly uses LRU plus segmented storage to solve the above problems. Since the hole data in a cursor may be large, only a small amount of hot data is kept in memory, and an LRU algorithm switches between hot and cold data to further reduce memory usage. Segmented storage spreads the hole data across multiple entries to avoid exceeding the 5 MB limit of a single Entry.

If we split the hole data across multiple entries for storage, the first problem we face is indexing. With a single-Entry record, only the last Entry in the Ledger needs to be read; once the data is split across multiple entries, we no longer know how many entries to read. The new scheme therefore introduces a Marker, as shown in the figure below:

After all the entries are saved, a Marker is inserted. The Marker is a special Entry that records all the entries of the current split save. During recovery, data is read backwards: the index is read first, and then all entries are read according to the index.

Since a save involves multiple entries, atomicity must be guaranteed. If the last Entry read is not a Marker, the previous save was interrupted before completion, and the reader keeps reading backwards until a complete Marker is found.

The hole data does not need to be saved in full each time. Using the Ledger as the unit, the scheme records whether the data under each Ledger has been modified; modified hole data is marked as dirty. During a save, only the dirty parts are written, and then the index in the Marker is updated.

Suppose the hole data stored in Entry-2 is modified: Entry-2 is marked dirty, and the next save only needs to write a new Entry-2 plus a new Marker. Copying all the entries referenced by the Marker to a new Ledger is triggered only when the whole Ledger is full. As shown below:

ManagedLedger implements an in-memory LRU list with a LinkedHashMap. A thread periodically checks whether the memory used by the hole data has reached a threshold; if so, LRU swapping is performed. Swapping LRU data in is synchronous: when an acknowledgment is added or contains is called, and the Ledger's index exists in the Marker but the corresponding data is not in memory, a synchronous load is triggered. Asynchronous swap-out combined with synchronous swap-in keeps data in memory as long as possible and avoids frequent swapping.
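
A minimal sketch of the access-ordered LinkedHashMap idea behind such an LRU (eviction here is count-based for illustration; per the article, the real trigger is a periodic memory-threshold check):

import java.util.LinkedHashMap;
import java.util.Map;

public class HoleDataLru<K, V> extends LinkedHashMap<K, V> {
    private final int maxSegments;

    public HoleDataLru(int maxSegments) {
        super(16, 0.75f, true); // accessOrder=true: iteration order becomes LRU order
        this.maxSegments = maxSegments;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evicting the eldest entry models swapping a cold segment out of memory;
        // the data remains recoverable from BookKeeper via the Marker index.
        return size() > maxSegments;
    }
}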

Conclusion

There are many more design details in Pulsar than space allows me to cover. If the knowledge above feels scattered and you want to study Pulsar systematically, you can pick up my new book, In-depth Analysis of Apache Pulsar.