preface

With the development of business, system functions become more and more complex. Gradually we will encounter such problems. For example, the interaction between the system and multiple external systems takes time and energy, the process processing time of a module in the system is too long, and services become unacceptable. The traffic of a module at a certain point in time is too heavy, and the system may break down at any time. To solve these problems, the system may introduce message queues at this time is a good choice. We all know that message queuing has three skills: decoupling, async, and peak clipping. It seems to be a perfect solution to the above problems, but is your system really suitable for message queues?

After the introduction of message queues, the system will bring what bad effects, we will talk about today:

  • The availability of the system decreases and o&M becomes more difficult

    The more external dependencies a system introduces, the worse its stability will be. Once MQ is down, services are affected. In the past, the three systems A, B and C were normal, so there was no problem with the service. Now system A imports message queues, and users B and C consume messages. When the consumption queues break down, services cannot be executed. How can MQ be highly available?

  • System complexity enhancement

    The addition of MQ has greatly increased the complexity of systems where synchronous remote calls between systems are now made asynchronously through MQ. How do I ensure that messages are not re-consumed? How to handle message loss? How to ensure that sequential messages are consumed correctly?

  • Consistency problem

    After system A processes services, system A sends messages to system B and SYSTEM C using MQ. If system B and system C fail to process the messages. How to deal with this situation, how to ensure the consistency of message data processing?

See, introducing a message queue can cause this kind of problem! Therefore, when you use message queues, you must think clearly about whether your system is suitable for you, whether you have the operation and maintenance ability, and whether your business can accept it! This article focuses on how to keep message queues from losing messages, using RocketMQ as an example.

A message goes through three phases from start to finish: production, message queue Broker storage, and consumption. A message can be lost at any one of the three stages, and knowing this, we just need to make sure there are no problems at all three stages, and the message will not be lost. Now let’s talk more about how to ensure that these three stages don’t have problems.

Production phase

The mission of the production phase is to send messages to queues. A Producer sends a message to a message queue through a network request. The message queue receives the message and returns the response to the Producer. RocketMQ has two common ways of sending messages: synchronous and asynchronous.

The synchronous

  DefaultMQProducer producer = new DefaultMQProducer("unique_group_name".true);
        producer.setNamesrvAddr("127.0.0.1:9876; 127.0.0.1:9870");

        SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
        String content = "Test message" + format.format(new Date());
        Message msg = new Message("TopicTest"."TagA", UUID.randomUUID().toString(), content.getBytes(StandardCharsets.UTF_8));

        try {
            producer.start();
            SendResult sendResult = producer.send(msg);
            log.info("MsgId= {}, result = {}", sendResult.getMsgId(), sendResult.getSendStatus());

        } catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) {
            log.error("Error sending message [{}]", msg, e);
        }
Copy the code

As long as the send() method does not throw an exception during synchronous sending, the message is considered successful, that is, the message queue Broker received the message successfully.

If your business is more RT oriented, you can send it asynchronously.

Asynchronous send

I prefer sending messages asynchronously to reduce the RT of messages.

     DefaultMQProducer producer = new DefaultMQProducer("unique_group_name".true);
        producer.setNamesrvAddr("127.0.0.1:9876; 127.0.0.1:9870");

        // A custom unique identifier for the message
        String key = UUID.randomUUID().toString();
        String content = "Message sending Test";
        Message msg = new Message("TopicTest"."TagA", key, content.getBytes(StandardCharsets.UTF_8));
        try {
            producer.start();
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info(Update message [{}] sending status [{}] according to key[{}] of message [{}], msg.getProperty(MessageConst.PROPERTY_KEYS), key, sendResult.getMsgId(), sendResult.getSendStatus());
                }

                @Override
                public void onException(Throwable e) {
                    log.error("Error sending [{}]", msg, e); }}); }catch (RemotingException | InterruptedException e) {
            log.info("Abnormal message sending [{}]", msg, e);
        }
