Losing a message means losing data, which is unacceptable; otherwise, what would be the point of using an MQ?

Mainstream MQ products therefore provide reliable delivery mechanisms, so that messages are delivered and not lost even when the network fails.

If you still find messages missing, the cause is most likely on the developer side: the MQ is probably configured or used incorrectly. Different MQ implementations ensure reliable delivery in much the same way.

1 Detecting lost messages

Large companies typically run a distributed tracing system, which makes it easy to trace every message. Small and medium-sized companies can use a simpler check that relies on the ordering guarantees of MQ:

  1. On the Producer side, attach a monotonically increasing sequence number to every message sent
  2. On the Consumer side, check that the sequence numbers are continuous
  • If the sequence numbers received by the Consumer increase strictly with no gaps, no message has been lost
  • If there is a gap in the sequence numbers, a message has been lost

The missing sequence number also tells you which message was lost.

Most MQ clients support interceptors. A Producer-side interceptor can inject the sequence number before a message is sent, and a Consumer-side interceptor can check sequence continuity before a message is received.

  • Benefits

The verification code does not intrude on business code, and it is easy to disable or delete once the system has stabilized.
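
The sequence-number check described above can be sketched as a pair of hooks. This is a minimal illustration, not a real MQ client API: `SequenceStamper`, `ContinuityChecker`, `on_send` and `on_receive` are hypothetical names, and messages are modelled as plain dictionaries.

```python
from itertools import count


class SequenceStamper:
    """Hypothetical producer-side hook: attaches an increasing sequence number."""

    def __init__(self):
        self._next = count(0)

    def on_send(self, payload):
        return {"seq": next(self._next), "payload": payload}


class ContinuityChecker:
    """Hypothetical consumer-side hook: records every sequence number that was skipped."""

    def __init__(self):
        self.expected = 0
        self.missing = []

    def on_receive(self, message):
        seq = message["seq"]
        if seq > self.expected:
            # Everything between the expected number and this one never arrived.
            self.missing.extend(range(self.expected, seq))
        self.expected = max(self.expected, seq + 1)
        return message["payload"]


stamper = SequenceStamper()
sent = [stamper.on_send(f"order-{i}") for i in range(5)]
delivered = [m for m in sent if m["seq"] != 2]  # simulate losing message 2

checker = ContinuityChecker()
for message in delivered:
    checker.on_receive(message)

print(checker.missing)  # [2]
```

Because the check lives in the hooks, the business handler only ever sees the payload.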

When implementing this check in a distributed system, note the following:

  • Kafka and RocketMQ do not guarantee strict ordering across a topic, only within a partition, so the partition must be specified when sending messages, and sequence continuity must be verified separately for each partition.

  • If the system has multiple Producers, the order in which different Producers send is hard to coordinate. Each Producer therefore generates its own sequence numbers and attaches its own Producer identifier, and the Consumer verifies continuity separately for each Producer.

  • The number of Consumer instances should match the number of partitions one-to-one, so that sequence continuity can be verified easily inside each Consumer.
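
A minimal sketch of the per-Producer, per-partition check follows. It assumes that every Producer keeps a separate counter for each partition it writes to; `PartitionedContinuityChecker` and its fields are hypothetical names for illustration.

```python
from collections import defaultdict


class PartitionedContinuityChecker:
    """Tracks the next expected sequence number per (producer_id, partition)."""

    def __init__(self):
        self._expected = defaultdict(int)
        self.gaps = defaultdict(list)

    def check(self, producer_id, partition, seq):
        key = (producer_id, partition)
        expected = self._expected[key]
        if seq > expected:
            # A jump in this producer's stream on this partition means lost messages.
            self.gaps[key].extend(range(expected, seq))
        self._expected[key] = max(expected, seq + 1)


checker = PartitionedContinuityChecker()
received = [
    ("p1", 0, 0), ("p2", 0, 0), ("p1", 0, 1),
    ("p1", 1, 0), ("p2", 0, 2), ("p1", 0, 2),
]
for producer_id, partition, seq in received:
    checker.check(producer_id, partition, seq)

print(dict(checker.gaps))  # {('p2', 0): [1]}
```

Interleaving between Producers does not trigger false alarms, because each stream is compared only against its own history.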

2 Ensure reliable message delivery

Where can messages be lost, and how can that be avoided?

  • The stages a message passes through from production to the completion of consumption

2.1 Production Stage

Messages are created by the Producer and sent over the network to the Broker.

MQ ensures reliable delivery with the most common mechanism, request acknowledgement: when the send method is called, the MQ client sends the message to the Broker, and the Broker returns an acknowledgement to the client once it has received the message. When the client receives that acknowledgement, one normal send is complete.

As long as the Producer receives the Broker's acknowledgement, the message is guaranteed not to be lost in the production stage.

  • Some MQ clients retry automatically when no acknowledgement arrives for a long time
  • If the retries also fail, the caller is notified through a return value or an exception

When writing sending code, handle the return value or catch the exception correctly so that no message is lost at this stage.
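
As a rough sketch of this pattern, the wrapper below retries a failed send a bounded number of times and raises if every attempt fails. The `send` callable, `SendFailed` and `flaky_send` are hypothetical stand-ins; a real client's send method, retry settings and exception types will differ.

```python
import time


class SendFailed(Exception):
    """Raised when every attempt to send a message has failed."""


def send_reliably(send, message, max_attempts=3, backoff_seconds=0.0):
    """Call send(message) until it returns an acknowledgement or attempts run out."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return send(message)  # assumed to return only after the broker acknowledged
        except Exception as error:
            last_error = error
            time.sleep(backoff_seconds * attempt)
    # Surface the failure so the caller can log, persist, or alert.
    raise SendFailed(f"gave up after {max_attempts} attempts") from last_error


attempts = []


def flaky_send(message):
    """Stand-in for a client send that fails twice before succeeding."""
    attempts.append(message)
    if len(attempts) < 3:
        raise ConnectionError("no acknowledgement from broker")
    return "ack"


result = send_reliably(flaky_send, "hello")
print(result, len(attempts))  # ack 3
```

A resend can reach the Broker twice when only the acknowledgement was lost, so the consuming side still has to tolerate duplicates.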

Example

With Kafka, messages can be sent reliably as follows.

When sending synchronously, you only need to catch exceptions:

try {
    // get() blocks until the Broker acknowledges the record or the send fails
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Message sent successfully.");
} catch (Throwable e) {
    System.out.println("Message sending failed!");
    System.out.println(e);
}

When sending asynchronously, check the result in the callback. Many message-loss incidents come from sending asynchronously without checking the result in the callback.

producer.send(record, (metadata, exception) -> {
    // exception is null when the send succeeded
    if (exception == null) {
        System.out.println("Message sent successfully.");
    } else {
        System.out.println("Message sending failed!");
        System.out.println(exception);
    }
});

2.2 Storage Stage

Messages are stored on the Broker; in a cluster, they are also replicated to other replicas at this stage.

Normally, no message is lost as long as the Broker is running properly. Messages can be lost when the Broker fails, for example when the process crashes or the server goes down.

If you need high reliability, configure the Broker so that messages survive such failures.

For a single-node Broker, configure it to write each message to disk before returning the acknowledgement to the Producer. Messages then survive a crash and can still be consumed after recovery.

  • In RocketMQ, the flush mode is controlled by flushDiskType, which defaults to asynchronous flushing (ASYNC_FLUSH)

  • Set it to SYNC_FLUSH to flush to disk synchronously
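
To make the idea of flushing before acknowledging concrete, here is a toy sketch of a synchronous flush. It is not how RocketMQ implements SYNC_FLUSH; `append_and_flush` and the log file are hypothetical, and the point is only that the acknowledgement is returned after `os.fsync` has completed.

```python
import os
import tempfile


def append_and_flush(log_path, message: bytes) -> str:
    """Append a message to the log and force it to disk before acknowledging."""
    with open(log_path, "ab") as log:
        log.write(message + b"\n")
        log.flush()             # move data from Python's buffer to the OS
        os.fsync(log.fileno())  # ask the OS to write its cache to the device
    return "ack"                # only now is it safe to confirm to the producer


log_path = os.path.join(tempfile.mkdtemp(), "commit.log")
ack = append_and_flush(log_path, b"order-1")

with open(log_path, "rb") as log:
    stored = log.read()
print(ack, stored)  # ack b'order-1\n'
```

The extra `fsync` per message is what makes synchronous flushing slower than asynchronous flushing.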

If the Broker is a multi-node cluster, configure it so that each message is written to at least two nodes before the acknowledgement is returned to the client. When one Broker goes down, another Broker can take over from it without losing messages.
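
The rule of acknowledging only after at least two copies exist can be modelled in a few lines. This is a toy model under simplifying assumptions (sequential writes, no leader election, no rollback of partial writes); `Replica` and `replicate_then_ack` are hypothetical names.

```python
class Replica:
    """Toy broker node that stores messages in memory."""

    def __init__(self, name, online=True):
        self.name = name
        self.online = online
        self.log = []

    def store(self, message):
        if not self.online:
            raise ConnectionError(f"{self.name} is down")
        self.log.append(message)


def replicate_then_ack(replicas, message, min_copies=2):
    """Return True (acknowledge) only if at least min_copies replicas stored the message."""
    copies = 0
    for replica in replicas:
        try:
            replica.store(message)
            copies += 1
        except ConnectionError:
            continue
    return copies >= min_copies


healthy = [Replica("broker-a"), Replica("broker-b"), Replica("broker-c", online=False)]
degraded = [Replica("broker-a"), Replica("broker-b", online=False), Replica("broker-c", online=False)]

print(replicate_then_ack(healthy, "order-1"))   # True
print(replicate_then_ack(degraded, "order-1"))  # False
```

When the function returns False, the Producer receives no acknowledgement and is expected to retry or report the failure.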

2.3 Consumption Stage

The Consumer pulls messages from the Broker, and the Broker sends them to the Consumer over the network.

This stage uses an acknowledgement mechanism similar to the one in the production stage. After the client pulls a message from the Broker, it runs the user's consumption logic and, if that succeeds, sends a consumption acknowledgement to the Broker. If the Broker does not receive the acknowledgement, it returns the same message on the next pull, so the message is not lost in transit or because the client failed while executing the consumption logic.

When writing consumer code, do not send the acknowledgement as soon as the message arrives; send it only after all of the consumption logic has finished. Taking a Python consumer for RabbitMQ as an example, reliable consumption code looks like this:

def callback(ch, method, properties, body):
    print("[x] received message %r" % body)
    # Process the incoming message here
    print("[x] Consumption completed")
    # Send the acknowledgement only after the business logic has completed
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='hello', on_message_callback=callback)

In the consume callback, the correct order is:

  1. First, save the message to the database
  2. Then send the consumption acknowledgement

This way, if saving to the database fails, the acknowledgement code is never executed, and the message is delivered again on the next pull until consumption succeeds.
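
The save-then-acknowledge order can be sketched with an in-memory SQLite database. The `orders` table and the `ack` callable are hypothetical stand-ins; in the RabbitMQ example, `ack` would correspond to calling `ch.basic_ack` for the delivery.

```python
import sqlite3


def handle_delivery(db, body, ack):
    """Save first, acknowledge second. If saving raises, ack is never called."""
    with db:  # commits on success, rolls back if an exception is raised
        db.execute("INSERT INTO orders (body) VALUES (?)", (body,))
    ack()


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE orders (body TEXT NOT NULL)")

acks = []
handle_delivery(db, "order-1", lambda: acks.append("order-1"))

try:
    # None violates the NOT NULL constraint, simulating a failed save.
    handle_delivery(db, None, lambda: acks.append("bad"))
except sqlite3.IntegrityError:
    pass  # no acknowledgement was sent, so the broker would deliver it again

saved = db.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(acks, saved)  # ['order-1'] 1
```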

3 Summary

How can MQ ensure that a message is delivered reliably and not lost anywhere between being sent and being consumed? The process can be divided into three stages, and in each one the code and configuration must cooperate with MQ's reliability mechanisms:

  • In the production stage, catch send failures and resend the message
  • In the storage stage, configure the flush and replication parameters so that messages are written to disk on multiple replicas and survive a Broker failure or disk corruption
  • In the consumption stage, send the consumption acknowledgement only after all of the consumption logic has been processed

Once you understand how each stage works, if messages are lost again you can add logging to the code, locate the stage where the loss happens, and analyze it further.

4 Interview scenarios: quick questions and answers

Can two consumers pull the same message?

Suppose consumer A pulls a message and then crashes (or blocks because of a bug) without sending an acknowledgement to the Broker. The message is still on the Broker. If consumer B pulls now, will it get the message that A pulled? MQ generally has a coordination mechanism that prevents this, but because the network is unreliable it can still happen, with very low probability. For example, within the same consumer group, consumer A pulls the message at index=10 but has not yet acknowledged it, so the consumption position of the partition is still 10. When consumer B pulls, there are two possible outcomes:

  1. Before the timeout, the Broker considers the partition still held by A and rejects B's request
  2. After the timeout, the Broker considers A to have timed out without acknowledging, but the consumption position is still 10, so when B pulls, the Broker returns message 10 again
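
The two outcomes can be illustrated with a deliberately simplified model. Real brokers coordinate consumers through mechanisms such as rebalancing, which this sketch does not attempt to reproduce; `ToyPartition`, the lease, and the explicit `now` timestamps are all assumptions made for illustration.

```python
class ToyPartition:
    """Toy model of one partition consumed by one consumer group."""

    def __init__(self, messages, position=0, ack_timeout=30.0):
        self.messages = messages
        self.position = position  # advances only on acknowledgement
        self.ack_timeout = ack_timeout
        self.owner = None
        self.lease_expires = 0.0

    def pull(self, consumer, now):
        held_by_other = self.owner not in (None, consumer)
        if held_by_other and now < self.lease_expires:
            return None  # outcome 1: rejected, the partition is still held
        self.owner = consumer  # outcome 2: the lease expired, hand over
        self.lease_expires = now + self.ack_timeout
        return self.position, self.messages[self.position]

    def ack(self, consumer, offset):
        if consumer == self.owner and offset == self.position:
            self.position += 1


partition = ToyPartition([f"m{i}" for i in range(12)], position=10)
first = partition.pull("A", now=0.0)         # A gets message 10 and never acknowledges
rejected = partition.pull("B", now=10.0)     # before the timeout
redelivered = partition.pull("B", now=31.0)  # after the timeout

print(first, rejected, redelivered)  # (10, 'm10') None (10, 'm10')
```

In the second outcome B processes a message that A may already have partly handled, which is why consumers need to cope with duplicates.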

How should consumers handle duplicate messages?

If an error occurs while a message is being transmitted, the sender receives no acknowledgement and resends the message so that it is not lost. However, if only the acknowledgement is lost in transit, the message is resent as well, even though it already arrived. As a result, both the Broker and the Consumer can receive duplicate messages, which must be taken into account when writing Consumer code. How do you handle such duplicates in consuming code without compromising the correctness of the business logic?

Causes of duplicate messages:

  1. In the sending stage, the same message is sent more than once
  2. In the consumption stage, the same message is consumed more than once

A message always contains something unique, whether it is the msgId assigned by MQ itself or a business identifier such as an order number. You can create a consumption table in the DB with a unique index on that value and insert a row before running the consumption logic each time, letting the DB do the deduplication for us.

Solution: deduplicate on the business side

  • Create a message table. Before consuming, insert the message using the message ID as the unique primary key; repeated consumption then causes a primary key conflict
  • Use Redis. Assign each message a global ID. Once a message has been consumed, write it to Redis as a key-value pair (<ID, message>). Before consuming a message, query Redis by that key to check whether a record already exists.
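
The message-table approach might look like the sketch below, which uses an in-memory SQLite database in place of a real one. The table name `consumed_messages` and the helper `is_first_delivery` are hypothetical.

```python
import sqlite3


def is_first_delivery(db, msg_id):
    """Insert the message ID; a primary-key conflict means it was seen before."""
    try:
        with db:
            db.execute("INSERT INTO consumed_messages (msg_id) VALUES (?)", (msg_id,))
        return True
    except sqlite3.IntegrityError:
        return False


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE consumed_messages (msg_id TEXT PRIMARY KEY)")

processed = []
for msg_id in ["m1", "m2", "m1"]:
    if is_first_delivery(db, msg_id):
        processed.append(msg_id)  # stand-in for the business logic

print(processed)  # ['m1', 'm2']
```

In this simple form the ID is recorded before the business logic runs, so a failure in the business logic would leave the message marked as consumed.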

When the Producer sends a message to the Broker (the send method), the method does not return until the Broker has received the message and stored it. The call is blocking, so if the Broker is configured for synchronous flushing, each call may take longer (use this only in scenarios that are sensitive to message loss).

======================= Supporting idempotent operations on the consumer side is generally difficult for the business. The consumer can instead add a redundancy check: for example, cache the SNs of the latest N successfully consumed messages and, on receiving a message, check whether it has already been consumed. Good idea.
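
A bounded cache of recently consumed SNs could be sketched as follows. `RecentlyConsumed` is a hypothetical helper, and the capacity of 3 is chosen only to keep the example short.

```python
from collections import OrderedDict


class RecentlyConsumed:
    """Remembers the SNs of the latest N successfully consumed messages."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._seen = OrderedDict()

    def already_consumed(self, sn):
        return sn in self._seen

    def mark_consumed(self, sn):
        self._seen[sn] = True
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the oldest SN


cache = RecentlyConsumed(capacity=3)
handled = []
for sn in [1, 2, 2, 3, 4, 1]:
    if cache.already_consumed(sn):
        continue
    handled.append(sn)       # stand-in for the business logic
    cache.mark_consumed(sn)  # record only after the logic succeeded

print(handled)  # [1, 2, 3, 4, 1]
```

The immediate duplicate of SN 2 is filtered, but SN 1 is processed again because it had already been evicted, so N must be larger than the longest redelivery gap you expect.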

======================= Idempotence is one approach. If consumption is not idempotent, the IDs of consumed messages have to be stored on the consumer side. The key question is when to store the ID:

  • If it is stored before consumption and consumption then fails, won't the next delivery of the same message be treated as already consumed?
  • If it is stored after consumption succeeds, couldn't consumption end up only partially successful? The only way around this is to satisfy transactional ACID properties.
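
One way to sidestep the timing question, assuming the business data and the consumed IDs live in the same database, is to write both in a single transaction. The sketch below uses SQLite; the tables, `consume_atomically`, `credit` and `crash` are hypothetical, and the sketch treats any integrity error as a duplicate for brevity.

```python
import sqlite3


def consume_atomically(db, msg_id, apply_change):
    """Record the ID and apply the business change in one transaction."""
    try:
        with db:  # both statements commit together or roll back together
            db.execute("INSERT INTO consumed_messages (msg_id) VALUES (?)", (msg_id,))
            apply_change(db)
    except sqlite3.IntegrityError:
        return False  # treated as a duplicate delivery
    return True


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE consumed_messages (msg_id TEXT PRIMARY KEY)")
db.execute("CREATE TABLE accounts (name TEXT PRIMARY KEY, balance INTEGER)")
db.execute("INSERT INTO accounts VALUES ('alice', 0)")
db.commit()


def credit(conn):
    conn.execute("UPDATE accounts SET balance = balance + 50 WHERE name = 'alice'")


def crash(conn):
    raise RuntimeError("business logic failed")


first = consume_atomically(db, "m1", credit)   # applied
second = consume_atomically(db, "m1", credit)  # duplicate, skipped
try:
    consume_atomically(db, "m2", crash)        # fails, the ID is rolled back too
except RuntimeError:
    pass
retry = consume_atomically(db, "m2", credit)   # redelivery succeeds

balance = db.execute("SELECT balance FROM accounts WHERE name = 'alice'").fetchone()[0]
print(first, second, retry, balance)  # True False True 100
```

If the business change touches a system outside this database, a single local transaction no longer covers it.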

If the ID continuity check is used, can a Producer correspond to only one Consumer?

No. The Producer attaches its ProducerId to each message and sends it to a specified partition. On the consuming side, sequence continuity is checked separately for each Producer.

======================= If the ACK sent to the Producer is lost because of a network failure, the Producer should resend the message with the same unique identifier as the original. The Consumer only needs to check, before accepting a message, whether a message with the same identifier has already been received, and intercept it if so. The idempotence check can also be done in the Consumer-side business interface; intercepting beforehand has the advantage of not intruding on the business code.

Very good! However, you still need to consider how, in a distributed environment, the Consumer determines whether a message with the same identifier has already been received before accepting it.

======================= Message-loss detection is a test done before going live. Is it possible that no loss shows up in testing but messages are still lost in production? If the detection logic is turned off in production, are there other detection mechanisms there? The detection logic can also be left running in production without affecting the business.