Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation, cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight function computing. Its architecture separates compute from storage. It supports multi-tenancy, persistent storage, and geo-replication across data centers, and provides stream storage features such as strong consistency, high throughput, low latency, and high scalability. GitHub address: github.com/apache/puls…

Background

This is the second article in the Apache Pulsar series. It explains Pulsar's message retention and expiration policies in detail and looks at how Pulsar's storage is designed on top of BookKeeper.

In the community, we often see confusion about backlog, storage size, and retention policies. Some of the most common questions are:

  • I did not set a retention policy. Why does topics stats show a storage size much larger than the backlog size?
  • My message backlog is small, but the storage size keeps growing.

Message model for Pulsar

First, let's take a look at Pulsar's message model.

As shown in the figure above, Pulsar provides the basic publish-subscribe (pub-sub) model.

Producer

First, the producer appends messages to a topic. For a partitioned topic, the partition each message is routed to depends on whether the message has a key:

  • If a message key is set, the key is hashed to pick the partition, so messages with the same key always land in the same partition.
  • If no message key is set, messages are distributed round robin across partitions.

Pulsar is similar to Kafka in its message distribution model.
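The two routing rules above can be illustrated with a minimal Python sketch. PartitionRouter is a hypothetical class, not the Pulsar client API, and zlib.crc32 is only a stand-in for whatever hash function the client actually uses. The point is that a key always maps to the same partition, while keyless messages rotate through the partitions.

```python
import itertools
import zlib


class PartitionRouter:
    """Toy model of producer-side partition routing (hypothetical class)."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        # Cycles 0, 1, ..., num_partitions - 1, 0, 1, ... for keyless messages.
        self._next_partition = itertools.cycle(range(num_partitions))

    def choose_partition(self, key=None):
        if key is not None:
            # Hash the key: equal keys always map to the same partition.
            # zlib.crc32 is a stand-in, not the hash Pulsar itself uses.
            return zlib.crc32(key.encode("utf-8")) % self.num_partitions
        # No key: distribute messages round robin.
        return next(self._next_partition)


router = PartitionRouter(num_partitions=4)
print([router.choose_partition() for _ in range(6)])  # [0, 1, 2, 3, 0, 1]
print(router.choose_partition("order-42") == router.choose_partition("order-42"))  # True
```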

Consumer

Besides consumers, Pulsar introduces a subscription layer on top of topics. This abstraction lets Pulsar flexibly support both queuing and streaming. Each subscription gets a complete copy of all data in the topic, similar to a consumer group in Kafka. Depending on the subscription type, one or more consumers can receive messages under each subscription.

Pulsar currently supports the following four subscription types:

  • Exclusive
  • Failover
  • Shared
  • Key_Shared
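To make the difference between the four subscription types concrete, here is a deliberately simplified dispatch model. The dispatch function and the consumer names are hypothetical, and choosing the first consumer as the active one for Failover is an assumption of this sketch. Real brokers also account for flow-control permits, batching, consumer priority, and reconnects, none of which is modelled here.

```python
import itertools
import zlib


def dispatch(sub_type, consumers, messages):
    """Toy model of how one subscription hands messages to its consumers.

    consumers: list of consumer names; messages: list of (key, value) pairs.
    Returns {consumer: [values received]}.
    """
    received = {c: [] for c in consumers}
    if sub_type == "Exclusive":
        # Only one consumer may attach to an exclusive subscription.
        if len(consumers) != 1:
            raise ValueError("Exclusive subscriptions allow a single consumer")
        received[consumers[0]] = [v for _, v in messages]
    elif sub_type == "Failover":
        # One active consumer receives everything; the others are standbys.
        # Here the active one is simply the first in the list (an assumption).
        received[consumers[0]] = [v for _, v in messages]
    elif sub_type == "Shared":
        # Messages are spread round robin across all consumers.
        turn = itertools.cycle(consumers)
        for _, value in messages:
            received[next(turn)].append(value)
    elif sub_type == "Key_Shared":
        # Same key, same consumer (zlib.crc32 is a stand-in hash).
        for key, value in messages:
            owner = consumers[zlib.crc32(key.encode("utf-8")) % len(consumers)]
            received[owner].append(value)
    else:
        raise ValueError(f"unknown subscription type: {sub_type}")
    return received


msgs = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
print(dispatch("Shared", ["c1", "c2"], msgs))    # {'c1': [1, 3], 'c2': [2, 4]}
print(dispatch("Failover", ["c1", "c2"], msgs))  # {'c1': [1, 2, 3, 4], 'c2': []}
```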

The storage model

Messages are stored only once, in the distributed log of each topic partition.

This means that when a producer successfully sends a message to a topic, the message is written to the storage layer only once; no matter how many subscriptions the topic has, they all work with the same data. On this basis, Apache Pulsar's storage is organized as the top-down hierarchy shown below:

The first layer is the topic (partition), which stores the messages appended by producers. A topic is made up of a sequence of ledgers, each ledger is divided into fragments, and fragments store smaller units called entries. Each entry stores either a single message or a batch of messages.

  • Tip: On the broker side, Pulsar treats a batch as a single message (one entry). The batch is unpacked only when the consumer receives it.
  • Note: In BookKeeper, the smallest unit in which data is deleted is a segment (ledger), not an individual entry.
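The hierarchy can be written down as a small data model. This is a sketch for illustration only: the class names mirror the concepts in the text rather than Pulsar's or BookKeeper's internal classes, and the bookie and topic names in the example are made up. It shows why the broker counts entries: a batch of three messages is still one entry.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Entry:
    entry_id: int
    messages: List[bytes]  # a single message, or a whole batch stored as one entry


@dataclass
class Fragment:
    bookies: List[str]  # the bookie ensemble holding this range of entries
    entries: List[Entry] = field(default_factory=list)


@dataclass
class Ledger:
    ledger_id: int
    fragments: List[Fragment] = field(default_factory=list)


@dataclass
class TopicPartition:
    name: str
    ledgers: List[Ledger] = field(default_factory=list)

    def entries(self):
        return [e for ledger in self.ledgers for frag in ledger.fragments for e in frag.entries]

    def entry_count(self):
        # What the broker sees: one entry per single message or per batch.
        return len(self.entries())

    def message_count(self):
        # What consumers see after unpacking batches.
        return sum(len(e.messages) for e in self.entries())


fragment = Fragment(bookies=["bookie-1", "bookie-2"], entries=[
    Entry(0, [b"m0"]),                # single message
    Entry(1, [b"m1", b"m2", b"m3"]),  # batch of three messages
])
topic = TopicPartition("my-topic-partition-0", [Ledger(ledger_id=10, fragments=[fragment])])
print(topic.entry_count(), topic.message_count())  # 2 4
```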

Why do we need a hierarchical abstraction?

The most direct reason is to scatter data evenly across all BookKeeper (bookie) nodes. This is one of the benefits of a layered, sharded storage architecture.

Ack mechanism

Pulsar supports two ack mechanisms: individual ack and cumulative ack. With individual ack (AckIndividual), a consumer acknowledges one specific message by its message ID. With cumulative ack (AckCumulative), a consumer acknowledges a message together with all messages before it in a single request.

Subscription mechanism

To better understand storage size and backlog, we first need to understand Pulsar's subscription mechanism, as shown below:

When messages pile up in a backlog, you can clear it with the clear-backlog command. Because clearing the backlog is a dangerous operation, you are prompted to confirm it; clear-backlog provides the -f (--force) option to skip the prompt.

The producer keeps appending messages to the topic. A consumer creates a subscription on the topic; once the subscription is created, a cursor is initialized that points to a position in the topic. By default, this initial position is Latest.

The cursor stores the consumption state of a subscription.

In the figure above, the consumer under the subscription has received and acknowledged messages up to M4, so M4 and every message before it are marked as deletable. Pulsar records this position as the markDeletePosition. All messages after it have not yet been consumed by this subscription.

As time goes by, suppose that with cumulative ack (AckCumulative) the consumer in this subscription consumes more messages and the cursor moves to M8. All messages up to M8 can then enter the deletable state.

Now suppose instead that with individual ack (AckIndividual) the consumer acknowledges only M7, while M5 and M6 have still not been consumed successfully. The deletable messages are then those up to M4, plus M7. In other words, individual acks leave an ack hole (M5 and M6) in the topic.

Cursor = Offset + individualDeletes. An ack moves the cursor but does not delete any messages.
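The cursor behaviour described above (a markDeletePosition plus a set of individually deleted positions) can be modelled in a few lines of Python. This Cursor class is a hypothetical teaching model, not Pulsar's internal cursor implementation; positions are plain integers standing for M0, M1, and so on. It also shows how an ack hole closes once the missing messages are acknowledged.

```python
class Cursor:
    """Toy cursor: markDeletePosition plus individually acknowledged positions.

    Positions are message indexes (0 for M0, 1 for M1, ...). Acks only move
    the cursor; nothing is ever deleted here.
    """

    def __init__(self, mark_delete_position=-1):
        self.mark_delete_position = mark_delete_position  # all <= this are acked
        self.individual_deletes = set()  # acked positions after the mark

    def is_acked(self, position):
        return position <= self.mark_delete_position or position in self.individual_deletes

    def ack_cumulative(self, position):
        # Acknowledge `position` and everything before it.
        if position > self.mark_delete_position:
            self.mark_delete_position = position
            self.individual_deletes = {p for p in self.individual_deletes if p > position}
        self._close_holes()

    def ack_individual(self, position):
        # Acknowledge just `position`; this may leave a hole before it.
        if not self.is_acked(position):
            self.individual_deletes.add(position)
        self._close_holes()

    def _close_holes(self):
        # If the position right after the mark is acked, advance the mark.
        while self.mark_delete_position + 1 in self.individual_deletes:
            self.mark_delete_position += 1
            self.individual_deletes.remove(self.mark_delete_position)


cursor = Cursor()
cursor.ack_cumulative(4)   # M0-M4 acknowledged
cursor.ack_individual(7)   # M7 acknowledged; M5 and M6 form an ack hole
print(cursor.mark_delete_position, cursor.individual_deletes)  # 4 {7}
cursor.ack_individual(5)
cursor.ack_individual(6)   # hole filled: the mark jumps to M7
print(cursor.mark_delete_position, cursor.individual_deletes)  # 7 set()
```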

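Over time, with individual acks, the ack hole may close on its own: once M5 and M6 are acknowledged, the acknowledged range becomes contiguous and the markDeletePosition moves past it, as shown below: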

So far we have described how the cursor moves in a topic with a single subscription, using both individual and cumulative acks. Now suppose several subscriptions are subscribed to this topic. Each subscription gets a full copy of the topic's data and initializes its own cursor, and each cursor tracks its consumption progress independently, resulting in the following situation:

In the figure above, the topic has two subscriptions, Subscription-1 and Subscription-2. The consumer in Subscription-1 has consumed messages up to M4, and the consumer in Subscription-2 has consumed messages up to M8. The four messages M5-M8 cannot be deleted because Subscription-1 has not consumed them yet. Only the messages up to M4, the position of the slowest subscription on this topic, are currently deletable. The problem is that if Subscription-1's consumer goes offline, its cursor stays where it is, and the data in this topic can never become deletable.
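The rule in the previous paragraph, that a message becomes deletable only once the slowest subscription has acknowledged it, reduces to taking a minimum over the subscriptions' mark-delete positions. The sketch below is a hypothetical model using integer positions; it ignores individually acknowledged messages beyond each mark, since those do not let the slowest mark advance.

```python
def slowest_subscription(mark_delete_positions):
    """Return (name, position) of the subscription with the lowest markDeletePosition."""
    name = min(mark_delete_positions, key=mark_delete_positions.get)
    return name, mark_delete_positions[name]


def deletable_positions(mark_delete_positions):
    """Positions acknowledged by every subscription (M0 up to the slowest mark)."""
    _, slowest = slowest_subscription(mark_delete_positions)
    return list(range(slowest + 1))


subs = {"Subscription-1": 4, "Subscription-2": 8}
print(slowest_subscription(subs))   # ('Subscription-1', 4)
print(deletable_positions(subs))    # [0, 1, 2, 3, 4]

# If Subscription-1's consumer goes offline, its mark never moves, so even as
# Subscription-2 keeps consuming, the deletable range stays at M0-M4.
subs["Subscription-2"] = 11
print(deletable_positions(subs))    # [0, 1, 2, 3, 4]
```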

To handle this scenario, Pulsar introduces TTL (time to live), which lets users set a TTL threshold. If a message is still unacknowledged by a subscription when it reaches the TTL threshold, the TTL mechanism is triggered and the subscription's cursor is automatically moved forward past the expired messages. Note that TTL only moves cursor positions. So far we have not discussed message deletion at all, so don't confuse the two: TTL moves the cursor and contains no logic for deleting messages.
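Here is a minimal sketch of the TTL rule, under the assumptions that publish times increase with position and that a message expires once its age reaches the TTL. The apply_ttl function is hypothetical; the point it illustrates is that TTL only returns a new cursor position and never touches the stored messages.

```python
def apply_ttl(mark_delete_position, publish_times, now, ttl_seconds):
    """Return a subscription's markDeletePosition after applying a TTL.

    publish_times[i] is the publish time of Mi in seconds, assumed to be in
    publish order. Expired messages are treated as acknowledged: the cursor
    moves forward, but publish_times (the stored data) is left untouched.
    """
    new_mark = mark_delete_position
    for position in range(mark_delete_position + 1, len(publish_times)):
        if now - publish_times[position] >= ttl_seconds:
            new_mark = position  # expired: move the cursor past it
        else:
            break  # later messages are newer, so they have not expired either
    return new_mark


publish_times = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]  # M0-M9
print(apply_ttl(4, publish_times, now=100, ttl_seconds=35))  # 6
```

With the cursor at M4, M5 and M6 are older than 35 seconds and get skipped, while M7 is not, so the new position is M6; all ten messages are still stored.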

Backlog

To describe the unconsumed data in a topic, Pulsar introduces the concept of a backlog. There are two kinds of backlog:

  • Topic backlog: the unconsumed messages of the slowest subscription on the topic
  • Subscription backlog: the unconsumed messages of a single subscription

Backlog A is the topic backlog and also the backlog of Subscription-1; Backlog B is the backlog of Subscription-2.

The backlog changes over time, as shown in the following figure:

One important point is that the backlog treats a batch as a single message. Unpacking every batch on the broker would add load and waste a lot of CPU, so batches are unpacked on the consumer side. The backlog therefore essentially counts the entries described above.

Pulsar exposes two backlog metrics:

  • msgBacklog: the number of unacknowledged entries
  • backlogSize: the total size, in bytes, of unacknowledged messages
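The difference between the two metrics, and the fact that a batch counts once, can be shown with a toy calculation. backlog_stats is a hypothetical helper whose dictionary keys merely mirror the stat names above; the broker's own accounting is more involved than this sketch.

```python
def backlog_stats(entries, mark_delete_position, individual_deletes=frozenset()):
    """Toy version of a subscription's backlog metrics.

    entries[i] = (size_in_bytes, messages_in_entry); a batch is one entry.
    """
    msg_backlog = 0
    backlog_size = 0
    for position, (size, _messages_in_entry) in enumerate(entries):
        if position <= mark_delete_position or position in individual_deletes:
            continue  # already acknowledged
        msg_backlog += 1      # counts entries, not the messages inside batches
        backlog_size += size
    return {"msgBacklog": msg_backlog, "backlogSize": backlog_size}


entries = [(100, 1), (300, 3), (100, 1), (200, 2)]  # E0-E3; E1 and E3 are batches
print(backlog_stats(entries, mark_delete_position=0))
# {'msgBacklog': 3, 'backlogSize': 600}
print(backlog_stats(entries, mark_delete_position=0, individual_deletes={2}))
# {'msgBacklog': 2, 'backlogSize': 500}
```

Although the three unacknowledged entries hold six messages, msgBacklog reports 3.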

The Retention mechanism

In Apache Pulsar, BookKeeper serves as the storage layer that persists messages. To give users control over how long messages persist, Pulsar introduces a retention mechanism that lets them configure how long acknowledged messages are kept. By default, retention is disabled, which means that once a message has been acknowledged, it enters the deletion logic.

When configuring a retention policy, you can specify two parameters:

  • Size: the retention size threshold. 0 means no size-based retention policy is configured, and -1 means the size is unlimited.
  • Time: the retention time threshold. 0 means no time-based retention policy is configured, and -1 means the time is unlimited.
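The following sketch decides which acknowledged messages have passed retention. It assumes the reading given above (-1 means unlimited, 0 means not configured), treats the case where both limits are 0 as "retention disabled", and considers a message past retention once it exceeds either positive limit. Combinations that mix 0 with another value are rejected rather than guessed, because their exact behaviour is not covered here. past_retention is a hypothetical name.

```python
def past_retention(acked, time_limit, size_limit):
    """Return indexes of acknowledged messages that are past retention.

    acked: (age_seconds, size_bytes) for each acknowledged message, oldest first.
    Limits: -1 = unlimited; both 0 = retention disabled. Any other use of 0
    is rejected because this sketch does not model it.
    """
    if time_limit == 0 and size_limit == 0:
        return list(range(len(acked)))  # no retention: every acked message is ready
    if time_limit == 0 or size_limit == 0:
        raise ValueError("mixing 0 with another limit is not modelled here")
    ready = set()
    if time_limit != -1:
        # Older than the time threshold.
        ready.update(i for i, (age, _) in enumerate(acked) if age > time_limit)
    if size_limit != -1:
        # Keep the newest messages that fit within the size threshold.
        kept = 0
        for i in range(len(acked) - 1, -1, -1):
            kept += acked[i][1]
            if kept > size_limit:
                ready.add(i)
    return sorted(ready)


acked = [(500, 10), (400, 10), (300, 10), (200, 10), (100, 10)]
print(past_retention(acked, time_limit=250, size_limit=-1))  # [0, 1, 2]
print(past_retention(acked, time_limit=-1, size_limit=35))   # [0, 1]
print(past_retention(acked, time_limit=-1, size_limit=-1))   # []
```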

With a retention policy in place, the topic as a whole looks like the following view, where M0-M5 are messages that have been acknowledged by all subscriptions and have exceeded the retention threshold; that is, they are ready to be deleted. Note that "ready to be deleted" does not mean they can necessarily be deleted yet.

At the beginning, we broke the topic down step by step, from the topmost topic to individual messages. (For simplicity, we ignored batching there, so one message equals one entry.) Now let's put all the concepts back together in reverse. In BookKeeper, the smallest unit that can be deleted is a segment, so messages cannot be deleted at the level of individual messages (entries); deletion must be performed on whole segments, as shown below:

Suppose M0-M3 belong to Segment3, M4-M7 belong to Segment2, and M8-M11 belong to Segment1. As described above, M0-M5 are all ready to be deleted, so Segment3 can be deleted. Segment2, however, also contains M6 and M7, which have not reached the retention threshold, so Segment2 cannot be deleted yet.
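Segment-level deletion can be checked against the example in the text. The helpers below are hypothetical; they only encode the rule that a segment is removed when every message in it is ready to delete, and list the ready messages that stay on disk as a result.

```python
def deletable_segments(segments, ready):
    """segments: {name: positions}; ready: positions that are ready to delete.

    A segment can be deleted only when every message in it is ready.
    """
    return [name for name, positions in segments.items()
            if all(p in ready for p in positions)]


def ready_but_kept(segments, ready):
    """Ready messages that stay on disk because their segment cannot be deleted."""
    deletable = set(deletable_segments(segments, ready))
    return sorted(p for name, positions in segments.items() if name not in deletable
                  for p in positions if p in ready)


segments = {
    "Segment3": range(0, 4),   # M0-M3
    "Segment2": range(4, 8),   # M4-M7
    "Segment1": range(8, 12),  # M8-M11
}
ready = set(range(0, 6))       # M0-M5 are ready to delete
print(deletable_segments(segments, ready))  # ['Segment3']
print(ready_but_kept(segments, ready))      # [4, 5]
```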

Storage Size

To make it easier to express how much storage space a topic's messages currently occupy, Pulsar introduces storageSize. As shown in the following figure, when the stored data consists of exactly Backlog B, backlogSize equals storageSize.

Because of individual acks, retention policies, and BookKeeper's segment-based deletion, storageSize is often larger than backlogSize, as shown in the following figure:
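Putting the pieces together, a toy calculation shows how storageSize can exceed backlogSize. It reuses the segment layout from the example above and assumes that every message is 100 bytes, that one subscription has acknowledged M0-M7 while retention still holds M6 and M7, and that a fully deletable segment has already been removed (in practice deletion happens asynchronously). The function name is hypothetical.

```python
def storage_and_backlog(segments, acked, ready):
    """Return (storageSize, backlogSize) in bytes for a toy topic.

    segments: {name: [(position, size_bytes), ...]}
    acked: positions acknowledged by every subscription
    ready: positions that are acknowledged and past retention
    """
    storage_size = 0
    for messages in segments.values():
        if all(pos in ready for pos, _ in messages):
            continue  # whole segment deletable: assumed already removed
        storage_size += sum(size for _, size in messages)
    backlog_size = sum(size for messages in segments.values()
                       for pos, size in messages if pos not in acked)
    return storage_size, backlog_size


segments = {
    "Segment3": [(p, 100) for p in range(0, 4)],
    "Segment2": [(p, 100) for p in range(4, 8)],
    "Segment1": [(p, 100) for p in range(8, 12)],
}
acked = set(range(0, 8))    # M0-M7 acknowledged
ready = set(range(0, 6))    # M0-M5 also past retention
print(storage_and_backlog(segments, acked, ready))  # (800, 400)
```

Segment2 stays on disk because of M6 and M7, so storage holds 800 bytes while only M8-M11 (400 bytes) are unacknowledged. If retention no longer held M6 and M7 (ready = set(range(0, 8))), Segment2 would go too and both sizes would be 400 bytes.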

Conclusion

  1. Messages are stored only once, in the distributed log of each topic partition.
  2. A cursor stores the consumption state of the consumers under a subscription.
  3. A cursor is equivalent to an offset (as in Kafka) plus individualDeletes.
  4. An ack updates the cursor position in the topic.
  5. When a message has been acknowledged by all subscriptions, it becomes deletable.
  6. All unacknowledged messages are kept in the subscription backlog.
  7. TTL automatically moves cursor positions based on a time threshold.
  8. The retention policy controls how long acknowledged messages are kept.
  9. Messages are deleted by segment, not by entry.

About the author

Ran Xiaolong, R&D Engineer, Tencent Cloud Micro Service Product Center, Apache Pulsar Committer, Apache BookKeeper Contributor

Recommended reading

  • Analysis of Pulsar Namespace Policy
  • Message Lifecycle: What does Message Lifecycle look like in Pulsar
  • Understand how Apache Pulsar works