In this article, we will look at how Apache Pulsar works. At the upper level, Apache Pulsar exposes the concepts of Topics and Subscriptions; at the lower level, data is stored in binary form, split up and distributed across multiple servers. There is a lot of detail in between, so we will break the design down into several layers of abstraction to make Apache Pulsar's architecture easier to understand.

Next, we will analyze it layer by layer, following the figure below.

Layer 1 – Topic, Subscription and Cursors

Messages are stored in Topics. A Topic is logically a log, and each message has an offset within it. Apache Pulsar uses cursors to track these offsets. A producer sends messages to a specific Topic, and Apache Pulsar guarantees that once a message has been acknowledged it will not be lost.

Consumers consume messages from a Topic through subscriptions. A subscription is the logical owner of a cursor (which tracks the offset) and also provides additional guarantees depending on the subscription type:

  • Exclusive – Only one consumer can be attached to the subscription at a time.
  • Shared – A subscription can have multiple consumers at the same time, and the messages of the Topic are shared (distributed) among them.
  • Failover – A subscription has only one active consumer at a time, with one or more standby consumers. If the active consumer fails, a standby consumer takes over. There are never two active consumers at the same time.

A Topic can have multiple subscriptions. A subscription does not hold the message data itself, only metadata and a cursor.
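
To make this concrete, here is a minimal sketch of a consumer using the Pulsar Java client; the subscription type is chosen at subscribe time, and the service URL, topic and subscription names below are placeholders:

import org.apache.pulsar.client.api.*;

public class SubscriptionTypeExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")      // placeholder broker URL
                .build();

        // Failover: one active consumer, the others are standbys.
        // Other options are SubscriptionType.Exclusive and SubscriptionType.Shared.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("my-topic")                           // hypothetical topic
                .subscriptionName("my-subscription")         // hypothetical subscription
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        System.out.println("received: " + new String(msg.getData()));
        consumer.acknowledge(msg);

        consumer.close();
        client.close();
    }
}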

Data Retention Policy

  • Without a data retention policy on a Topic, once all of the Topic’s subscription cursors have successfully advanced past an offset (i.e. the messages have been acknowledged), the messages before that offset can be deleted automatically. This means a message must be acknowledged on every subscription of the Topic before it becomes eligible for deletion.

  • If a Topic has a data retention policy, messages that have been consumed and acknowledged are kept until they exceed the retention thresholds (the total size of stored messages for the Topic, or how long messages are retained in the Topic); only then are they deleted.
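
As an illustration, a retention policy can be configured through the Pulsar admin API, shown here per namespace; a minimal sketch, with the admin URL and namespace name as placeholders:

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.RetentionPolicies;

public class RetentionExample {
    public static void main(String[] args) throws Exception {
        // Connect to the broker's admin endpoint (URL is a placeholder).
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build();

        // Keep acknowledged messages for up to 7 days (in minutes) or 10 GB (in MB),
        // whichever limit is hit first; "my-tenant/my-namespace" is hypothetical.
        admin.namespaces().setRetention("my-tenant/my-namespace",
                new RetentionPolicies(7 * 24 * 60, 10 * 1024));

        admin.close();
    }
}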

Consumers can acknowledge messages individually or cumulatively. Cumulative acknowledgement gives better throughput, but messages may be re-processed after a consumer failure. Note that cumulative acknowledgement does not apply to shared subscriptions, because it is based on offsets. However, the client API supports batching acknowledgements, which reduces the number of RPC calls and improves throughput for competing consumers on a shared subscription.
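
The two acknowledgement styles look like this with the Java client (a minimal sketch; the URL, topic and subscription names are placeholders):

import org.apache.pulsar.client.api.*;

public class AckExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")    // placeholder broker URL
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("my-topic")                        // hypothetical topic
                .subscriptionName("my-subscription")      // hypothetical subscription
                .subscribe();

        Message<byte[]> msg = consumer.receive();

        // Individual acknowledgement: only this message is marked as consumed.
        consumer.acknowledge(msg);

        // Cumulative acknowledgement: this message and everything before it
        // is marked as consumed (not allowed on Shared subscriptions).
        // consumer.acknowledgeCumulative(msg);

        consumer.close();
        client.close();
    }
}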

Finally, Pulsar has partitioned topics, similar to Kafka topics. The difference is that in Apache Pulsar each partition is itself a topic. As with Kafka, producers can send messages round-robin, by key hash, or to an explicitly chosen partition.
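
For example, with the Java client the routing behaviour is selected on the producer; the sketch below assumes a hypothetical partitioned topic has already been created:

import org.apache.pulsar.client.api.*;

public class PartitionedProducerExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")    // placeholder broker URL
                .build();

        // "my-partitioned-topic" is a hypothetical topic created with multiple partitions.
        Producer<byte[]> producer = client.newProducer()
                .topic("my-partitioned-topic")
                // Round-robin routing; SinglePartition and CustomPartition
                // are the other built-in routing modes.
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create();

        // Messages carrying the same key are routed to the same partition
        // when key-based routing applies.
        producer.newMessage().key("user-42").value("hello".getBytes()).send();

        producer.close();
        client.close();
    }
}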

Layer 2 (1) – Logical storage model: BookKeeper

BookKeeper is a general purpose distributed log storage solution.

First, BookKeeper stores data on the nodes of the cluster; each storage node is called a Bookie. Second, both Pulsar and BookKeeper use Apache Zookeeper to store metadata and monitor node health.

Ledger & Fragment

A Topic is really a stream of Ledgers. A Ledger is itself a log. So a series of sub-logs (Ledgers) forms a parent log (the Topic).

Ledgers are appended to a Topic, and entries (messages or batches of messages) are appended to Ledgers. Once closed, a Ledger is immutable. A Ledger is also the smallest unit of deletion: we cannot delete a single entry, only a whole Ledger.
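
To make the Ledger lifecycle concrete, here is a minimal sketch using BookKeeper's Java client directly (Pulsar does this internally through its managed ledger layer); the Zookeeper address, quorum settings and payload are just placeholders:

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;

public class LedgerExample {
    public static void main(String[] args) throws Exception {
        // ZooKeeper connection string is a placeholder.
        BookKeeper bk = new BookKeeper("localhost:2181");

        // Create a ledger with ensemble=3, write quorum=2, ack quorum=2
        // (these parameters are explained in the next section).
        LedgerHandle ledger = bk.createLedger(3, 2, 2,
                BookKeeper.DigestType.CRC32, "secret".getBytes());

        // Entries can only be appended; each append returns the entry id.
        long entryId = ledger.addEntry("hello pulsar".getBytes());
        System.out.println("wrote entry " + entryId);

        // After close() the ledger is immutable; it can only be read or deleted.
        ledger.close();
        bk.close();
    }
}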

Ledgers are themselves decomposed into multiple Fragments. A Fragment is the smallest unit of distribution in a BookKeeper cluster.

Topics are a Pulsar concept. Ledgers and Fragments are BookKeeper concepts, although Pulsar is aware of them and works with them.

Each Ledger (consisting of one or more Fragments) is replicated across multiple BookKeeper nodes (Bookies) for fault tolerance and better read performance. Each Fragment can be replicated on a different set of Bookies (provided there are enough Bookies).

Each Ledger has three key configurations:

  • Ensemble Size (E)
  • Write Quorum Size (Qw)
  • Ack Quorum Size (Qa)

These settings can be applied at the Topic level; Pulsar then applies them to the BookKeeper Ledgers/Fragments used by the Topic.

Ensemble Size (E) determines the size of the pool of Bookies available for Pulsar to write a Ledger to. Each Fragment may have a different list of Bookies: the Broker selects a set of E Bookies when it creates the Fragment. There must be enough Bookies available (at least E).

Write Quorum (Qw) is the number of Bookies that Pulsar actually writes each entry to. It can be equal to or less than E. When Qw is less than E, entries are striped across the ensemble, so each Bookie serves only a subset of the read and write requests. This improves throughput and reduces latency.

Ack Quorum (Qa) is the number of Bookies that must acknowledge a write before the Pulsar Broker sends its own acknowledgement to the client. For consistency, Qa should be (Qw + 1) / 2 or greater.

In practice, either:

(Qa == Qw), or

(Qa == Qw - 1) –> this avoids waiting on a single slow Bookie and improves write latency.
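
For example, E/Qw/Qa can be set through the Pulsar admin API (shown here per namespace); a minimal sketch, with the admin URL and namespace name as placeholders:

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistencePolicies;

public class PersistenceExample {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")  // placeholder admin URL
                .build();

        // Ensemble E = 3, write quorum Qw = 3, ack quorum Qa = 2;
        // the last argument throttles mark-delete operations (0 = unthrottled).
        admin.namespaces().setPersistence("my-tenant/my-namespace",
                new PersistencePolicies(3, 3, 2, 0));

        admin.close();
    }
}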

A new Ledger is created when a new Topic is created or when the current Ledger is rolled over. A Ledger rollover (which creates a new Ledger) happens in the following cases:

  • Ledger size or time limit reached.
  • Ledger ownership (ownership of the Pulsar Broker) changed (more on that later).

New fragments are created in the following cases:

  • Create a new Ledger.
  • A write error or timeout occurs on one of the Bookies used by the current Fragment.

When a Bookie cannot serve writes, the Pulsar Broker creates a new Fragment on a new ensemble of Bookies and makes sure the entry is written to Qw Bookies, retrying until the message is persisted.

From the discussion so far, we can draw the following conclusions:

  1. Increasing E optimizes for latency and throughput. Increasing Qw sacrifices throughput for redundancy. Increasing Qa improves data durability, but increases write latency and the exposure to a single slow-responding Bookie.
  2. E and Qw are not lists of Bookies. They only specify the size of the pool of Bookies that may serve a given Ledger. Pulsar applies E and Qw when it creates a new Ledger or Fragment. Each Fragment has a fixed set of Bookies that never changes.
  3. Adding new Bookies does not require a manual rebalance. The new Bookies automatically become candidates for new Fragments: after joining the cluster, they start receiving writes as soon as a new Fragment/Ledger is created. Each Fragment can be stored on a different subset of Bookies! We never need to explicitly assign a Topic or Ledger to specific Bookies.

Let’s pause and summarize. This model is very different from Kafka's, and more complex. In Kafka, each partition replica is stored in its entirety on a single broker node; partitions and their replicas consist of a series of segment and index files.

The advantage of the Kafka model is its simplicity and speed: all reads and writes are sequential. The drawback is that a single node must have enough disk space for the replicas it hosts, so very large replicas may force you to use very large disks. A second drawback is that you have to rebalance when expanding the cluster; this process is painful and requires careful planning and execution to spread the storage load across nodes without failures.

Back to the Pulsar + BookKeeper model. The data of a Topic is spread over multiple Bookies. A Topic is split into Ledgers, Ledgers are split into Fragments, and Fragments are distributed over the Bookies they were created on. When the cluster needs to grow, just add more Bookies: they start receiving data as soon as new Fragments are created, with no Kafka-style rebalance. However, reads and writes now jump between Bookies. We will see shortly how Pulsar manages this.

But now each Pulsar Broker needs to keep track of the Ledgers and Fragments that make up each Topic. This metadata is stored in Zookeeper, and losing it would be a serious problem.

At the storage layer, a Topic's data is written evenly across the BookKeeper cluster. We avoid tying the entire data of a Topic or replica to a particular node, and therefore avoid painful rebalances.

Layer 2 (2) – Pulsar Broker and Topic ownership

Pulsar Brokers are stateless: they hold no persistent state that cannot be lost, and they are separate from the storage layer. Each Topic is owned by exactly one Pulsar Broker, which serves all reads and writes for that Topic.

When the Pulsar Broker receives a write request, it writes to the Bookies of the current Fragment of the Topic's current Ledger.

In the normal case, the current Ledger has a single current Fragment. Once the write has been acknowledged by enough Bookies (satisfying Qa), the Pulsar Broker sends an acknowledgement to the producer client.

An acknowledgement can only be sent once all previous messages have also been acknowledged (each satisfying Qa). If, for a given message, a Bookie responds with an error or does not respond at all, the Broker creates a new Fragment on a new ensemble of Bookies (excluding the problematic Bookie).

Reads also go through the Broker that owns the Topic. As the single entry point for a given Topic, the Broker knows which offsets have been safely persisted in BookKeeper, so it only needs to read from a single Bookie. In layer 3 we will see how it can serve reads from its in-memory cache without sending reads to BookKeeper at all.

Broker failure

The health of Pulsar Brokers is monitored through Zookeeper. When a Broker becomes unavailable (from Zookeeper's point of view), an ownership change takes place: a new Broker becomes the owner of the Topic, all client connections are redirected to it, and it serves all subsequent reads and writes for the Topic.

BookKeeper has a very important feature called fencing. Fencing ensures that only one writer (Pulsar Broker) can write to a ledger at any given time.

The working principle is as follows:

  1. The current owner (B1) of Topic X is judged unavailable (via Zookeeper).
  2. Another Broker (B2) changes the state of Topic X's current Ledger from OPEN to IN_RECOVERY.
  3. B2 sends a fence request to the Bookies of the Ledger's current Fragment and waits for (Qw - Qa) + 1 responses. Once this many responses have been received, the Ledger is fenced. If the old Broker is still alive, it can no longer write, because it can no longer obtain Qa acknowledgements (the Bookies reply with fencing exceptions).
  4. B2 then asks the Fragment's Bookies for their last confirmed entry, takes the highest entry id, and reads forward from that point. It makes sure that every entry from that point on (entries that may never have been acknowledged to the producer) is replicated to Qw Bookies. Once B2 can no longer read and replicate any further entries, the Ledger is fully recovered.
  5. B2 changes the Ledger state to CLOSED.
  6. B2 can now create a new Ledger and accept write requests.

The great thing about this architecture is that, because the Pulsar Brokers are stateless, BookKeeper's fencing handles the split-brain problem cleanly. There is no split, no disagreement, and no data loss.
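
As an illustration of fencing at the BookKeeper API level (Pulsar's managed ledger does this internally when recovering a Topic's current Ledger), opening a ledger with recovery fences it against the previous writer; the Zookeeper address and ledger id below are placeholders:

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;

public class FencingExample {
    public static void main(String[] args) throws Exception {
        BookKeeper bk = new BookKeeper("localhost:2181");   // placeholder ZooKeeper address
        long ledgerId = 42L;                                 // hypothetical ledger id

        // openLedger() runs ledger recovery: it fences the ledger on the Bookies,
        // determines the last safely written entry, and re-replicates the tail.
        // After this call the previous writer can no longer append to the ledger.
        LedgerHandle reader = bk.openLedger(ledgerId,
                BookKeeper.DigestType.CRC32, "secret".getBytes());

        System.out.println("last add confirmed after recovery: "
                + reader.getLastAddConfirmed());

        reader.close();
        bk.close();
    }
}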

Layer 2 (3) – Cursor tracking

Each subscription stores a cursor. The cursor is the current offset in the log. Subscriptions store their cursors in BookKeeper Ledgers, so cursor state is as durable as the message data itself.

Layer 3 (1) – Bookie storage

Ledgers and Fragments are logical structures that are maintained and tracked in Zookeeper. Physically, data is not stored in files that correspond one-to-one to Ledgers and Fragments. BookKeeper's storage implementation is pluggable; Pulsar uses the implementation named DbLedgerStorage by default.

When data is written to a Bookie, the entry is first written to a journal file. This is a write-ahead log (WAL) that lets BookKeeper avoid data loss in the event of a failure; it is the same mechanism that relational databases use for durability.

The entry is also written to a write cache. The write cache accumulates entries in memory and periodically sorts and flushes them to disk. Writes are sorted so that entries of the same Ledger are grouped together, which improves read performance: if entries were laid out in strict arrival order, we could not take advantage of efficient sequential disk reads. By aggregating and sorting, we get Ledger-level ordering on disk, which is the ordering we care about.

When the write cache is flushed, a location index for each entry is also written to RocksDB. The index simply maps (LedgerId, EntryId) to (EntryLogId, offset within that file).

Since the write cache holds the most recent entries, a read first checks the write cache; on a miss it checks the read cache. If neither cache hits, the entry's location is looked up in RocksDB, the entry is read from the entry log file, and the read cache is updated so that subsequent requests hit the cache. These two levels of caching mean that reads can usually be served from memory.
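
The read path described above can be summarized with a small conceptual sketch; the HashMaps below are stand-ins for the real write cache, read cache, RocksDB index and entry log files, so this only illustrates the lookup order, not BookKeeper's actual code:

import java.util.HashMap;
import java.util.Map;

public class ReadPathSketch {
    record Key(long ledgerId, long entryId) {}
    record Location(long entryLogId, long offset) {}

    final Map<Key, byte[]> writeCache = new HashMap<>();    // most recent writes
    final Map<Key, byte[]> readCache = new HashMap<>();     // recently read entries
    final Map<Key, Location> rocksDbIndex = new HashMap<>();// (ledger, entry) -> location
    final Map<Location, byte[]> entryLogFiles = new HashMap<>();

    byte[] readEntry(long ledgerId, long entryId) {
        Key key = new Key(ledgerId, entryId);

        // 1. Most recent entries are still in the write cache.
        byte[] entry = writeCache.get(key);
        if (entry != null) return entry;

        // 2. Recently read entries are in the read cache.
        entry = readCache.get(key);
        if (entry != null) return entry;

        // 3. Otherwise: look up (entryLogId, offset) in the index, read the
        //    entry log file, and warm the read cache for subsequent reads.
        Location loc = rocksDbIndex.get(key);
        if (loc == null) return null;
        entry = entryLogFiles.get(loc);
        readCache.put(key, entry);
        return entry;
    }
}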

BookKeeper can separate read and write disk I/O. The journal is written sequentially, can live on a dedicated disk, and can be flushed in batches for maximum throughput. Apart from the journal, there is no other synchronous disk I/O on the write path; data is only written to in-memory caches.

The write cache asynchronously flushes entries in batches to the entry log files and RocksDB. As a result, one disk handles the synchronous journal writes, while another disk handles the asynchronous entry-log writes and the read operations.

On the read side, reads are served from the read cache, or from the entry log files using the RocksDB index.

Note also that writes consume inbound network bandwidth while reads consume outbound bandwidth, so they do not interfere with each other at the network level either.

This is an elegant implementation of read/write isolation at both the disk and the network level.

Layer 3 (2) – Pulsar Broker cache

Every Topic is owned by a single Broker, so all reads and writes go through that Broker. This has several benefits.

First, the Broker can cache the tail of the log in memory, which means it can serve tailing reads without going to BookKeeper at all. This avoids both the network round trip and a possible disk read on a Bookie.

The Broker also knows the id of the Last Add Confirmed (LAC) entry, i.e. the last message that has been safely persisted.

When the requested message is not in the Broker's cache, it is fetched from one of the Bookies used by the Fragment in which the message resides. This incurs extra network overhead and possibly a disk read.
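
For a flavor of how a reader can use the LAC to bound what it reads, here is a minimal sketch against the BookKeeper client API (Pulsar itself reads through its managed ledger and the Broker cache; the Zookeeper address and ledger id are placeholders):

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import java.util.Enumeration;

public class LacReadExample {
    public static void main(String[] args) throws Exception {
        BookKeeper bk = new BookKeeper("localhost:2181");    // placeholder ZooKeeper address

        // Open without recovery so we do not fence the active writer.
        LedgerHandle lh = bk.openLedgerNoRecovery(42L,        // hypothetical ledger id
                BookKeeper.DigestType.CRC32, "secret".getBytes());

        // Only entries up to the Last Add Confirmed are guaranteed durable,
        // so a reader should never read past it.
        long lac = lh.readLastConfirmed();
        if (lac >= 0) {
            Enumeration<LedgerEntry> entries = lh.readEntries(0, lac);
            while (entries.hasMoreElements()) {
                LedgerEntry entry = entries.nextElement();
                System.out.println("entry " + entry.getEntryId()
                        + ": " + new String(entry.getEntry()));
            }
        }

        lh.close();
        bk.close();
    }
}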

Next we’ll show you how to ensure that messages are adequately replicated in an Apache Pulsar cluster after a node failure.

Data recovery protocol

When a Bookie fails, all Ledgers with Fragments on that Bookie become under-replicated. Recovery is the process of re-replicating those Fragments so that each Ledger again has Qw copies.

Recovery can be manual or automatic. The re-replication protocol is the same in both cases, but automatic recovery uses a built-in mechanism that detects failed nodes and schedules the re-replication tasks, whereas the manual process requires an operator to start it.

We will focus on automatic recovery mode.

Automatic recovery can run on a dedicated set of servers or be hosted on the Bookies themselves via the AutoRecoveryMain process. One of the auto-recovery processes is elected as the Auditor. The Auditor detects unavailable Bookies and then:

  1. Reads the complete list of Ledgers from Zookeeper to find the Ledgers hosted on the failed Bookie.
  2. For each such Ledger, it creates a re-replication task under the /underreplicated znode in Zookeeper.

If the Auditor node fails, another node is promoted to Auditor. The Auditor is a thread within the AutoRecoveryMain process.

The AutoRecoveryMain process also has replication Worker threads that execute the re-replication tasks. Each Worker watches the task znodes to find new re-replication tasks.

When a Worker finds a task, it tries to lock it. If the lock cannot be obtained, it moves on to the next task.

If the lock is obtained, then:

  1. It scans the Ledger for Fragments whose ensemble does not include its own Bookie.
  2. For each matching Fragment, it copies the data from another Bookie onto its own Bookie, updates the ensemble metadata in Zookeeper, and marks the Fragment as fully replicated.

If the Ledger still has under-replicated Fragments, the Worker releases the lock. If all Fragments are fully replicated, the re-replication task is removed from /underreplicated.

If a Fragment has no end entry id (i.e. it is still being written to), the replication task waits and checks again; if the Fragment still has no end entry id, the Ledger is fenced (BookKeeper fencing) before the Fragment is re-replicated.

Therefore, a Pulsar cluster with automatic recovery enabled re-replicates data on its own, ensuring that each Ledger keeps enough copies. The system administrator only needs to make sure the cluster has enough Bookies.

Zookeeper

Both Pulsar and BookKeeper require Zookeeper. If a Pulsar node loses visibility of all Zookeeper nodes, it stops serving reads and writes and restarts itself. This is a safeguard against the cluster reaching an inconsistent state.

This means that if Zookeeper fails, everything becomes unavailable and the Pulsar nodes' caches are cleared. When service is restored, all reads initially go to BookKeeper, so there can, in theory, be latency spikes.

Conclusion

  • Each Topic has a Broker to which it belongs.
  • Each Topic is logically broken down into Ledgers, Fragments and Entries.
  • Fragments are distributed in the Bookie cluster. Topic and Bookie are not coupled.
  • Fragments can be striped across multiple Bookies.
  • When a Pulsar Broker becomes unavailable, ownership of the Topics held by that Broker is transferred to another Broker. Fencing prevents two Brokers from writing to the same Topic's current Ledger at the same time.
  • When a Bookie is unavailable, automatic recovery (if enabled) automatically replicates the data to other Bookies. If disabled, you can start the process manually.
  • The Broker caches tail-message logs to efficiently service tail-read operations.
  • Bookies use a journal to provide durability guarantees. During failure recovery, the journal can be used to recover data that has not yet been written to the entry log files.
  • Entries for all topics are stored in the Entry Log file. The lookup index is stored in RocksDB.
  • The Bookie read path is: write cache -> read cache -> entry log files (with RocksDB as the index).
  • Bookies can separate read and write I/O onto different disks.
  • Zookeeper stores all metadata for Pulsar and BookKeeper. If Zookeeper is unavailable, the entire Pulsar cluster is unavailable.
  • Storage can be scaled independently. If storage is the bottleneck, just add more Bookies; they will take on load automatically, with no rebalance required.