Copy the code

When using asynchronous sending, remember to override the two methods of the SendCallback class. Update the sending status of the message in the onSuccess() method to send successfully, as long as no exception occurs and the onSuccess() method is called back, the message is considered to have been successfully sent to the Broker.

SendStatus problem

When you send a message, you get a SendResult containing SendStatus. Here is a list of instructions for each state:

  • SEND_OK

SEND_OK does not mean it is reliable. To ensure that no messages are lost, SYNC_MASTER or SYNC_FLUSH should also be enabled.

  • FLUSH_DISK_TIMEOUT

If the Broker sets MessageStoreConfig’s FlushDiskType = SYNC_FLUSH (ASYNC_FLUSH by default), And you get this status if the Broker does not finish flushing disks within the syncFlushTimeout of MessageStoreConfig (default: 5 seconds).

  • FLUSH_SLAVE_TIMEOUT

You get this status if the role of the Broker is SYNC_MASTER (ASYNC_MASTER by default) and the slave Broker does not complete synchronization with the master server within the syncFlushTimeout of MessageStoreConfig (5 seconds by default).

  • SLAVE_NOT_AVAILABLE

You get this state if the role of the Broker is SYNC_MASTER (ASYNC_MASTER by default) but the slave Broker is not configured.

There are multiple cases of SendStatus. Therefore, whether sending SendStatus in synchronous or asynchronous mode is used, it is necessary to determine whether SendStatus is SEND_OK. If not, it is necessary to handle SendStatus according to different cases.

  • FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT

These two cases indicate that the message is abnormal. In order not to lose the message, we can wait for a while and resend the message.

  • SLAVE_NOT_AVAILABLE

In this case, the Slave in the cluster is unavailable. Resending data is useless and requires manual intervention.

In fact, if you look at the RocketMQ source code, you can customize the number of retries for different scenarios, whether synchronous or asynchronous, and many methods have internal retry mechanisms.

