
This is the final part of the three-part RocketMQ series, which covers the advanced features and source analysis of RocketMQ.

1. Advanced functions

1.1 Message Storage

A distributed queue has high reliability requirements, so its data must be stored persistently. A message passes through the following steps:

  1. The message producer sends the message
  2. MQ receives the message, persists it, and adds a new record to the store
  3. MQ returns an ACK to the producer
  4. MQ pushes the message to the corresponding consumer and waits for the consumer to return an ACK
  5. If the consumer returns an ACK within the specified time, MQ considers the message consumed successfully and deletes it from the store, that is, step 6 is executed. If MQ does not receive an ACK within the specified time, it considers the consumption failed and pushes the message again, repeating steps 4, 5 and 6
  6. MQ deletes the message
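
The steps above can be sketched as a toy in-memory broker. This is only an illustration of the store / ACK / redeliver / delete cycle, not RocketMQ code: the class name AckFlowSketch, its methods, and the use of a map as the "store" are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical toy broker: persist on send, delete only after the consumer ACKs. */
class AckFlowSketch {
    // Stands in for the persistent store; a real broker writes to disk.
    private final Map<Long, String> store = new LinkedHashMap<>();
    private long nextId = 0;

    /** Steps 1-3: accept the message, persist it, return an ACK (here: the record id). */
    long send(String body) {
        long id = nextId++;
        store.put(id, body);
        return id;
    }

    /**
     * Steps 4-6: push every stored message to the consumer. The consumer returns
     * true to ACK. ACKed messages are deleted; the others stay for the next push.
     * Returns the number of messages that were ACKed and deleted.
     */
    int pushAll(Predicate<String> consumer) {
        List<Long> acked = new ArrayList<>();
        for (Map.Entry<Long, String> entry : store.entrySet()) {
            if (consumer.test(entry.getValue())) {
                acked.add(entry.getKey());
            }
        }
        acked.forEach(store::remove);
        return acked.size();
    }

    /** Number of messages still waiting for a successful ACK. */
    int pending() {
        return store.size();
    }

    public static void main(String[] args) {
        AckFlowSketch mq = new AckFlowSketch();
        mq.send("order-1");
        mq.send("order-2");
        // First push: the consumer only acknowledges order-1.
        mq.pushAll(body -> body.equals("order-1"));
        System.out.println("pending after first push: " + mq.pending());
        // Second push: the remaining message is redelivered and acknowledged.
        mq.pushAll(body -> true);
        System.out.println("pending after second push: " + mq.pending());
    }
}
```

The point of the sketch is that deletion is tied to the ACK, so a message that is not acknowledged simply remains in the store and is pushed again.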

1.1.1 Storage Media

  • Relational database DB

ActiveMQ, another open source MQ under Apache (it uses KahaDB for message storage by default), can persist messages through JDBC, and JDBC message storage can be enabled with simple XML configuration. With common relational databases (such as MySQL), IO read/write bottlenecks often appear once a single table holds tens of millions of rows. In terms of reliability, this scheme depends heavily on the DB: if the DB fails, MQ messages cannot be persisted, which leads to online failures.

  • File system

Several commonly used products in the industry, such as RocketMQ, Kafka, and RabbitMQ, persist messages by flushing them to the file system of the VM or physical machine they are deployed on (flushing can be asynchronous or synchronous). Message flushing gives message storage an efficient, reliable and high-performance way to persist data. Unless the machine hosting the MQ or its local disk fails, persistence failures generally do not occur.

1.1.2 Performance Comparison

File system > relational database DB

1.1.3 Message storage and Sending

1) Message storage

If the disk is used properly, its speed can match the data transfer speed of the network. The sequential write speed of current high-performance disks can reach 600 MB/s, which exceeds the transmission speed of ordinary network adapters. Random writes, however, reach only about 100 KB/s, so sequential writes are roughly 6000 times faster than random writes. Because of this huge difference, a well-designed message queue system can be orders of magnitude faster than an ordinary one. RocketMQ writes its messages sequentially, which ensures the speed of message storage.

2) Message sending

The Linux operating system is divided into user mode and kernel mode. File operations and network operations need to switch between the two modes, which inevitably leads to data being copied.

A server sends the contents of a local disk file to a client in two steps:

1) read; Read local file contents;

2) write; Send the read content over the network.

These two seemingly simple operations actually copy the data four times, as follows:

  1. Copy the data from disk to kernel-mode memory;
  2. Copy it from kernel-mode memory to user-mode memory;
  3. Copy it from user-mode memory to the network driver's kernel-mode memory;
  4. Finally, copy it from the network driver's kernel-mode memory to the network adapter for transmission.

With mmap, the file is mapped into memory, so the copy into user-mode memory is avoided and the speed improves. In Java this mechanism is available through MappedByteBuffer.

RocketMQ takes advantage of this feature, known as "zero-copy" technology, to speed up both writing messages to disk and sending them over the network.

One limitation of MappedByteBuffer is that only about 1.5 to 2 GB of a file can be mapped into user-mode virtual memory at a time. This is why RocketMQ sets the default size of a single CommitLog data file to 1 GB.
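
As a minimal sketch of the memory-mapping mechanism described above, the following Java code maps a temporary file with FileChannel.map, writes through the resulting MappedByteBuffer, and reads the bytes back from the file. The class name MmapSketch, the temporary file name, and the 4 KB size are assumptions for the example; it shows the JDK API only, not how RocketMQ itself manages its CommitLog files.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical example: write to a file through a memory mapping. */
class MmapSketch {

    /** Maps fileSize bytes of a temp file, writes text at offset 0, and reads it back from the file. */
    static String writeThroughMapping(String text, int fileSize) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".dat");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(
                    file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping in READ_WRITE mode grows the empty file to fileSize bytes.
                MappedByteBuffer buffer =
                        channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                buffer.put(bytes); // a plain memory write, no explicit write() call
                buffer.force();    // ask the OS to flush the mapped pages to disk
            }
            byte[] onDisk = Files.readAllBytes(file);
            return new String(onDisk, 0, bytes.length, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeThroughMapping("hello mmap", 4096));
    }
}
```

Note that the size argument of a single map call is limited by the int-indexed buffer API, which fits with keeping each mapped file well below 2 GB.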

1.1.4 Message storage structure

  • CommitLog: stores the message bodies together with their metadata
  • ConsumeQueue: the index that records where each queue's messages are located in the CommitLog
  • IndexFile: provides a way to query messages by key or time interval without affecting the main flow of sending and consuming messages
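
The relationship between the shared log and the per-queue index can be illustrated with a heavily simplified in-memory sketch. Everything here is hypothetical (the class StoreLayoutSketch, the 1 KB buffer, the {offset, size} index entry); the real files have their own binary formats, which this example does not attempt to reproduce.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: one append-only log shared by all queues, plus a small index per queue. */
class StoreLayoutSketch {
    // Stands in for the commit log: all messages are appended here in arrival order.
    private final ByteBuffer commitLog = ByteBuffer.allocate(1024);
    // Stands in for the consume queues: each entry is {offset in log, message size}.
    private final Map<String, List<long[]>> consumeQueues = new HashMap<>();

    /** Appends the body to the shared log and records its position in the queue's index. */
    void append(String queue, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        long offset = commitLog.position();
        commitLog.put(bytes);
        consumeQueues.computeIfAbsent(queue, q -> new ArrayList<>())
                .add(new long[] {offset, bytes.length});
    }

    /** Reads the n-th message of a queue by following its index entry into the log. */
    String read(String queue, int n) {
        long[] entry = consumeQueues.get(queue).get(n);
        byte[] bytes = new byte[(int) entry[1]];
        ByteBuffer view = commitLog.duplicate(); // independent position, same content
        view.position((int) entry[0]);
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        StoreLayoutSketch store = new StoreLayoutSketch();
        store.append("queue-A", "m1");
        store.append("queue-B", "m2");
        store.append("queue-A", "m3");
        System.out.println(store.read("queue-A", 1));
    }
}
```

The design choice this illustrates is that writes stay sequential in a single log, while the small index keeps reads per queue cheap.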

1.1.5 Disk Flushing Mechanism

RocketMQ's messages are stored on disk, both to ensure recovery after a power failure and to allow the number of stored messages to exceed memory limits. To improve performance, RocketMQ keeps disk writes sequential as far as possible. When a Producer writes messages to RocketMQ, there are two ways of writing to disk: synchronous flushing and asynchronous flushing.

1) Flush disks synchronously

When the write-success status is returned, the message has already been written to disk. Specifically, the message is first written to the in-memory PAGECACHE and the flush thread is notified immediately. The writing thread then waits; once the flush completes, the flush thread wakes the waiting thread, which returns the write-success status.

2) Asynchronous disk flushing

When the write-success status is returned, the message may only have been written to the in-memory PAGECACHE. The write returns quickly and throughput is high. Once the messages in memory accumulate to a certain amount, a write to disk is triggered in one batch.

3) Configuration

Synchronous or asynchronous flushing is set using the flushDiskType parameter in the Broker configuration file, which is configured to be either SYNC_FLUSH or ASYNC_FLUSH.
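
The difference between the two modes can be sketched with the JDK's FileChannel.force call. This is a hedged illustration only: the class FlushSketch, its counter, and the explicit flush() call that stands in for a background flush thread are assumptions, not RocketMQ's implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: force to disk on every write (sync) or once per batch (async). */
class FlushSketch {
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    private final FileChannel channel;
    private final FlushDiskType type;
    private int forceCalls = 0;

    FlushSketch(Path file, FlushDiskType type) throws IOException {
        this.channel = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        this.type = type;
    }

    /** Writes to the page cache; in sync mode also waits for the data to reach the disk. */
    void write(String message) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
        if (type == FlushDiskType.SYNC_FLUSH) {
            flush();
        }
    }

    /** Forces buffered data to the storage device. */
    void flush() throws IOException {
        channel.force(false);
        forceCalls++;
    }

    /** Writes the given number of messages and reports how many force calls were needed. */
    static int forcesAfterWrites(FlushDiskType type, int writes) {
        try {
            Path file = Files.createTempFile("flush-sketch", ".log");
            file.toFile().deleteOnExit();
            FlushSketch log = new FlushSketch(file, type);
            try {
                for (int i = 0; i < writes; i++) {
                    log.write("msg-" + i + "\n");
                }
                if (type == FlushDiskType.ASYNC_FLUSH) {
                    log.flush(); // stands in for the background flush of the accumulated batch
                }
                return log.forceCalls;
            } finally {
                log.channel.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("sync forces:  " + forcesAfterWrites(FlushDiskType.SYNC_FLUSH, 3));
        System.out.println("async forces: " + forcesAfterWrites(FlushDiskType.ASYNC_FLUSH, 3));
    }
}
```

In the sync case every write pays for a disk force, while in the async case several writes share one, which is where the throughput difference comes from.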

1.2 High Availability Mechanism

The RocketMQ distributed cluster achieves high availability through the combination of Master and Slave.

The difference between Master and Slave: in the Broker configuration file, a brokerId of 0 indicates that the Broker is a Master and a value greater than 0 indicates that it is a Slave. The brokerRole parameter also indicates whether the Broker is a Master or a Slave.

The Master Broker can read and write messages. The Slave Broker can only read messages. That is, the Producer can only connect to the Master Broker and write messages. A Consumer can connect to either a Master Broker or a Slave Broker to read messages.

1.2.1 Message consumption High availability

In the Consumer configuration file, there is no need to set whether to read from the Master or Slave. When the Master is unavailable or busy, the Consumer is automatically switched to the Slave. With the automatic Consumer switching mechanism, if a Master machine fails, the Consumer can still read messages from the Slave without affecting the Consumer program. This is high availability on the consumer side.

1.2.2 Sending messages is highly available

When creating a Topic, create its message queues on multiple Broker groups (machines with the same Broker name and different brokerId values form one Broker group). That way, when the Master of one Broker group is unavailable, the Masters of the other groups are still available and producers can still send messages. In this Master/Slave deployment, RocketMQ does not automatically convert a Slave into a Master. If you need to convert a Slave into a Master, manually stop the Slave Broker, change its configuration file, and start the Broker with the new configuration file.

1.2.3 Message replication

If a Broker group has a Master and a Slave, messages need to be replicated from the Master to the Slave, either synchronously or asynchronously.

1) Synchronous replication

In synchronous replication mode, the write success status is reported to the client after both Master and Slave are written successfully.

In synchronous replication mode, if the Master fails, the Slave has all backup data, which is easy to recover. However, synchronous replication increases the data write delay and reduces the system throughput.

2) Asynchronous replication

In asynchronous replication, the write success status is reported to the client as long as the Master is successfully written.

In asynchronous replication, the system has low latency and high throughput, but if the Master fails, some data may be lost because it is not written to the Slave.

3) Configuration

Synchronous and asynchronous replication is set using the brokerRole parameter in the Broker configuration file, which can be set to one of the ASYNC_MASTER, SYNC_MASTER, or SLAVE values.

4) Summary

In real applications, choose the flush mode and the Master/Slave replication mode according to the business scenario. Be especially careful with SYNC_FLUSH: because it triggers disk writes frequently, it reduces performance significantly. In general, configure both Master and Slave with ASYNC_FLUSH for flushing, and configure replication between Master and Slave as SYNC_MASTER. In this way, even if one machine fails, the data is still preserved.
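
As an illustration of the recommended combination, the sketch below holds a Master's configuration fragment as text and reads it with java.util.Properties. The parameter names brokerId, brokerRole and flushDiskType come from the text above; the values for brokerClusterName and brokerName, and the helper class BrokerConfigSketch itself, are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper that reads a broker configuration fragment. */
class BrokerConfigSketch {
    // Example Master configuration: synchronous replication, asynchronous flushing.
    static final String MASTER_CONF = String.join("\n",
            "brokerClusterName=DefaultCluster",
            "brokerName=broker-a",
            "brokerId=0",
            "brokerRole=SYNC_MASTER",
            "flushDiskType=ASYNC_FLUSH");

    /** Parses key=value lines into a Properties object. */
    static Properties parse(String conf) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(conf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    /** A broker is treated as Master when brokerId is 0 and its role is not SLAVE. */
    static boolean isMaster(Properties properties) {
        long brokerId = Long.parseLong(properties.getProperty("brokerId"));
        return brokerId == 0 && !"SLAVE".equals(properties.getProperty("brokerRole"));
    }

    public static void main(String[] args) {
        Properties conf = parse(MASTER_CONF);
        System.out.println("master: " + isMaster(conf)
                + ", flush: " + conf.getProperty("flushDiskType"));
    }
}
```

The matching Slave in the same group would keep the same brokerName, use a brokerId greater than 0, and set brokerRole to SLAVE.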

1.3 Load Balancing

1.3.1 Producer Load Balancing

On the Producer side, each instance by default cycles through all message queues when sending, so that messages are spread evenly across the queues. Since the queues can be distributed over different Brokers, the messages are sent to different Brokers as well.

For example, a publisher sends its first message to Queue 0, its second message to Queue 1, and so on.
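
The round-robin selection described above can be sketched in a few lines. The class RoundRobinSketch and the queue names are hypothetical; the sketch shows only the polling idea, not the producer client's actual selection code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin queue selector. */
class RoundRobinSketch {
    private final List<String> queues;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> queues) {
        this.queues = queues;
    }

    /** Returns the queue for the next message, cycling through all queues in order. */
    String next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch(
                Arrays.asList("brokerA-q0", "brokerA-q1", "brokerB-q0", "brokerB-q1"));
        for (int i = 0; i < 5; i++) {
            System.out.println("message " + i + " -> " + selector.next());
        }
    }
}
```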

1.3.2 Consumer Load balancing

1) Cluster mode

In cluster consumption, each message needs to be delivered to only one instance of the Consumer Group that subscribes to the topic. RocketMQ consumers actively pull messages, and each pull must specify which message queue to pull from.

Every time the number of instances changes, load balancing is triggered, and the queue is evenly distributed to each instance according to the number of queues and the number of instances.

The default allocation algorithm is AllocateMessageQueueAveragely, which gives each instance a contiguous block of queues.

Another averaging algorithm is AllocateMessageQueueAveragelyByCircle. It also splits the queues evenly, but hands them out one at a time to each instance in turn, going around the instances in a circle.

In cluster mode, a queue can be allocated to only one instance. If several instances consumed the same queue at the same time, they would pull the same messages, and each message would be consumed multiple times by different instances. So the algorithm assigns a queue to exactly one consumer instance, while one consumer instance can be assigned several queues at the same time.

By adding consumer instances, the consumption capacity can be scaled out horizontally. When an instance goes offline, load balancing is triggered again, and the queues that were allocated to it are reallocated to other instances for consumption.

However, if the number of consumer instances is greater than the total number of message queues, the extra consumer instances are not assigned any queue and do not consume messages, so they do not share the load. You therefore need to make sure that the total number of queues is greater than or equal to the number of consumers.
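
The two averaging strategies and the "more consumers than queues" case can be made concrete with a small sketch. This is a simplified re-statement of the idea using queue and consumer indexes; the class AllocationSketch and its methods are hypothetical and are not the code of AllocateMessageQueueAveragely or AllocateMessageQueueAveragelyByCircle.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of two even queue-allocation strategies. */
class AllocationSketch {

    /** Contiguous blocks: the first (queueCount % consumerCount) consumers get one extra queue. */
    static List<Integer> averagely(int queueCount, int consumerCount, int consumerIndex) {
        int base = queueCount / consumerCount;
        int extra = queueCount % consumerCount;
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }

    /** Circular dealing: queue i goes to consumer (i % consumerCount). */
    static List<Integer> averagelyByCircle(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> result = new ArrayList<>();
        for (int queue = consumerIndex; queue < queueCount; queue += consumerCount) {
            result.add(queue);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int consumer = 0; consumer < 3; consumer++) {
            System.out.println("consumer " + consumer
                    + " averagely=" + averagely(8, 3, consumer)
                    + " byCircle=" + averagelyByCircle(8, 3, consumer));
        }
        // With 2 queues and 3 consumers, the third consumer gets nothing.
        System.out.println("idle consumer: " + averagely(2, 3, 2));
    }
}
```

In both strategies every queue belongs to exactly one consumer, and a consumer whose index is beyond the number of queues ends up with an empty list.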

2) Broadcast mode

Since broadcast mode requires every message to be delivered to all consumer instances in a consumer group, messages are not shared out among the instances at all.

In the implementation, the difference is that when queues are assigned, every consumer is assigned all of the queues.

1.4 Message Retry

1.4.1 Sequential message retries

For sequential messages, when the consumer fails to consume a message, RocketMQ automatically retries it over and over (at an interval of 1 second), and during this time the application is blocked from consuming further messages. Therefore, when using sequential messages, make sure the application can monitor and handle consumption failures promptly to avoid blocking.

1.4.2 Unordered message retry

For unordered messages (normal, scheduled, delayed, transactional), when the consumer fails to consume a message, you can have it retried by setting the return status.

Retry for unordered messages takes effect only in cluster consumption mode. Broadcast mode does not provide failure retry: after a message fails to be consumed, it is not retried and consumption continues with new messages.

1) Retry times

By default, RocketMQ allows a maximum of 16 retries per message, with the following interval before each retry:

Retry   Interval since the last retry   Retry   Interval since the last retry
1       10 seconds                      9       7 minutes
2       30 seconds                      10      8 minutes
3       1 minute                        11      9 minutes
4       2 minutes                       12      10 minutes
5       3 minutes                       13      20 minutes
6       4 minutes                       14      30 minutes
7       5 minutes                       15      1 hour
8       6 minutes                       16      2 hours

If the message still fails after 16 retries, it is no longer delivered. In other words, a message that keeps failing is retried 16 times over the following 4 hours and 46 minutes, and is not delivered again once that retry period has passed.

Note: No matter how many times a Message is retried, the Message ID of those retried messages does not change.
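
The total retry period can be checked by adding up the 16 intervals from the table. The sketch below does this with java.time.Duration; the class name RetryScheduleSketch is hypothetical, and the interval values are copied from the table above. The exact sum is 4 hours 45 minutes 40 seconds, which rounds to the 4 hours and 46 minutes quoted in the text.

```java
import java.time.Duration;

/** Hypothetical helper that sums the default retry intervals listed in the table. */
class RetryScheduleSketch {
    static final Duration[] INTERVALS = {
        Duration.ofSeconds(10), Duration.ofSeconds(30),
        Duration.ofMinutes(1), Duration.ofMinutes(2), Duration.ofMinutes(3),
        Duration.ofMinutes(4), Duration.ofMinutes(5), Duration.ofMinutes(6),
        Duration.ofMinutes(7), Duration.ofMinutes(8), Duration.ofMinutes(9),
        Duration.ofMinutes(10), Duration.ofMinutes(20), Duration.ofMinutes(30),
        Duration.ofHours(1), Duration.ofHours(2)
    };

    /** Time from the first failure until the n-th retry is delivered (n from 1 to 16). */
    static Duration elapsedUntilRetry(int n) {
        Duration sum = Duration.ZERO;
        for (int i = 0; i < n; i++) {
            sum = sum.plus(INTERVALS[i]);
        }
        return sum;
    }

    /** Time from the first failure until the last retry. */
    static Duration total() {
        return elapsedUntilRetry(INTERVALS.length);
    }

    public static void main(String[] args) {
        Duration total = total();
        System.out.println(total.toHours() + " h " + (total.toMinutes() % 60) + " min "
                + (total.getSeconds() % 60) + " s");
    }
}
```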

2) Configuration

Configuration when a retry is wanted after a consumption failure

In cluster consumption mode, if you want a message to be retried after it fails to be consumed, you need to do one of the following explicitly in the message listener implementation:

  • Return Action.ReconsumeLater (recommended)
  • Return null
  • Throw an exception

import com.aliyun.openservices.ons.api.*;

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        // Process the message (doConsumeMessage is the application's own logic, not shown)
        doConsumeMessage(message);
        // Method 1: return Action.ReconsumeLater, and the message will be retried
        return Action.ReconsumeLater;
        // Method 2 (alternative): return null, and the message will be retried
        // return null;
        // Method 3 (alternative): throw an exception, and the message will be retried
        // throw new RuntimeException("Consumer Message exception");
    }
}

Configuration when no retry is wanted after a consumption failure

In cluster consumption mode, if you do not want a message to be retried after it fails, catch any exception that the consumption logic may throw and always return Action.CommitMessage. The message will then not be retried.

import com.aliyun.openservices.ons.api.*;

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            // doConsumeMessage is the application's own logic, not shown
            doConsumeMessage(message);
        } catch (Throwable e) {
            // Catch every exception from the consumption logic and commit anyway
            return Action.CommitMessage;
        }
        // Normal case: the message was processed, so commit it
        return Action.CommitMessage;
    }
}

Getting the number of message retries

After receiving a message, the consumer can obtain the number of times it has been retried as follows:

import com.aliyun.openservices.ons.api.*;

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        // Get the number of retries for the message
        System.out.println(message.getReconsumeTimes());
        return Action.CommitMessage;
    }
}

1.5 Dead letter Queue

When the first attempt to consume a message fails, RocketMQ automatically retries the message. If consumption still fails after the maximum number of retries has been reached, the consumer is evidently unable to consume the message under normal circumstances. In that case, RocketMQ does not discard the message immediately, but sends it to a special queue that corresponds to the consumer.

In RocketMQ, messages that cannot be consumed normally are called dead-letter messages, and the special queues that store them are called dead-letter queues.

1.5.1 Dead letter feature

Dead-letter messages have the following features:

  • They are no longer consumed by consumers in the normal way.
  • Their validity period is the same as that of normal messages, which is 3 days. After 3 days they are deleted automatically, so handle a dead-letter message within 3 days of its creation.

Dead letter queues have the following features:

  • A dead letter queue corresponds to a Group ID, not to a single consumer instance.
  • If a Group ID has not produced any dead letter message, RocketMQ does not create a dead letter queue for it.
  • A dead letter queue contains all the dead letter messages generated for the corresponding Group ID, regardless of the Topic to which the message belongs.
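
The features listed above (one queue per Group ID, created only when needed, shared across Topics) can be sketched with a toy model. The class DeadLetterSketch, its methods, and the in-memory map are hypothetical; only the limit of 16 retries comes from the text, and the sketch makes no claim about how the broker stores dead letters internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of dead letter queues keyed by Group ID. */
class DeadLetterSketch {
    static final int MAX_RETRIES = 16;
    private final Map<String, List<String>> deadLetterQueues = new HashMap<>();

    /**
     * Called when a consumption attempt fails. Returns true if the message was moved
     * to the group's dead letter queue, false if it is still eligible for a retry.
     */
    boolean onFailure(String groupId, String topic, String body, int retriesSoFar) {
        if (retriesSoFar < MAX_RETRIES) {
            return false;
        }
        // The queue is created lazily, only when the group produces its first dead letter.
        deadLetterQueues.computeIfAbsent(groupId, g -> new ArrayList<>())
                .add(topic + ":" + body);
        return true;
    }

    boolean hasQueue(String groupId) {
        return deadLetterQueues.containsKey(groupId);
    }

    List<String> deadLetters(String groupId) {
        return deadLetterQueues.getOrDefault(groupId, Collections.emptyList());
    }

    public static void main(String[] args) {
        DeadLetterSketch broker = new DeadLetterSketch();
        broker.onFailure("GID_orders", "TopicA", "m1", 16);
        broker.onFailure("GID_orders", "TopicB", "m2", 16);
        System.out.println(broker.deadLetters("GID_orders"));
    }
}
```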

1.5.2 Viewing Dead Letter Information

  1. In the console, find the topic whose messages have entered the dead letter queue
  2. On the message page, query the dead letter messages by topic
  3. Choose to resend the message

When a message enters a dead letter queue, it means that something prevents the consumer from consuming it properly, so it usually needs special handling. Once the cause has been identified and the problem resolved, the message can be resent from the RocketMQ console so that the consumer consumes it again.

1.6 Consumption idempotence

After a RocketMQ consumer receives a message, it needs to process the message idempotently, based on a key that is unique in the business.

1.6.1 Necessity of idempotence of consumption

In Internet applications, especially when the network is unstable, RocketMQ messages may be duplicated. The cases can be summarized as follows:

  • The message is duplicated when sent

A message has been sent to the server successfully and has been persisted, but because of an intermittent network disconnection or a client crash, the server's response does not reach the client. If the producer then concludes that the send failed and sends the message again, the consumer later receives two messages with the same content and the same Message ID.

  • The message is duplicated when delivered

In the consumption scenario, the message has been delivered to the consumer and the business processing is complete, but the network is interrupted just as the client sends its acknowledgement to the server. To guarantee that the message is consumed at least once, the RocketMQ server delivers the already processed message again after the network recovers, and the consumer then receives two messages with the same content and the same Message ID.

  • The message is duplicated during load balancing (including but not limited to network jitter, Broker restart, and subscriber application restart)

When a RocketMQ Broker or client restarts, scales out or scales in, a Rebalance is triggered, and consumers may receive duplicate messages.

1.6.2 Processing Methods

Because Message IDs may conflict (be duplicated), using the Message ID as the basis for idempotent processing is not truly safe and is not recommended. The best approach is to use a unique business identifier as the basis for idempotent processing, which can be set as the message Key:

Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);

When a subscriber receives a message, it can perform idempotent processing based on the message Key:

consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey();
        // Perform idempotent processing based on this unique business key
        return Action.CommitMessage;
    }
});
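
What "idempotent processing based on the Key" might look like is sketched below without any messaging library. The class IdempotentConsumerSketch is hypothetical, and the in-memory set of processed keys is an assumption made to keep the example self-contained; a real service would normally record processed keys in durable storage, for example through a unique constraint in its database.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical consumer that applies each business key at most once. */
class IdempotentConsumerSketch {
    // Keys already processed. In-memory only for this sketch.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger appliedCount = new AtomicInteger();

    /**
     * Handles one delivery. Returns true if the business logic ran,
     * false if the key had been seen before and the delivery was skipped.
     */
    boolean consume(String businessKey, String body) {
        // add() returns false when the key is already present, so duplicates stop here.
        if (!processedKeys.add(businessKey)) {
            return false;
        }
        appliedCount.incrementAndGet(); // stands in for the real business side effect
        return true;
    }

    int appliedCount() {
        return appliedCount.get();
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.consume("ORDERID_100", "pay order 100");
        consumer.consume("ORDERID_100", "pay order 100"); // redelivered duplicate
        System.out.println("applied: " + consumer.appliedCount());
    }
}
```

Whichever storage is used, the check and the recording of the key should be one atomic step, otherwise two concurrent deliveries of the same message can both pass the check.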

2. Source analysis

2.1 Environment Construction

Source code environment setup

2.2 NameServer

NameServer source analysis

2.3 Producer

Producer source analysis

2.4 Message Storage

Message storage source analysis

2.5 Consumer

Consumer source code Analysis