Introduction:

Apache Pulsar is a multi-tenant, high-performance inter-service messaging solution that supports multi-tenancy, low latency, read/write separation, cross-region replication, rapid scaling, and flexible fault tolerance. The Tencent Cloud MQ Oteam Pulsar working group has done in-depth research on Pulsar along with extensive performance and stability optimization, and the results have been launched on TDBank and Tencent Cloud TDMQ. This article briefly introduces the concepts and principles behind transactions on the Pulsar server.

About the author


Tencent Cloud middleware expert engineer

Apache Pulsar PMC member and author of Apache Pulsar in Depth. He currently focuses on middleware and has rich experience with message queues and microservices. He is responsible for the design and development of TDMQ and is committed to building stable, efficient, and scalable basic components and services.

Preface

Before transactions were introduced, the strongest messaging guarantee Pulsar supported was that a Producer's messages were saved exactly once on a single partition, through the Broker's message deduplication mechanism. If the Producer fails to send a message, the Broker ensures that the message is persisted only once, even if the Producer retries. However, with a Partitioned Topic, the Producer cannot guarantee the atomicity of messages across multiple partitions.
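
For reference, broker-side deduplication is controlled by a standard setting in broker.conf; with it enabled, a retried send carrying the same producer name and sequence ID is persisted only once:

# broker.conf: persist each (producer name, sequence ID) pair only once,
# so Producer retries do not create duplicates on a partition
brokerDeduplicationEnabled=true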

When the Broker is down, the Producer may fail to send a message, and if the Producer has not retried or has exhausted its retries, the message will not be written to Pulsar. On the consumer side, message acknowledgment is currently a best-effort operation: there is no guarantee that an acknowledgment will succeed. If it fails, the message will be redelivered and the consumer will receive duplicates. Pulsar can only guarantee that consumers consume at least once.

Similarly, Pulsar Functions only guarantees exactly-once processing of a single message on an idempotent function; that is, the business logic itself is required to be idempotent. It does not guarantee that processing multiple messages, or producing multiple outputs, happens exactly once.

For example, suppose a Function consumes messages from topic-A1 and topic-A2, aggregates them inside the Function (e.g. a time-window aggregate calculation), stores the result in topic-B, and finally acknowledges (ACKs) the messages in topic-A1 and topic-A2. The Function may fail between writing the output to topic-B and acknowledging the messages, or even while acknowledging a single message. This causes all (or some) of the topic-A1 and topic-A2 messages to be redelivered and reprocessed, generating a new result and in turn producing an incorrect result for the whole time window.
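
The failure window is easier to see in code. Below is a simplified non-transactional sketch of such a Function-like flow (the client setup, topic names, and aggregation logic are illustrative, not Pulsar Functions internals); a crash between the send and the acknowledgments re-triggers delivery with the result already published:

// Non-transactional sketch: assumes an already-built PulsarClient "client".
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("topic-A1", "topic-A2")
        .subscriptionName("agg-sub")
        .subscribe();
Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("topic-B")
        .create();

Message<String> m1 = consumer.receive();
Message<String> m2 = consumer.receive();
String result = m1.getValue() + "|" + m2.getValue(); // stand-in for window aggregation

producer.send(result);      // the result is now visible in topic-B
// A failure here leaves the result published but the inputs unacknowledged,
// so both inputs are redelivered and a second result is produced.
consumer.acknowledge(m1);
consumer.acknowledge(m2);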

Therefore, Pulsar needs a transaction mechanism to provide exactly-once semantics: production and consumption are each guaranteed to happen exactly once, with no duplication and no data loss, even when the Broker fails or Function processing fails.

Introduction to transactions

Pulsar transaction messages were originally designed to guarantee exactly-once semantics for Pulsar Functions. Transactions ensure that when a Producer sends multiple messages to different partitions, either all of them succeed or all of them fail. They can likewise ensure that when a Consumer acknowledges multiple messages, all of the acknowledgments succeed or fail together. And of course, production and consumption can be included in the same transaction: either everything succeeds or everything fails.

We use the Function scenario from the preface as an example to illustrate production and consumption in the same transaction:

First, we need to enable transactions in broker.conf.

transactionCoordinatorEnabled=true

Then we create the PulsarClient and the transaction object. The transaction object must be passed to both the producer and consumer APIs so that they participate in the same transaction.

// Create the client with transactions enabled
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .enableTransaction(true)
        .build();

// Create the transaction object
Transaction txn = pulsarClient.newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();

String sourceTopic = "public/default/source-topic";
String sinkTopic = "public/default/sink-topic";

Consumer<String> sourceConsumer = pulsarClient.newConsumer(Schema.STRING)
        .topic(sourceTopic)
        .subscriptionName("my-sub")
        .subscribe();

Producer<String> sinkProducer = pulsarClient.newProducer(Schema.STRING)
        .topic(sinkTopic)
        .create();

// Consume a message from the source topic and send it to another topic
// in the same transaction
Message<String> message = sourceConsumer.receive();
sinkProducer.newMessage(txn).value("sink data").sendAsync();
sourceConsumer.acknowledgeAsync(message.getMessageId(), txn);

// Commit the transaction
txn.commit().get();
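
If any of these steps throws, the transaction should be aborted rather than committed, so that both the send and the acknowledgment are rolled back together. A minimal pattern (error handling simplified; assumes the enclosing method declares throws Exception):

// Abort on failure so neither the send nor the ack takes effect
try {
    sinkProducer.newMessage(txn).value("sink data").sendAsync();
    sourceConsumer.acknowledgeAsync(message.getMessageId(), txn);
    txn.commit().get();
} catch (Exception e) {
    txn.abort().get(); // rolls back the uncommitted send and ack
}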

Let's return to the Function example from the preface:

Without a transaction, if the Function writes the result to SinkTopic first but then fails to acknowledge the message (Step 4), the message will be redelivered (Step 1), and the Function will recalculate a result and send it to SinkTopic again. As a result, a single piece of data is double-counted and delivered twice.

Also without a transaction, if the Function acknowledges the message first and then writes the data to SinkTopic (Step 4 before Step 3), and the write to SinkTopic fails while the SourceTopic message has already been acknowledged, the data is lost and the final calculation is again inaccurate.

If a transaction is started, then as long as no commit happens at the end, all the previous steps are rolled back: both the produced messages and the acknowledgments are rolled back, so the whole process can start over without double-counting or data loss. The sequence diagram is as follows:

We only need to follow the steps above and understand what happens in each one to understand how the whole transaction is implemented. The following sections introduce them step by step.

Transaction process

Before walking through the whole transaction flow, let's introduce the components of Pulsar transactions. Distributed transactions commonly involve components such as the TC, TM, and RM:

  1. TM: Transaction manager, the transaction initiator. It defines transaction boundaries and is responsible for telling the TC when distributed transactions start, commit, and roll back. In a Pulsar transaction, each PulsarClient plays this role.

  2. RM: The resource manager on each node. It manages the resources of each branch transaction, and each RM registers with the TC as a branch transaction. Pulsar defines a TopicTransactionBuffer and a PendingAckHandle to manage production and consumption resources, respectively.

  3. TC: Transaction coordinator. The TC is the module that processes transaction requests from the Pulsar Client and tracks their transaction status. Each TC has a unique ID (TCID), and each TC maintains its own transaction metadata store independently. The TCID is used to generate transaction IDs and to broadcast commit and rollback notifications to different nodes.

Below, we use a Producer to walk through the whole transaction flow. The gray parts in the figure represent storage; both in-memory and BookKeeper storage exist:

  1. Select a TC. A Pulsar cluster may have multiple TCs (16 by default), so the PulsarClient needs to choose which TC to use when creating a transaction, and all subsequent creation, commit, and rollback operations for that transaction are sent to this TC. The selection rule is simple: since the TC topic is fixed, the client first looks up the Brokers where all of its partitions are located (each partition is a TC), and then round-robins over them each time it creates a new transaction.

  2. Start a transaction. The code starts a transaction via pulsarClient.newTransaction(). The Client sends a newTxn command to the corresponding TC, and the TC generates and returns a new transaction ID object, which holds the TCID (used to locate the node for subsequent requests) and the transaction ID. Transaction IDs are incremental, and IDs generated by the same TC never repeat.
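
On the client side this object is represented by TxnID; a simplified view of that class (methods abridged) shows how the two pieces of information are paired:

// Simplified view: a transaction ID pairs the owning TC with a sequence
// number that the TC increments, so IDs from one TC never repeat.
public class TxnID {
    private final long mostSigBits;  // TCID: which TC owns this transaction
    private final long leastSigBits; // incremental ID, unique within that TC

    public TxnID(long mostSigBits, long leastSigBits) {
        this.mostSigBits = mostSigBits;
        this.leastSigBits = leastSigBits;
    }

    public long getMostSigBits()  { return mostSigBits; }
    public long getLeastSigBits() { return leastSigBits; }
}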

  3. Register partitions. A Topic may be a partitioned Topic, and its messages are sent to different Broker nodes. For the TC to know which nodes the messages will be sent to (the TC needs to notify those nodes when a transaction is committed or rolled back), the Producer registers the partition information with the TC before sending messages. That way, the TC later knows which nodes' RMs to notify to commit or roll back the transaction.

  4. Send messages. This step is not very different from normal message sending, except that the message first passes through the RM on each Broker. In Pulsar, the RM is defined as the TopicTransactionBuffer; it records some metadata, and then the message is written to the original Topic. At this point, although the message has been written to the original Topic, it is not visible to consumers; the transaction isolation level in Pulsar is Read Committed.

  5. Commit the transaction. After the Producer has sent all its messages, it commits the transaction to the TC. After receiving the commit request, the TC broadcasts a notification to the RM nodes to commit the transaction and update the corresponding metadata, so that the messages become consumable.

How are the messages in Step 4 guaranteed to be persisted to the Topic without being visible?

Each Topic stores a maxReadPosition property that identifies the maximum position a consumer can read. Even though the data is persisted to the Topic before the transaction commits, maxReadPosition does not change, so consumers cannot consume uncommitted data.
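
A minimal sketch of that gate, assuming positions are (ledgerId, entryId) pairs as in BookKeeper (the helper below is illustrative, not Pulsar's internal API):

// Illustrative check: an entry is dispatchable only if its position does
// not exceed the topic's current maxReadPosition.
static boolean canDispatch(long ledgerId, long entryId,
                           long maxReadLedgerId, long maxReadEntryId) {
    if (ledgerId != maxReadLedgerId) {
        return ledgerId < maxReadLedgerId;
    }
    return entryId <= maxReadEntryId;
}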

The messages have already been persisted; if the transaction is eventually rolled back, how is this data handled?

If the transaction is to be rolled back, the RM records it as Aborted. The metadata of each message stores its transaction ID and other information, and the Dispatcher decides whether a message should be delivered to the Consumer based on that transaction ID. If it finds that the transaction has been aborted, it filters the message out directly (acknowledging it internally).
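
Conceptually, the Dispatcher-side filter looks like this (a hypothetical helper, not Pulsar's actual code; the aborted list is the one the TransactionBuffer maintains, and TxnID is assumed to define equals/hashCode):

// Hypothetical filter: a transactional message is delivered only if its
// transaction is not in the aborted set; otherwise it is skipped and
// acknowledged internally.
static boolean shouldDeliver(TxnID messageTxn, Set<TxnID> abortedTxns) {
    return messageTxn == null            // non-transactional message
            || !abortedTxns.contains(messageTxn);
}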

How is partial success and partial failure handled when the transaction is finally committed?

The TC has a timer object named TransactionOpRetryTimer; all transactions whose broadcast did not fully succeed are handed to it to retry, until every node succeeds or the retry limit is exceeded. Doesn't this process have a consistency problem? Consider the possible scenarios: a Broker node is usually unavailable because it is down, or temporarily unavailable because of network jitter. In Pulsar, if a Broker goes down, Topic ownership is transferred, so unless the whole cluster is unavailable a new Broker can always be found, and retrying resolves the problem. During the Topic ownership transfer, maxReadPosition does not change and consumers do not consume new messages. Even if the entire cluster becomes unavailable, the Timer will retry the transaction after the cluster recovers.

If the transaction is incomplete, does it block the consumption of ordinary messages?

Yes. Suppose we start a transaction and send several transactional messages but neither commit nor roll back the transaction, while normal messages keep being sent to the Topic. Since the transactional messages have not been committed, maxReadPosition does not advance, so consumers cannot consume the new messages; the consumption of normal messages is blocked. This is expected behavior, required to preserve message ordering. Topics do not affect one another, because each Topic has its own maxReadPosition.
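
A quick way to observe this behavior (a sketch; assumes a client, producer, and consumer already created on the same topic and subscription):

// One uncommitted transactional message blocks later normal messages.
Transaction txn = client.newTransaction()
        .withTransactionTimeout(5, TimeUnit.MINUTES)
        .build()
        .get();

producer.newMessage(txn).value("txn-msg").send(); // uncommitted
producer.send("normal-msg");                      // persisted after it

// Returns null: maxReadPosition still sits before the transactional message.
Message<String> m = consumer.receive(1, TimeUnit.SECONDS);

txn.commit().get(); // after this, both messages become readable in order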

Transaction implementation

We can divide the implementation of transactions into five parts: environment setup, the TC, the producer's RM, the consumer's RM, and the client. Since production and consumption resources are managed separately, we cover them separately.

Setting up the environment

The setup of the transaction coordinator starts with the initialization of the Pulsar cluster. In chapter 1 we introduced how to set up a cluster; the first time, we need to execute a command to initialize the cluster metadata in ZooKeeper. At that point, Pulsar automatically creates a SystemNamespace and creates a Topic in it. The complete Topic name looks like this:

persistent://pulsar/system/transaction_coordinator_assign

This is a PartitionedTopic with 16 partitions by default, and each partition is an independent TC. We can set the number of TCs using the --initial-num-transaction-coordinators parameter.

TC and RM

Next, let's look at the transaction components on the server side, as shown below:

  • TransactionMetadataStoreService: the overall coordinator of transactions on the Broker; we can regard it as the TC.

  • TransactionMetadataStore: used by the TC to store transaction metadata, such as newly created transactions and the partitions registered by Producers. This interface has two implementation classes: one saves the data to BookKeeper, and the other keeps the data directly in memory.

  • TransactionTimeoutTracker: used on the server side to track transaction timeouts.

  • Various providers; these are factory classes and don't need special attention.

  • TopicTransactionBuffer: the producer's RM. When a transactional message is sent to the Broker, the RM, acting on the Broker's behalf, records metadata and stores the message in the original Topic. Internally it contains TopicTransactionBufferRecover and TransactionBufferSnapshotService: the RM's metadata is structured into snapshots and flushed to disk periodically, and these two objects are responsible for recovering from and saving snapshots, respectively. Since production is per topic, there is one instance per Topic/Partition.

  • PendingAckHandle: the consumer's RM. Since consumption is per subscription, there is one instance per subscription.

Since persistent transactions are typically used in online environments, the following principles are based on persistent implementations.

All transaction-related services are initialized when the BrokerService starts. In the TC topic, each Partition is a TC; during initialization, TransactionMetadataStoreService recovers the previously persisted metadata from BookKeeper according to the TC Partitions managed by the current Broker. Each TC independently stores the following metadata:

  • NewTransaction. Creates a new transaction and returns a unique transaction ID object.

  • AddProducedPartitionToTxn. Registers the partitions to which the producer will send messages; the TC uses this to notify the RM on the corresponding nodes to commit/roll back the transaction.

  • AddAckedPartitionToTxn. Registers the partitions from which the consumer will acknowledge messages; the TC uses this to notify the RM on the corresponding nodes to commit/roll back the transaction.

  • EndTransaction. Ends a transaction; the end can be a commit, a rollback, a timeout, and so on.

When we initialize the PulsarClient with enableTransaction=true, the Client additionally initializes a TransactionCoordinatorClient during its own initialization. Since the Tenant, Namespace, and Topic name of the TC are fixed, the TC Client can look up all Partition information and cache it locally. When creating a transaction, the Client polls the next TC to use from this cached list.
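
The polling itself can be pictured as a simple round-robin over the cached partition list (a hypothetical sketch, not the client's actual code):

// Hypothetical round-robin: each new transaction goes to the next TC
// partition in the locally cached list.
private final AtomicLong counter = new AtomicLong();
private List<String> tcPartitions; // cached via lookup of the fixed TC topic

String nextCoordinator() {
    int idx = (int) (counter.getAndIncrement() % tcPartitions.size());
    return tcPartitions.get(idx);
}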

Producer Transaction Management

Next we will start a transaction:

Transaction txn = pulsarClient.newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();

This code sends a newTxn command to a TC and obtains a Transaction object.

When the transaction starts, the TransactionCoordinatorClient picks a TC from its cache and sends a newTxn command to the Broker hosting the selected TC. The structure of the command is defined as follows:

message CommandNewTxn {
    required uint64 request_id = 1;
    optional uint64 txn_ttl_seconds = 2 [default = 0];
    optional uint64 tc_id = 3 [default = 0];
}

Because the TCID is included in the command, there is no problem even if multiple TCs are managed by the same Broker: the Broker finds the corresponding TC based on the TCID and processes the request.

Before sending messages, the Producer sends an AddPartitionToTxn command to the Broker; only after this succeeds does it continue to send the actual messages. Once a transactional message arrives at the Broker, it is passed to the TransactionBuffer for processing. After the message is validated, the TransactionBuffer saves the data; it is only a proxy (which records metadata) and ultimately calls the original Topic to store the message. Since the TransactionBuffer's constructor requires the original Topic object at initialization, we can think of the TransactionBuffer as the RM on the Producer side.
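
The registration command's structure is roughly as follows (abridged from Pulsar's protocol definition, in the same style as CommandNewTxn above):

message CommandAddPartitionToTxn {
    required uint64 request_id = 1;
    optional uint64 txnid_least_bits = 2 [default = 0];
    optional uint64 txnid_most_bits = 3 [default = 0];
    repeated string partitions = 4;
}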

The TransactionBuffer stores two kinds of information. One is the original messages, which are stored directly in the Topic. The other is a snapshot, which holds the Topic name, the maximum readable position (to prevent consumers from reading uncommitted data), and the list of aborted transactions in that Topic.
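
The snapshot can therefore be pictured as a small record like this (field names are illustrative; the real class lives inside the Broker):

// Illustrative snapshot contents for one topic's TransactionBuffer
class TransactionBufferSnapshot {
    String topicName;                 // which topic this snapshot describes
    long maxReadPositionLedgerId;     // consumers must not read past this
    long maxReadPositionEntryId;      //   (ledgerId, entryId) position
    List<TxnID> abortedTransactions;  // transactions aborted in this topic
}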

Aborted transactions are broadcast by the TC to the other Broker nodes. Upon receiving this information, the TransactionBuffer writes an abort marker directly to the original Topic to record that the transaction was aborted, and then updates the list in memory. The abort marker is an ordinary message whose message-header metadata differs. This data is saved in snapshots for fast recovery after a Broker restart. If the snapshot data is lost, TopicTransactionBufferRecover reads all the data in the Topic from beginning to end, updating the in-memory aborted list for each abort marker it finds; if a snapshot exists, we only need to read from the snapshot's start position to recover.

Consumer Transaction Management

The consumer needs to pass a transaction object along with the message acknowledgment to indicate a transactional ack:

consumer.acknowledge(message, txn);

On the server side there is a PendingAckHandle object per subscription to manage transactional ack information; we can think of it as the RM managing consumer data. When the Broker detects that a message acknowledgment request carries transaction information, it forwards the request to the corresponding PendingAckHandle.

Message acknowledgments that carry a transaction do not directly modify the MarkDeleted position on the cursor; instead they are persisted to a separate Ledger, with a copy cached in Broker memory. This Ledger is managed by the pendingAckStore; we can think of it as the log of the Consumer RM.

When the transaction commits, the RM asks the consumer's Subscription to actually perform all the message acknowledgments recorded earlier, and at the same time writes a special Marker to the Ledger to record that the transaction committed. When a transaction is rolled back, an AbortMarker is logged and message redelivery is triggered.

The log stored in the pendingAckStore is a redo log. On initialization, the component reads all redo logs from the Ledger to rebuild the earlier message acknowledgments in memory. Because message acknowledgment is idempotent, if the Broker fails we only need to replay the redo log. Once the messages in the subscription are actually acknowledged, the redo log in the pendingAckStore can be cleaned up: the cleanup simply moves the MarkDelete position of the pendingAckStore's Ledger forward.
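
Recovery is then a straightforward replay of that log (an illustrative sketch with hypothetical types such as PendingAckOp, not Pulsar's internal code):

// Replay the redo log: acks are idempotent, so re-applying every entry
// rebuilds the pending-ack state that existed before the crash.
void recover(List<PendingAckOp> redoLog,
             Map<TxnID, Set<MessageId>> pendingAcks) {
    for (PendingAckOp op : redoLog) {
        switch (op.type) {
            case ACK:    // an individual ack recorded under its transaction
                pendingAcks.computeIfAbsent(op.txn, t -> new HashSet<>())
                           .add(op.messageId);
                break;
            case COMMIT: // marker: apply the acks to the cursor, then drop them
                pendingAcks.remove(op.txn);
                break;
            case ABORT:  // marker: drop the acks and trigger redelivery
                pendingAcks.remove(op.txn);
                break;
        }
    }
}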

More about the TC

All transactions are committed or rolled back either because the Client informs the TC or because the TC detects a timeout. The TC's logs record the partitions to which the Producer sent messages and the partitions on which the Consumer will acknowledge messages; the RMs are distributed across the Brokers and record the messages sent and acknowledged throughout the transaction. When the transaction ends, the TC uses the TCID as the key to find all the metadata, determines from it which Brokers' RMs need to be notified, and finally broadcasts to those RMs that the transaction should be committed or rolled back.
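
In pseudo-Java, the TC's end-of-transaction handling amounts to the following (a hypothetical sketch; the metadata accessors and notify methods stand in for the actual lookup and broadcast):

// Hypothetical sketch: look up everything registered under the TxnID and
// notify each RM; TransactionOpRetryTimer re-drives any failed notification.
CompletableFuture<Void> endTransaction(TxnID txnId, boolean commit,
                                       TransactionMetadata meta) {
    List<CompletableFuture<Void>> notifications = new ArrayList<>();
    for (String partition : meta.producedPartitions(txnId)) {
        notifications.add(notifyTransactionBuffer(partition, txnId, commit));
    }
    for (String subscription : meta.ackedSubscriptions(txnId)) {
        notifications.add(notifyPendingAckHandle(subscription, txnId, commit));
    }
    return CompletableFuture.allOf(
            notifications.toArray(new CompletableFuture[0]));
}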

Conclusion

There are many design details in Pulsar. Due to limited space, the author will organize a series of articles for further technical sharing; please stay tuned. If you want to learn about Pulsar systematically, you can purchase the author's new book, Apache Pulsar in Depth.

One more thing

Tencent Cloud's message queue TDMQ Pulsar edition (TDMQ for Pulsar) has now been officially commercialized. It is a message middleware built on Apache Pulsar, with excellent cloud-native and serverless characteristics; it is compatible with Pulsar's components and concepts and retains the underlying advantages of compute-storage separation and flexible scaling.

To learn more, please visit the **official website**.