Warn: this method has internal retry-mechanism, that is, internal implementation will retry {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially delivered to broker(s). It’s up to the application developers to resolve potential duplication issue.

Source code default processing

    /** * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. 

* * This may potentially cause message duplication which is up to application developers to resolve. */
private int retryTimesWhenSendFailed = 2; /** * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.

* * This may potentially cause message duplication which is up to application developers to resolve. */
private int retryTimesWhenSendAsyncFailed = 2; /** * Indicate whether to retry another broker on sending failure internally. */ private boolean retryAnotherBrokerWhenNotStoreOK = false; Copy the code

Custom retry mechanism:

   producer.setRetryTimesWhenSendFailed(5);
   producer.setRetryAnotherBrokerWhenNotStoreOK(true);
   producer.setRetryTimesWhenSendAsyncFailed(5);
Copy the code

Here we will mention message delivery semantics, which simply means delivery guarantee in the process of message delivery. There are three main types:

  • At most once. Messages can be lost or processed, but only once at most.
  • -Leonard: At least once. Messages are not lost, but may be processed multiple times, may be repeated, and will not be lost.
  • Exactly once: Exactly once. The message is processed and only processed once, not lost and not repeated once.

Some exceptions may occur because of occasional fluctuations in the network, but there is a timeout when the ACK is returned to the producer, and the producer tries again, causing the message to be delivered again. After all, a producer cannot guarantee that a message will be delivered successfully to the Broker only once. To prevent repeated consumption of messages, consumers themselves need to ensure idempotent business processing. In addition, a scheduled task is used to compensate for sending a message with SendStatus that is not SEND_OK. It should also be mentioned that retries should be limited, such as the maximum number of retries and the time interval between retries. After all, experience has taught us that retries in a short period of time do not make sense in some abnormal cases. The specific design can refer to the local message table scheme in my previous article. Local transaction + scheduled task compensation guarantees the successful delivery of messages.

Message queue Broker storage phase

By default, a message queue returns an ACK response to the producer immediately after successfully storing the message in memory in order to respond quickly.

You think the structure is like this:

Message flushing mode

  • Synchronous brush set

When the write success status is returned, the message has already been written to disk. The specific process is that after the message is written to the PAGECACHE of the memory, the flush thread is immediately notified to flush the disk, and then waits for the flush completion. After the flush thread completes execution, it wakes up the waiting thread and returns the state of writing the message successfully.

  • Asynchronous brush set

When the write success status is returned, the message may only be written to the memory PAGECACHE, the write operation return fast, high throughput. When the amount of messages in the memory accumulates to a certain extent, the disk write operation is triggered to rapidly write data.

Change the default asynchronous flush to synchronous flush

flushDiskType=SYNC_FLUSH
Copy the code

Asynchronous flush mode In the event of message queue breakdown, machine power failure, or memory disk damage, the message cannot be persisted to the hard disk, and the message is permanently lost. In this case, we need to change RocketMQ’s flush mechanism from the default asynchronous flush to synchronous flush. That is, the producer ACK response is returned only when the message is successfully saved to disk.

In fact, the structure is like this:

The downside of synchronous flush is obviously reduced throughput and increased response RT time for message delivery, but the loss is worth it in order not to lose valuable messages.

Cluster deployment

This is how a single message queue Broker handles reliably stored messages, but the production environment is definitely clustered. RocketMQ supports single Master, multi-master, multi-master, multi-slave (asynchronous), and multi-Master, multi-slave (synchronous) clustering modes.

Let me clarify that message queues in the production environment must be deployed in a cluster rather than a single machine. It’s definitely ok to play around with standalone deployment locally and in a production environment, you’ve got to be kidding me!

A single Master model

This is risky because if the Broker restarts or goes down, the entire service may become unavailable. This is not recommended for online environments, but can be used for local testing.

More than the Master model

There are no slaves in a cluster but only masters, for example, two or three masters. The advantages and disadvantages of this mode are as follows:

  • Advantages: simple configuration, the maintenance of a single Master disk has no impact on applications. When the RAID10 disk is configured as RAID10, the RAID10 disk is highly reliable and does not lose messages even when the machine is down and cannot be recovered. (a small number of messages are lost when the asynchronous disk is flushed, but none is lost when the synchronous disk is flushed.)
  • Disadvantages: During a single machine outage, unconsumed messages on the machine cannot be subscribed until the machine is restored, affecting the real-time performance of messages.
Multi-master, multi-slave mode
Asynchronous disk flushing multi-master multi-Slave mode

Each Master is configured with a Slave, and there are multiple pairs of master-slaves. HA adopts asynchronous replication, and the Master has a short message delay (in milliseconds). Advantages and disadvantages of this mode are as follows:

  • Advantages: Even if the disk is damaged, the message loss is very small, and the real-time performance of the message is not affected. In addition, when the Master is down, consumers can still consume from the Slave. This process is transparent to the application, without manual intervention, and the performance is almost the same as that of the multi-master mode.
  • Disadvantages: Master down, loss of small messages in case of disk corruption.
Synchronize multi-master, multi-slave mode

Each Master is configured with a Slave. There are multiple master-slave pairs. HA adopts the synchronous dual-write mode.

  • Advantages: No single point of failure for data and services, no delay for messages in the case of Master outage, high service availability and data availability;
  • Disadvantages: Lower performance than asynchronous replication (about 10% lower), higher RT for sending a single message, and the standby node cannot automatically switch to the host when the active node is down.

Even if the message is successfully saved to the Master’s hard disk, and then when the Master synchronized the message to the Slave, the Master hangs during this period, and the hard disk can not be repaired properly, do not say that this situation is impossible, the special cable of Alipay can be cut, what is impossible. Ha ha ha, is also enough unlucky!

In fact, the real structure is like this:

The advantages and disadvantages of the four clustering modes are listed. It is clear that to ensure that messages are not lost at the Broker stage, the production environment must use the fourth clustering mode: synchronous replication multi-master multi-slave mode. The configuration is as follows:

## Master node configuration
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH

Configure the slave node
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
Copy the code

In addition to the previous synchronous flush configuration, the producer sends a message to the Broker, the Master uses synchronous flush to save the message to disk, and the Slave uses synchronous replication to copy the message to the Slave. After the Slave saves the message, the Broker returns the message to the producer ACK.

Messages are stacked

For the system with high concurrency, if the downstream consumers break down, it will lead to a large number of messages piled up in the message queue, so it is easy to burst the hard disk of the server, new messages sent to the message queue, hard disk refused to write, then the message is easy to lose. Therefore, the machine on which message queues are deployed should have plenty of hard disk space and some monitoring to prevent this from happening.

Consumption stage

Finally arrived at the last stage, but we can not be careless. Consumer pull local business news, and business processing is complete to submit an ACK ConsumeConcurrentlyStatus. CONSUME_SUCCESS, must not again to submit to an ACK for business processing. If a business deal with abnormal situation, to return ConsumeConcurrentlyStatus. RECONSUME_LATER waiting for message queue try again next time.


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name");
consumer.subscribe("TopicTest"."*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setNamesrvAddr("127.0.0.1:9876; 127.0.0.1:9870");
consumer.setConsumeMessageBatchMaxSize(1);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {

  MessageExt messageExt = msgs.get(0);
  // Perform service processing

  / / handle failure return ConsumeConcurrentlyStatus RECONSUME_LATER

  / / processing successfully returns ConsumeConcurrentlyStatus CONSUME_SUCCESS
  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
Copy the code

Another point to note is that message queue RocketMQ by default allows up to 16 retries per message, with the following retry intervals:

Number of retries Time elapsed since the last retry Number of retries Time elapsed since the last retry
1 10 seconds 9 7 minutes
2 30 seconds 10 Eight minutes
3 1 minute 11 9 minutes
4 2 minutes 12 Ten minutes
5 3 minutes 13 Twenty minutes
6 4 minutes 14 30 minutes
7 5 minutes 15 1 hour
8 6 minutes 16 2 hours

If the message fails after 16 retries, the message is no longer delivered. If the retry interval is strictly calculated, if a message fails to be consumed all the time, the message will be retried 16 times in the next 4 hours and 46 minutes. If the retry interval is exceeded, the message will not be delivered again. At this point, message queue RocketMQ does not immediately discard the message, but sends it to a special queue corresponding to the consumer. In Message Queue RocketMQ, such messages that would not normally be consumed are called dead-letter messages, and the special queues that store them are called dead-letter queues. Messages in the dead-letter queue are valid for the same 3 days as normal messages. It will be automatically deleted after 3 days. In this case, we need to process the messages in the dead-letter queue in order not to lose them.

A message entering a dead-letter queue means that some problem is preventing the consumer from consuming the message properly, and therefore, human intervention is often required for special processing. Once suspicious factors have been identified and the problem resolved, the message can be resend at the Message queue RocketMQ console for the consumer to re-consume, or a dedicated consumer can simply subscribe to the dead-letter queue for consumption.

The dead-letter queue name is typically %DLQ% + ConsumerGroupName, and the RETRY queue name is typically %RETRY% + ConsumerGroupName, which RocketMQ automatically creates.

conclusion

A message goes through three stages from birth to termination: production, storage and consumption. We provide different solutions for possible message loss in different stages. As a result, The probability of RocketMQ losing messages was greatly reduced. Let’s zoom out a little bit, and you’ll see that you don’t lose messages for different message queues, only the configuration of message queues is slightly different, and everything else is similar. It seems that we have developed a methodology to deal with message loss, so we don’t panic when we encounter other message queues.

This article was first published on personal blog how to ensure RocketMQ messages are not lost. Unauthorized republication is prohibited.