Introduction:

Apache Pulsar is a multi-tenant, high-performance solution for inter-service messaging that supports low latency, read/write separation, cross-region replication, rapid scaling, and flexible fault tolerance. Tencent Cloud's internal Pulsar working group has studied Pulsar in depth and made many performance and stability optimizations, which are now running in TDBank, one of Tencent's internal businesses. This article, part of the Pulsar Technology series, introduces Message Deduplication in Pulsar for your reference.

Background on Message Deduplication

When designing message-oriented middleware, message delivery is generally described in terms of the three delivery semantics popularized by Kafka:

At most once

At least once

Exactly once

Note that these semantics describe the delivery (production) behavior, that is, how many copies of a produced message the server ends up storing.

At most once: When the client produces a message, it delivers the message only once. There is no guarantee that the message will be successfully produced.

At least once: When producing a message, the client may deliver it multiple times before receiving a successful response. In this scenario, multiple duplicate messages may exist on the server.
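To make the at-least-once case concrete, here is a minimal, self-contained Java simulation. It is not Pulsar code: AtLeastOnceDemo, deliver and sendAtLeastOnce are hypothetical names, and the scenario assumes the server's write succeeds but the first ack is lost, so the client resends.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation of at-least-once production: the server stores every
 * delivery it receives, and the client resends until it sees an ack.
 */
public class AtLeastOnceDemo {
    // Every delivery that reaches the server is stored, even a resend.
    static final List<String> serverLog = new ArrayList<>();

    // One round trip: the write always succeeds, but the ack may be lost.
    static boolean deliver(String payload, boolean ackLost) {
        serverLog.add(payload);
        return !ackLost;
    }

    // Retry until an ack arrives; in this scenario the first ack is lost.
    static void sendAtLeastOnce(String payload) {
        boolean acked = deliver(payload, true);
        while (!acked) {
            acked = deliver(payload, false);
        }
    }

    public static void main(String[] args) {
        sendAtLeastOnce("order-42");
        // The server now holds two copies of the same message.
        System.out.println(serverLog); // [order-42, order-42]
    }
}
```

The client behaved correctly from its own point of view; the duplicate comes purely from the lost ack plus the retry.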

Exactly once: When the client produces a message, the server is guaranteed to store only one copy of the message for this production. Here, “this production” usually refers to a single call to “SendMessage” by the client. In this sense, the server typically does not handle the case where the client calls the send method several times with the same message body, which results in duplicate messages. Simply put, exactly once does not mean that messages are never duplicated.

Many systems claim to provide exactly-once delivery semantics, but a close reading shows that some of these claims can be misleading. We need to examine what the guarantee actually means in the event of production timeouts, writes that succeed on only some replicas, partial failures, and so on.

At present, most message-oriented middleware products, such as Kafka, RocketMQ, Pulsar, InLong-Tube, RabbitMQ, and ActiveMQ, support at-least-once delivery semantics: once a message is successfully produced, at least one copy is guaranteed to be stored on the server and to be consumed by the consumer. However, few products support exactly-once semantics.

This article focuses on Pulsar's Message Deduplication (an implementation of exactly-once semantics), which may not work quite the way you expect.

Message Deduplication for Pulsar

Functional configuration

Message Deduplication in Pulsar is disabled by default. To enable it, you need to change the Broker configuration and add a small amount of configuration on the client side (see the Pulsar website for details).

To enable Message Deduplication, change the following settings on the Broker side:

# Enable message deduplication
brokerDeduplicationEnabled=true

# Maximum number of producers for which the broker keeps deduplication information
brokerDeduplicationMaxNumberOfProducers

# Interval, in number of entries, at which the broker takes a deduplication snapshot
brokerDeduplicationEntriesInterval

# How long (in minutes) deduplication information is kept on the broker after a producer disconnects
brokerDeduplicationProducerInactivityTimeoutMinutes

In addition, the producer client needs the following changes:

1. Specify a name for the producer.

2. Set the message send timeout to 0, which disables the timeout (the default is 30s).

A code example is as follows:

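import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class DedupProducerExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Deduplication needs a stable producer name and no send timeout.
        Producer<byte[]> producer = pulsarClient.newProducer()
                .producerName("producer-1")        // the Broker tracks Sequence IDs per producer name
                .topic("persistent://public/default/topic-1")
                .sendTimeout(0, TimeUnit.SECONDS)  // 0 disables the send timeout
                .create();

        producer.close();
        pulsarClient.close();
    }
}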

Functional principle

For each message it sends, the client generates a monotonically increasing Sequence ID, which is placed in the message metadata and transmitted to the Broker. The Producer also maintains a PendingMessages queue. When it receives an Ack from the Broker, the Producer removes the message with the matching Sequence ID from PendingMessages, and the client considers the message successfully produced.
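The producer-side bookkeeping can be sketched as follows. This is a simplified, single-threaded model, not the Pulsar client's internal code: ProducerSketch, send and onAck are hypothetical names, and a LinkedHashMap keyed by Sequence ID stands in for the PendingMessages queue while keeping send order.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Minimal sketch of a producer that assigns increasing sequence IDs and keeps
 * unacked messages pending. Hypothetical, single-threaded model.
 */
public class ProducerSketch {
    private long nextSequenceId = 0;
    // Sent but not yet acked messages, keyed by sequence ID, in send order.
    private final Map<Long, String> pendingMessages = new LinkedHashMap<>();

    /** Assigns the next sequence ID, records the message as pending, returns the ID. */
    public long send(String payload) {
        long sequenceId = nextSequenceId++;
        pendingMessages.put(sequenceId, payload);
        // A real client would now put sequenceId into the message metadata
        // and write the message to the broker connection.
        return sequenceId;
    }

    /** Handles a broker ack; returns true if a pending message was completed. */
    public boolean onAck(long sequenceId) {
        return pendingMessages.remove(sequenceId) != null;
    }

    public int pendingCount() {
        return pendingMessages.size();
    }

    public static void main(String[] args) {
        ProducerSketch producer = new ProducerSketch();
        long first = producer.send("m0"); // sequence ID 0
        producer.send("m1");              // sequence ID 1
        producer.onAck(first);
        System.out.println(producer.pendingCount()); // 1 (only m1 is still pending)
    }
}
```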

When Message Deduplication is enabled, the Broker checks whether each message it receives is a duplicate.

The logic of judgment is as follows:

1. For each producer, keyed by producer name, the Broker keeps two maximum Sequence IDs: the highest one received (pushed) and the highest one persisted.

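// Highest Sequence ID received from each producer (updated before the message is persisted)
ConcurrentOpenHashMap<String, Long> highestSequencedPushed;

// Highest Sequence ID persisted for each producer (updated after the message is persisted)
ConcurrentOpenHashMap<String, Long> highestSequencedPersisted;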

2. If the message's Sequence ID is greater than both Sequence IDs the Broker has recorded for the same producer name, the message is not a duplicate. If it is less than or equal, the message is treated as a duplicate, and the Broker simply returns without going through the rest of the storage process.
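The two-step check above can be sketched in Java as follows. This is a simplified model, not Pulsar's implementation: it uses plain HashMaps guarded by synchronized methods instead of Pulsar's ConcurrentOpenHashMap, and DedupSketch, isDuplicate and onPersisted are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of the broker-side duplicate check. */
public class DedupSketch {
    // Highest sequence ID received from each producer (updated before persisting).
    private final Map<String, Long> highestSequencedPushed = new HashMap<>();
    // Highest sequence ID persisted for each producer (updated after the write completes).
    private final Map<String, Long> highestSequencedPersisted = new HashMap<>();

    /**
     * Returns true if the message is a duplicate and should be skipped. Otherwise
     * records the sequence ID as pushed and returns false so the caller stores it.
     */
    synchronized boolean isDuplicate(String producerName, long sequenceId) {
        long pushed = highestSequencedPushed.getOrDefault(producerName, -1L);
        long persisted = highestSequencedPersisted.getOrDefault(producerName, -1L);
        // Not greater than what either map has already seen: duplicate.
        if (sequenceId <= Math.max(pushed, persisted)) {
            return true;
        }
        highestSequencedPushed.put(producerName, sequenceId);
        return false;
    }

    /** Called once storage confirms the write for this sequence ID. */
    synchronized void onPersisted(String producerName, long sequenceId) {
        highestSequencedPersisted.merge(producerName, sequenceId, Long::max);
    }

    public static void main(String[] args) {
        DedupSketch broker = new DedupSketch();
        System.out.println(broker.isDuplicate("producer-1", 0)); // false: first send, stored
        broker.onPersisted("producer-1", 0);
        System.out.println(broker.isDuplicate("producer-1", 0)); // true: resend of sequence ID 0
        System.out.println(broker.isDuplicate("producer-1", 1)); // false: next sequence ID
        System.out.println(broker.isDuplicate("producer-2", 0)); // false: different producer name
    }
}
```

The synchronized methods keep the check-then-update step atomic in this sketch; how the real Broker achieves the same per-producer ordering is not covered here.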

That covers the configuration and implementation principle of Pulsar's Message Deduplication feature. As you can see, Message Deduplication on the Pulsar Broker does not deduplicate message bodies. Instead, within a certain time range and provided the client does not set a send timeout, the Broker guarantees that messages delivered under the same producer name with the same Sequence ID are stored only once.
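The distinction between deduplicating by body and deduplicating by (producer name, Sequence ID) can be illustrated with a tiny, hypothetical model. DedupKeyDemo and receive are illustrative names, not Pulsar APIs, and the model keeps only one "highest Sequence ID" per producer for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: deduplication keys on producer name and sequence ID, not the body. */
public class DedupKeyDemo {
    static final Map<String, Long> highestSeq = new HashMap<>();
    static final List<String> stored = new ArrayList<>();

    static void receive(String producerName, long sequenceId, String body) {
        long highest = highestSeq.getOrDefault(producerName, -1L);
        if (sequenceId <= highest) {
            return; // dropped as a duplicate, whatever the body contains
        }
        highestSeq.put(producerName, sequenceId);
        stored.add(body);
    }

    public static void main(String[] args) {
        receive("producer-1", 0, "hello");
        receive("producer-1", 1, "hello"); // same body, new sequence ID: stored again
        receive("producer-1", 1, "world"); // different body, old sequence ID: dropped
        receive("producer-2", 0, "hello"); // different producer name: stored
        System.out.println(stored); // [hello, hello, hello]
    }
}
```

If an application resends the same content through a new send call (and therefore a new Sequence ID), the Broker stores it again.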

Conclusion

Since version 0.11.0.0, Kafka has offered idempotent producers and transactional messages, which provide exactly-once semantics within a single topic and across topics, respectively. (Interested readers can refer to the Kafka source code and official documentation.) Pulsar's Message Deduplication is similar to Kafka's exactly-once guarantee within a single topic and can also be regarded as an implementation of exactly-once.

It is important to note that exactly-once does not equal message deduplication. In practice, duplicate messages can arise in both production and consumption.

On the producer side, the storage state of a message on the server is uncertain until the producer receives definitive confirmation that the message was successfully produced.

For example, the server may have two or more copies of the message if the producer does not receive a production response for a certain period of time and chooses to resend.

On the consumer side, messages may also be pushed repeatedly in the following scenarios:

1. The consumer restarts before it has sent an Ack, or before the Broker has received it.

2. The Broker restarts. Because consumer Ack information is not persisted in real time, a small number of already-consumed messages may be pushed again after the restart.

3. When consumption fails, the client calls reconsumeLater or negativeAcknowledge, and the Broker then pushes the message again.
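Scenario 2 can be illustrated with a hypothetical model in which Acks update an in-memory position immediately but the position is persisted only every few Acks. The class name and PERSIST_EVERY are illustrative assumptions, not Pulsar settings, and the model assumes Acks arrive in offset order.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of redelivery after a broker restart with periodically persisted acks. */
public class CursorRestartDemo {
    static final int PERSIST_EVERY = 4; // illustrative flush interval, in number of acks

    long inMemoryAcked = -1;  // highest acked offset known in memory
    long persistedAcked = -1; // highest acked offset written to durable storage
    int acksSincePersist = 0;

    void ack(long offset) {
        inMemoryAcked = offset;
        if (++acksSincePersist == PERSIST_EVERY) {
            persistedAcked = inMemoryAcked;
            acksSincePersist = 0;
        }
    }

    /** After a restart only the persisted position survives, so later offsets are redelivered. */
    List<Long> redeliverAfterRestart(long lastOffset) {
        List<Long> redelivered = new ArrayList<>();
        for (long o = persistedAcked + 1; o <= lastOffset; o++) {
            redelivered.add(o);
        }
        return redelivered;
    }

    public static void main(String[] args) {
        CursorRestartDemo cursor = new CursorRestartDemo();
        for (long offset = 0; offset < 10; offset++) {
            cursor.ack(offset); // consumer processes and acks offsets 0..9
        }
        // The persisted position is 7 (after the 8th ack), so 8 and 9 come back.
        System.out.println(cursor.redeliverAfterRestart(9)); // [8, 9]
    }
}
```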

Therefore, when choosing message-oriented middleware features, pay attention to these scenarios and limitations to avoid unnecessary impact on your business from duplicate messages.

One more thing

TDMQ Pulsar Edition is Tencent Cloud's message middleware built on Apache Pulsar, with excellent cloud-native and Serverless features. It is compatible with Pulsar's components and concepts and retains Pulsar's underlying advantages of compute-storage separation and elastic scaling. TDMQ Pulsar Edition is now commercially available. If you are interested in Pulsar, visit the official website (Message Queue Pulsar Edition, Serverless Cloud Native Message Queue, Tencent Cloud, tencent.com) for more details.