Introduction to the translator

The original article, by Penghui Li, was published Wednesday on the StreamNative English site: streamnative.io/en/blog/rel…

Wang Jialing, China Mobile Cloud Capability Center, product manager for Pulsar on Mobile Cloud, Apache Pulsar contributor, active in Apache Pulsar and other open source projects and communities.

The Apache Pulsar community has delivered a landmark feature in the just-released Pulsar 2.8.0: exactly-once semantics. Previously, we could only guarantee exactly-once semantics on a single Topic by enabling message deduplication on the Broker side. With the release of Pulsar 2.8.0, the transaction API can be used to ensure atomicity of message production and acknowledgment across topics. Next, I’ll explain what these two approaches mean and how they are implemented, and how to use the Pulsar transaction feature to achieve exactly-once semantics in real-time messaging and stream processing systems.

Before we dive into the Pulsar transaction features, let’s review the concept of message semantics.

What are exactly-once semantics?

In a distributed system, any node can fail or even go down. Apache Pulsar is no exception: while a Producer is producing messages, a Broker or Bookie may crash and become unavailable, or the network may suddenly fail. Depending on how the Producer handles a message when such a failure occurs, the system can provide one of three message semantics.

At-least-once semantics

The Producer knows that a message has been successfully written to the Pulsar Topic when it receives an ACK from the Broker. However, if the Producer times out waiting for the ACK or receives an error from the Broker, it tries to resend the message. If the Broker crashes just after it has successfully written a message to the Topic but before it has sent the ACK to the Producer, the message resent by the Producer is written to the Topic a second time, and the Consumer receives a duplicate.

At-most-once semantics

If the Producer does not resend the message when the ACK times out or the Broker returns an error, the message may be lost: it is never written to the Topic and never consumed by the Consumer. In some scenarios, we can accept message loss in order to avoid duplicate consumption.
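The difference between the two semantics above comes down to whether the Producer retries when no ACK arrives. The following is a minimal sketch of that trade-off using a toy in-memory topic. FlakyTopic, Failure, and send are hypothetical names made up for this illustration and are not part of the Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a producer talking to a topic whose first write attempt fails. */
final class DeliverySemanticsDemo {

    /** Where the simulated failure hits on the first attempt. */
    enum Failure { BEFORE_WRITE, AFTER_WRITE }

    /** Hypothetical in-memory topic; not a Pulsar class. */
    static final class FlakyTopic {
        final List<String> log = new ArrayList<>();
        private Failure pending;

        FlakyTopic(Failure firstAttemptFailure) {
            this.pending = firstAttemptFailure;
        }

        /** Returns true if the producer receives an ACK for this attempt. */
        boolean write(String message) {
            Failure failure = pending;
            pending = null; // only the first attempt fails
            if (failure == Failure.BEFORE_WRITE) {
                return false; // nothing stored, no ACK
            }
            log.add(message);
            return failure != Failure.AFTER_WRITE; // stored, but the first ACK is lost
        }
    }

    /** Sends one message and returns the topic contents afterwards. */
    static List<String> send(String message, Failure failure, boolean retryOnMissingAck) {
        FlakyTopic topic = new FlakyTopic(failure);
        boolean acked = topic.write(message);
        while (!acked && retryOnMissingAck) {
            acked = topic.write(message);
        }
        return topic.log;
    }

    public static void main(String[] args) {
        // At-least-once: retrying after a lost ACK stores the message twice.
        System.out.println(send("m1", Failure.AFTER_WRITE, true));
        // At-most-once: not retrying after a failed write loses the message.
        System.out.println(send("m1", Failure.BEFORE_WRITE, false));
    }
}
```

In this model the Producer cannot tell the two failure cases apart, which is why neither policy alone gives exactly-once behavior.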

Exactly-once semantics

Exactly-once semantics guarantee that even if the Producer sends the same message to the server multiple times, the server records it only once. Exactly-once semantics are the most reliable, but also the most difficult to understand. They require the cooperation of the message queue server, the producing application, and the consuming application. For example, if the consuming application successfully consumes and acknowledges a message and then rolls its consumption position back to an earlier message ID, all messages from that message ID onwards will be consumed again by the consuming application.

Difficulties in implementing Exactly-once semantics

There are many challenges in implementing Exactly-once semantics in distributed messaging middleware systems. Here is a simple example.

Suppose a Producer sends a message with the content “Hello StreamNative” to the Topic “Greetings” on Pulsar, and a Consumer receives the message from the Topic and prints it out. Ideally, with no exceptions, the message “Hello StreamNative” is written to the “Greetings” Topic only once, and the Consumer receives the message and processes it. An ACK is then used to inform Pulsar that the message has been processed. Even if the Consumer goes down or restarts, it will not receive this message again.

However, failures and errors can occur anywhere.

A Bookie may go down

Pulsar uses BookKeeper to store messages. BookKeeper is a highly available, persistent log storage system. Data written to a Ledger (a segment of a Topic in Pulsar) is stored on N Bookie nodes, so BookKeeper can tolerate the failure of N-1 Bookie nodes: as long as at least one Bookie node is available, the data in the Ledger is not lost. Based on the Zab protocol and the Paxos algorithm, the replication protocol of BookKeeper ensures that once data has been successfully written to the Bookies, it is stored permanently and replicated to the same group of Bookie nodes.

The Broker may go down, or the network between Producer and Broker may fail

The Producer knows that a message was sent successfully when it receives an ACK from the Broker. However, not receiving an ACK does not always mean that the message failed to be sent. The Broker may have failed after writing the message to the Topic but before sending the ACK, or it may have failed before writing the message at all. Since the Producer has no way of knowing which of these happened, it assumes by default that the message was not sent successfully and resends it whenever the ACK is missing. This means that in some cases Pulsar writes duplicate messages, which Consumers then consume more than once.

The Pulsar client may go down

Implementing exactly-once semantics must also account for the Pulsar client becoming unavailable. It can be difficult to tell whether a client is down for good or only temporarily unavailable, but it is important for the Broker to be able to tell. The Pulsar Broker needs to discard messages sent by clients that are in an abnormal state. Once the client restarts, it must be able to learn the state it was in before the failure and resume processing from the appropriate position.

The Pulsar community implemented exactly-once semantics in two stages. In Pulsar 1.20.0-Incubating, we introduced the idempotent Producer to ensure exactly-once semantics on individual topics. In the latest release, Pulsar 2.8.0, we introduced a transaction API to ensure atomicity of messages in cross-topic scenarios.

Idempotent Producer: exactly-once semantics on a single Topic

Since Pulsar 1.20.0-Incubating, exactly-once semantics on individual topics have been ensured through the idempotent Producer.

What is an idempotent Producer? Idempotent means that performing the same operation once or many times yields the same result. If message deduplication is enabled at the Cluster or Namespace level and the Producer is configured to be idempotent, then a message that the Producer resends because of a failure is written to the Broker only once.

With this feature, no messages are lost or duplicated within a single Topic, and all messages stay in order. It is enabled with the following configuration:

  • Enable message deduplication at the Cluster level (effective for all topics in the Cluster), the Namespace level (effective for all topics under the Namespace), or the Topic level (effective for a single Topic)
  • Set a name of your choice for the Producer and set its send timeout to 0

How is this feature implemented? In simple terms, it is very similar to TCP’s deduplication mechanism: each message sent to Pulsar carries a unique sequence ID that Pulsar Brokers use to identify and discard duplicate messages. Unlike TCP, which only deduplicates within a live connection, Pulsar persists the sequence ID to the Topic together with the message and records the latest sequence ID it has received. So even if a Broker node goes down unexpectedly, the Broker node that takes over the Topic can still determine whether a message is a duplicate. The mechanism is simple enough that the performance overhead of an idempotent Producer is negligible compared with a non-idempotent one.
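The sequence ID check described above can be sketched in a few lines. This is a toy in-memory model under simplifying assumptions, not Pulsar's Broker code: DedupTopic and its methods are hypothetical names, and the map that a real Broker would recover from persisted state is kept only in memory here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of broker-side deduplication by producer name and sequence ID. */
final class DedupTopic {
    private final List<String> log = new ArrayList<>();
    // Highest sequence ID accepted so far, per producer name.
    private final Map<String, Long> lastSequenceId = new HashMap<>();

    /** Returns true if the message was appended, false if it was dropped as a duplicate. */
    boolean publish(String producerName, long sequenceId, String payload) {
        long last = lastSequenceId.getOrDefault(producerName, -1L);
        if (sequenceId <= last) {
            return false; // already stored: this is a resend after a lost ACK
        }
        log.add(payload);
        lastSequenceId.put(producerName, sequenceId);
        return true;
    }

    List<String> messages() {
        return log;
    }

    public static void main(String[] args) {
        DedupTopic topic = new DedupTopic();
        topic.publish("orders-producer", 0, "a");
        topic.publish("orders-producer", 1, "b");
        topic.publish("orders-producer", 1, "b"); // retry of the same message is dropped
        topic.publish("orders-producer", 2, "c");
        System.out.println(topic.messages()); // prints [a, b, c]
    }
}
```

The check is keyed by producer name, which is why the configuration asks for a fixed Producer name: a restarted Producer needs to be recognized as the same sender for its resends to be detected.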

Pulsar 1.20.0-Incubating and later versions support this feature; you can find an introduction to it here.

However, the idempotent Producer guarantees exactly-once semantics only in certain scenarios. For example, suppose a Producer needs to send a message to multiple topics at the same time and the Broker serving some of those topics crashes. If the Producer does not resend the message, the message is lost on some topics. If the Producer resends the message, it is written twice on the other topics.

On the Consumer side, the ACK requests that a Consumer sends to the Broker are best-effort. That is, an ACK request may be lost, the Consumer has no way of knowing whether the Broker received it, and a lost ACK request is not resent. This can also cause the Consumer to receive duplicate messages.

Transaction API: atomic message production and acknowledgment across topics

To address these issues, we introduced a transaction API that ensures atomicity of message production and acknowledgment across topics. With it, a Producer can send messages to multiple topics in one transaction, so that either all of the messages are written successfully and become consumable on all topics, or none of them can be consumed at all. The API also allows messages on multiple topics to be acknowledged within a single transaction, enabling end-to-end exactly-once semantics.

The following example code demonstrates how to use the transaction API:

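import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.Transaction;

// Transactions must be enabled on the client.
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .enableTransaction(true)
        .build();
// Open a transaction with a one-minute timeout.
Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();
// producer and consumer are assumed to have been created from pulsarClient (not shown here).
// Produce a message as part of the transaction.
producer.newMessage(txn).value("Hello Pulsar Transaction".getBytes()).send();
// Receive a message and acknowledge it as part of the same transaction.
Message<byte[]> message = consumer.receive();
consumer.acknowledgeAsync(message.getMessageId(), txn).get();
// Commit: the produced message and the acknowledgment take effect atomically.
txn.commit().get();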

This code shows how to use the transaction API to produce a message and acknowledge a message atomically within the same transaction.

Note that:

  • Some messages in the same Topic may belong to a transaction and some messages may not belong to any transaction.
  • Multiple parallel uncommitted transactions are allowed in the Pulsar client. This is a fundamental difference from other transaction-enabled messaging systems and can greatly improve the processing power of transaction messages.
  • The current Pulsar transaction API only supports the READ_COMMITTED isolation level. A Consumer will consume only messages that are not part of any transaction and messages in committed transactions, not messages in uncommitted and rolled back transactions.
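The READ_COMMITTED rule in the last point can be illustrated with a toy model. This is only a sketch of which messages are visible, not of how Pulsar tracks transactions or when it dispatches messages; ReadCommittedTopic and its methods are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of READ_COMMITTED visibility on a single topic. */
final class ReadCommittedTopic {

    /** A stored entry; txnId is null for a non-transactional message. */
    private static final class Entry {
        final String payload;
        final String txnId;

        Entry(String payload, String txnId) {
            this.payload = payload;
            this.txnId = txnId;
        }
    }

    private final List<Entry> log = new ArrayList<>();
    private final Set<String> committed = new HashSet<>();

    void publish(String payload) {
        log.add(new Entry(payload, null));
    }

    void publish(String payload, String txnId) {
        log.add(new Entry(payload, txnId));
    }

    void commit(String txnId) {
        committed.add(txnId);
    }

    /** Messages a consumer may see: non-transactional ones plus those in committed transactions. */
    List<String> visible() {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.txnId == null || committed.contains(e.txnId)) {
                out.add(e.payload);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        ReadCommittedTopic topic = new ReadCommittedTopic();
        topic.publish("plain");
        topic.publish("from-t1", "t1");
        topic.publish("from-t2", "t2");
        topic.commit("t1");
        // t2 is still open or was rolled back; either way it is never marked committed.
        System.out.println(topic.visible()); // prints [plain, from-t1]
    }
}
```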

Beyond enabling transactions on the client with enableTransaction(true), as in the code above, using the transaction API on the Pulsar client requires no additional configuration or dependencies.

End-to-end exactly-once stream processing made easier: a Pulsar + Flink example

Using the Pulsar transaction API, we can already achieve exactly-once semantics in stream processing scenarios.

In stream processing systems, a key question is often asked: “If some intermediate nodes fail during processing, how do you ensure that the final results are still correct?” The key to answering it is being able to resume processing, once the failed node has recovered, from the state it was in before the failure.

Stream processing on Apache Pulsar is essentially a read-process-write operation on messages across multiple topics. The Source node consumes messages from one or more input topics, the Process node performs a series of computations and state updates on them, and the Sink node finally writes the results to the output Topic. In stream processing, exactly-once means that the whole read-process-write operation is executed with exactly-once semantics, i.e. no messages from the input Topic are lost and no messages are written to the output Topic more than once. This is the exactly-once behavior that users expect from a stream processing system.

Let’s look at an example of stream processing with Pulsar and Flink.

Before Pulsar 2.8.0, the Pulsar and Flink integration supported only an exactly-once Source Connector and an at-least-once Sink Connector. This means that an end-to-end stream processing system built with Pulsar and Flink could achieve at-least-once semantics at best, so the messages written to the output Topic might be duplicated.

With the transaction API introduced in Pulsar 2.8.0, the Pulsar-Flink Sink Connector can support exactly-once semantics with a simple modification. Flink uses the two-phase commit protocol to ensure end-to-end exactly-once semantics, so we can implement TwoPhaseCommitSinkFunction and hook the Pulsar transaction API into it. When the Pulsar-Flink Sink Connector calls beginTransaction, we create a Pulsar transaction and save its transaction ID. This transaction ID is attached to all subsequent messages written to the Sink Connector. These messages are written to Pulsar when the Connector calls preCommit. When the Connector calls recoverAndCommit or recoverAndAbort, the Pulsar transaction API is called to commit or roll back the Pulsar transaction, respectively. The integration is straightforward: the Connector only needs to store the Pulsar transaction ID together with the Flink checkpoint, so that the ID can be retrieved when Flink commits or rolls back the transaction.
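The lifecycle described above can be sketched without Flink or Pulsar on the classpath. The following toy model borrows only the method names beginTransaction, invoke, preCommit, recoverAndCommit, and recoverAndAbort from the description; the signatures, the FakeTopic class, and the long transaction ID are assumptions made for this illustration and do not reproduce the real connector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the two-phase commit sink lifecycle; no Flink or Pulsar classes are used. */
final class TwoPhaseSinkSketch {

    /** Hypothetical stand-in for a transactional output topic. */
    static final class FakeTopic {
        final List<String> committed = new ArrayList<>();
        private final Map<Long, List<String>> open = new HashMap<>();
        private long nextTxnId = 0;

        long begin() {
            long id = nextTxnId++;
            open.put(id, new ArrayList<>());
            return id;
        }

        void write(long txnId, List<String> records) {
            open.get(txnId).addAll(records);
        }

        void commit(long txnId) {
            List<String> records = open.remove(txnId);
            if (records != null) {
                committed.addAll(records); // a repeated commit finds nothing and is a no-op
            }
        }

        void abort(long txnId) {
            open.remove(txnId);
        }
    }

    private final FakeTopic topic;
    private final List<String> buffer = new ArrayList<>();
    private long txnId;

    TwoPhaseSinkSketch(FakeTopic topic) {
        this.topic = topic;
    }

    void beginTransaction() {
        txnId = topic.begin();
        buffer.clear();
    }

    void invoke(String record) {
        buffer.add(record);
    }

    /** Phase 1: flush buffered records; the returned ID is what a checkpoint would store. */
    long preCommit() {
        topic.write(txnId, buffer);
        buffer.clear();
        return txnId;
    }

    /** Phase 2 after a restart: finish the transaction whose ID came from the checkpoint. */
    static void recoverAndCommit(FakeTopic topic, long checkpointedTxnId) {
        topic.commit(checkpointedTxnId);
    }

    static void recoverAndAbort(FakeTopic topic, long checkpointedTxnId) {
        topic.abort(checkpointedTxnId);
    }

    public static void main(String[] args) {
        FakeTopic topic = new FakeTopic();
        TwoPhaseSinkSketch sink = new TwoPhaseSinkSketch(topic);
        sink.beginTransaction();
        sink.invoke("r1");
        sink.invoke("r2");
        long checkpointed = sink.preCommit();
        // Simulated crash: the sink instance is lost, only the checkpointed ID survives.
        recoverAndCommit(topic, checkpointed);
        recoverAndCommit(topic, checkpointed); // a replayed recovery must not duplicate output
        System.out.println(topic.committed);
    }
}
```

The point of the sketch is that nothing becomes visible at preCommit, and that the checkpointed transaction ID alone is enough to finish or discard the transaction after a restart.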

Based on the idempotent and atomic operations provided by Pulsar transactions and the globally consistent checkpoint mechanism provided by Apache Flink, Pulsar and Flink can easily be combined to build a stream processing system with end-to-end exactly-once semantics.

What’s next

If you want more details on how exactly-once semantics are implemented, we recommend reading Pulsar Improvement Proposal PIP-31. For more design details, we also recommend reading the design documentation.

This article focuses on the user’s view of the transaction API, a new feature in Apache Pulsar 2.8.0, and on how to use it to achieve exactly-once semantics. The design and implementation of the transaction API will be covered in more detail in the next article.

“Exactly-Once Made Easy: Transactional Messaging in Apache Pulsar” is a talk given at Pulsar Summit North America.

Acknowledgments

Thanks to the many Pulsar Committers and Contributors who developed this milestone feature over the past year: Penghui Li, Ran Gao, Bo Cong, Addison Higham, Jia Zhai, Yong Zhang, Xiaolong Ran, Matteo Merli, Sijie Guo.

At the same time, thanks again to the translator, Wang Jialing of the China Mobile Cloud Capability Center, for his excellent translation, which quickly brought us the Chinese version of this blog post.

Further reading

  • Technical Exploration: Transactional event flow for Apache Pulsar
