What is a message queue and what does a message queue do

What is a message queue? What is a message queue for?

RocketMQ

The working model of RocketMQ

The following is an architecture diagram from the RocketMQ source code

  • producer: The message producer.
  • nameServer: Lightweight service routing registry, which provides two main functions:
    • Broker management: nameServer accepts registration of broker clusters and provides a heartbeat mechanism to determine whether the broker is alive or not
    • Routing information management: Stores the routing information for the entire broker cluster. The nameServer lets producers and consumers look up this routing information so that they can deliver and consume messages.

    The broker cluster registers its routing information with every nameServer, which ensures that if one nameServer goes down, the other nameServers can still provide service.

  • broker: Stores and delivers messages, through logical topics and actual queues. It contains the following services:
    • Remoting Module: Sub-module of the broker that processes client requests
    • Client Manager: Manages clients (producers, consumers) and maintains consumers’ topic subscriptions
    • Store Service: Provides a persistent service for messages
    • HA Service: provides master/slave message replication services
    • Index Service: Creates message indexes to facilitate quick query of messages
  • consumer: The message consumer.

The architecture diagram above does not show what is inside the broker:

  • Topic: A logical grouping of messages on the broker; it is purely a logical concept.
  • Message Queue: The actual storage area on the broker. It is the final destination of a message and where the message is stored.

The 7 message types in RocketMQ

  1. Ordinary message
    • Depending on how the producer sends them, ordinary messages can be divided into synchronous, asynchronous, and one-way messages. RocketMQ acknowledges synchronous and asynchronous messages, but does not respond to one-way messages.
  2. Ordered message
    • Ordered messages are sent to the same queue, which follows the first-in, first-out principle, so the order of messages is preserved when they are consumed. In practice this guarantees only partial order (order within one queue), not global order.
  3. Broadcast message
    • As the name suggests, a message sent by a producer can be consumed by all consumers without them affecting each other.
  4. Delayed message
    • When a producer sends a message it sets a delay level, and the message is delayed according to RocketMQ's default delay levels. On the consumer side there is no difference from an ordinary message.
  5. Batch message
    • On the producer side, messages are accumulated up to a certain amount and then sent as one batch.
  6. Filtered message
    • The producer attaches filter attributes (such as a tag) when sending, and the consumer selects the messages it wants to consume according to its filter conditions.
  7. Transaction message
    • The message sent by the producer is first written to the queue of the system topic RMQ_SYS_TRANS_HALF_TOPIC, and then the producer's local transaction is executed. If the local transaction succeeds and returns success, the message is removed from the RMQ_SYS_TRANS_HALF_TOPIC queue and moved to the queue specified by the producer. Otherwise the message is rolled back.

    • Of course, this only guarantees consistency between the local transaction and the message write. If downstream consumers must also be consistent, distributed transaction support is needed.

    • Usage scenario: An order is placed on Taobao but not yet paid, and payment has a 15-minute waiting window. When the order is placed, a message in the unknown state is sent to the broker; MQ then checks back on the payment status, and the order is cancelled once it is determined that it has not been paid within 15 minutes.
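
The half-message flow described for transaction messages can be pictured with a small in-memory model. This is only a sketch, not RocketMQ code: the class `HalfMessageSketch`, its fields, and the `LocalState` enum are hypothetical names chosen for illustration, and a map and a list stand in for the half topic and the real topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of the half-message flow; all names here are hypothetical. */
class HalfMessageSketch {
    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }

    // Stand-in for the half topic: messages parked here are invisible to consumers.
    final Map<String, String> halfTopic = new HashMap<>();
    // Stand-in for the topic the producer actually targeted.
    final List<String> realTopic = new ArrayList<>();

    /** Step 1: the producer's message is first stored as a half message. */
    void sendHalf(String txId, String body) {
        halfTopic.put(txId, body);
    }

    /** Step 2: apply the result of the producer's local transaction. */
    void endTransaction(String txId, LocalState state) {
        if (state == LocalState.COMMIT) {
            // Move the message from the half topic to the real topic.
            String body = halfTopic.remove(txId);
            if (body != null) {
                realTopic.add(body);
            }
        } else if (state == LocalState.ROLLBACK) {
            // Discard the half message; consumers never see it.
            halfTopic.remove(txId);
        }
        // UNKNOWN: the half message stays parked until a later check-back decides.
    }
}
```

A message whose state is UNKNOWN stays in `halfTopic`, which is the situation the check-back in the usage scenario resolves.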

RocketMQ and Spring Boot integration

The sample code has been uploaded to GitHub

Dependency

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

The configuration file

rocketmq.name-server=172.12.11.10:9876
rocketmq.producer.group=topic_test-consumerGroup

Producer

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import javax.annotation.Resource;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerControl {
    Logger logger = LoggerFactory.getLogger(ProducerControl.class);
    @Resource
    RocketMQTemplate rocketMQTemplate;

    /**
     * convertAndSend(): Spring Boot's wrapper around the RocketMQ API
     * destination: topic:tag
     * payload: the actual content sent
     */
    @RequestMapping("/sendMSG")
    public String sendMSG() {
        // Ordinary message
        rocketMQTemplate.convertAndSend("topic_test:tagXXX", "hello-world");
        logger.info("Normal message sent successfully");

        // Ordered messages: the same hashKey always maps to the same queue
        for (int i = 0; i < 10; i++) {
            SendResult sendResult = rocketMQTemplate.syncSendOrderly("topic_test", "order_message_" + i, "");
            logger.info("Sequential message :{}, send status :{}", sendResult.getMsgId(), sendResult.getSendStatus());
        }

        // Broadcast message: the producer sends it like an ordinary message;
        // the consumer needs to set messageModel to broadcast mode

        // Delayed message: set delayLevel when sending
        // syncSend(String destination, Message<?> message, long timeout, int delayLevel)
        Message<String> delayedMessage = MessageBuilder.createMessage("delayed_message", new MessageHeaders(new HashMap<>()));
        rocketMQTemplate.syncSend("topic_test", delayedMessage, 3000, 1);
        logger.info("Delayed message sent successfully");

        // Batch messages: combine multiple messages into one batch and send it at once
        List<Message<String>> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messageList.add(MessageBuilder.createMessage("batch_message_" + i, new MessageHeaders(new HashMap<>())));
        }
        rocketMQTemplate.syncSend("topic_test", messageList);
        logger.info("Batch message sent successfully");

        // Filtered message: filtering happens mainly on the consumer side, by tag or SQL92 expression
        Message<String> tagMessage = MessageBuilder.createMessage("tag_message", new MessageHeaders(new HashMap<>()));
        rocketMQTemplate.syncSend("topic_test:tag_A", tagMessage);
        logger.info("Filtered message sent successfully");

        // Transaction message
        Message<String> message = MessageBuilder.createMessage("transaction_message", new MessageHeaders(new HashMap<>()));
        rocketMQTemplate.sendMessageInTransaction("topic_test", message, "");
        logger.info("Transaction message sent successfully");

        rocketMQTemplate.getProducer().setProducerGroup("");
        return "Sent successfully";
    }
}
Producer: transaction message listener (this class is required for transaction messages)

executeLocalTransaction: the local transaction method. If the local transaction executes successfully and a success state is returned, the checkLocalTransaction method is not called.

checkLocalTransaction: If executeLocalTransaction does not return a definite result (it returns UNKNOWN), RocketMQ calls this method to check back whether the local transaction succeeded. If it did, the message is written to the corresponding topic as normal.

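import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

@RocketMQTransactionListener()
public class MyTransactionListener implements RocketMQLocalTransactionListener {
    Logger logger = LoggerFactory.getLogger(MyTransactionListener.class);

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        logger.info("executeLocalTransaction:" + message);
        // Returning COMMIT or ROLLBACK here would end the transaction without a check-back.
        // UNKNOWN is returned in this demo so that the broker calls checkLocalTransaction.
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        logger.info("checkLocalTransaction:" + message);
        return RocketMQLocalTransactionState.COMMIT;
    }
}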

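Consumer

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumerGroup = "topic_test-consumerGroup", topic = "topic_test")
public class Consumer implements RocketMQListener<String> {
    Logger logger = LoggerFactory.getLogger(Consumer.class);

    @Override
    public void onMessage(String message) {
        logger.info("Consumed message: " + message);
    }
}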

Message persistence

Due to the high reliability requirements of message queues, rocketMQ data must be persisted

The following is a message storage architecture diagram provided in the rocketMQ source code

The message store architecture diagram mainly consists of the following three files related to the message store.

  1. CommitLog: the store for message bodies and metadata; it holds the message content written by the Producer. It is written sequentially: a block of storage (1 GB in size) is allocated on disk in advance, and messages written by the Producer are appended sequentially to that file until it is full, at which point a new 1 GB storage area is allocated.

  2. ConsumeQueue: the message consumption queue, introduced primarily to improve consumption performance. Since RocketMQ uses a topic-based subscription model, consumption is topic-specific, and retrieving messages by topic by scanning the CommitLog files would be inefficient. The Consumer instead looks up the messages to consume through the ConsumeQueue, which serves as the index for consumption and stores each message's starting physical offset in the CommitLog, the message size, and the hash code of the message tag.

  3. IndexFile: provides a way to query messages by key or time interval. One IndexFile can hold 20 million (2000W) index entries. Its underlying storage implements a HashMap structure in the file system, so RocketMQ's index file is a hash index.
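
To make the ConsumeQueue index entry concrete, here is a minimal sketch that packs the three fields named above into a fixed-width record. The widths used (8-byte offset, 4-byte size, 8-byte tag hash, 20 bytes in total) and the class name `ConsumeQueueEntrySketch` are assumptions made for illustration, not taken from the text above.

```java
import java.nio.ByteBuffer;

/** Fixed-width index entry sketch; field widths are an assumption of this example. */
class ConsumeQueueEntrySketch {
    static final int ENTRY_SIZE = 8 + 4 + 8; // offset + size + tag hash

    /** Packs one index entry pointing at a message inside the CommitLog. */
    static ByteBuffer encode(long commitLogOffset, int messageSize, long tagHashCode) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(messageSize);
        buf.putLong(tagHashCode);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    /** Because entries are fixed width, the n-th entry sits at n * ENTRY_SIZE. */
    static long filePositionOf(long entryIndex) {
        return entryIndex * ENTRY_SIZE;
    }
}
```

Fixed-width entries are what make the lookup cheap: a consumer that knows its logical queue offset can compute the file position directly and then read the message body from the CommitLog at the stored physical offset.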

From the above structure, we can draw the following conclusions

  • ConsumeQueue and IndexFile are both indexes into the CommitLog, where the messages themselves are stored.
  • Messages flow from the CommitLog to the ConsumeQueue and IndexFile, so there must be a background thread that continuously dispatches them and asynchronously builds the ConsumeQueue and IndexFile data.
  • To achieve such high throughput on disk-based operations, RocketMQ makes a number of optimizations for reading and writing files:
    • Memory mapping (mmap): the FileChannel model in NIO is used to map physical files on disk directly into user-space memory addresses. This avoids the cost of copying data between kernel buffers and application memory, and turns operations on files into direct operations on memory addresses.
    • PageCache: the operating system sets aside part of memory as a page cache. A file write goes to the PageCache first and is then written to disk by an operating system kernel thread.
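
The memory-mapping idea can be tried with the JDK alone. The sketch below maps a small pre-allocated file with `FileChannel.map`, writes a record through the mapping, and reads it back. It is not RocketMQ's implementation: the class name, the temp-file prefix, and the 4096-byte size (standing in for the 1 GB file) are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Writes and reads a record through a memory-mapped file. */
class MmapAppendSketch {
    static final int FILE_SIZE = 4096; // hypothetical; stands in for the 1 GB file

    static String roundTrip(String text) {
        byte[] record = text.getBytes(StandardCharsets.UTF_8);
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel ch = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping READ_WRITE past the current length grows the file to FILE_SIZE.
                MappedByteBuffer writer = ch.map(FileChannel.MapMode.READ_WRITE, 0, FILE_SIZE);
                writer.put(record); // a plain memory write, no write() call per record
                writer.force();     // ask the OS to flush the dirty pages to disk

                MappedByteBuffer reader = ch.map(FileChannel.MapMode.READ_ONLY, 0, FILE_SIZE);
                byte[] out = new byte[record.length];
                reader.get(out);
                return new String(out, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `MmapAppendSketch.roundTrip("hello")` returns the same string that was written, read back through a second mapping of the same file.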

When to persist

  1. When MQ receives a message, it needs to persist the message, otherwise it will be lost if MQ goes down
  2. When a consumer consumes a message, it needs to mark a consumption status for the message and persist it, otherwise when MQ restarts, the marker is lost and the message is re-consumed.

Persist to where

To set where RocketMQ stores its persistent files, specify the storage paths in the broker configuration file under /rocketmq-all-4.3.2-bin-release/conf/. Choose the asynchronous or synchronous configuration file according to the mode RocketMQ is started in.

# Storage root path
storePathRootDir=/opt/store
# CommitLog storage path
storePathCommitLog=/opt/store/commitlog
# Consume queue storage path
storePathConsumeQueue=/opt/store/consumequeue
# Message index storage path
storePathIndex=/opt/store/index
# Checkpoint file storage path
storeCheckpoint=/opt/store/checkpoint
# Abort file storage path
abortFile=/opt/store/abort

Flush mechanism

As shown in the figure above, RocketMQ persists messages to disk by either synchronous or asynchronous flushing:

  • Synchronous flush: After a message is sent to the broker, an ACK is returned only once the message has been written to disk, and only then does the producer consider the send successful. This gives high reliability but lower throughput.
  • Asynchronous flush: After a message is sent to the broker, a successful ACK is returned as soon as the message has been written to the PageCache. Messages may be lost if the machine goes down before the flush, but throughput is high.
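
The difference between the two flush modes comes down to whether the ACK waits for the disk. The following sketch shows that with plain NIO; it is an illustration under assumptions, not the broker's code, and `FlushModeSketch`, `FlushMode`, and `store` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Contrasts acknowledging after a disk flush with acknowledging after a page-cache write. */
class FlushModeSketch {
    enum FlushMode { SYNC, ASYNC }

    /** Stores one message and returns true, which plays the role of the ACK. */
    static boolean store(FileChannel channel, byte[] message, FlushMode mode) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(message);
        while (buf.hasRemaining()) {
            channel.write(buf); // the data now sits in the OS page cache
        }
        if (mode == FlushMode.SYNC) {
            channel.force(false); // block until the data has reached the disk
        }
        // ASYNC: acknowledge immediately and leave the flush to a later point.
        return true;
    }

    /** Writes one message in each mode to a temp file and returns the file length. */
    static long demo() {
        try {
            Path file = Files.createTempFile("flush-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
                store(ch, "a".getBytes(StandardCharsets.UTF_8), FlushMode.SYNC);
                store(ch, "bc".getBytes(StandardCharsets.UTF_8), FlushMode.ASYNC);
                return ch.size();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both calls make the bytes visible in the file; only the SYNC call pays for `force` before acknowledging, which is where the throughput cost comes from.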

Message master/slave synchronization

If the broker is deployed as a cluster, data must be replicated from the master to the slave. Message replication can be either synchronous or asynchronous:

  • Synchronous replication: The write is reported as successful only after the data has been written to both the master and the slave node. High data safety but lower throughput.
  • Asynchronous replication: As soon as the master node writes the data successfully, the producer is notified that the write succeeded, and the data is then copied asynchronously from the master to the slave. Throughput is high, but data may be lost.
  • brokerRole parameter values: ASYNC_MASTER, SYNC_MASTER, SLAVE.

Load balancing mechanism

RocketMQ load balancing is divided into: load balancing for sending messages from the Producer and load balancing for subscribing messages from the Consumer

  • Producer-side load balancing: By default, the first time the Producer sends a message to a topic it takes a random number modulo the total number of message queues and picks the queue at that index in the queue list. For each later message to the topic it increments the number and takes the modulo again. RocketMQ also provides the sendLatencyFaultEnable parameter, which, on top of the incrementing modulo, filters out brokers that recently failed for a certain period of time. [Source location: MQFaultStrategy.selectOneMessageQueue()]

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Enable sendLatencyFaultEnable
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // Determine if there was a previous failed broker, if so, continue the loop to select the next broker
                    // latencyFaultTolerance The update operation for this object is updated when sending is complete, if sendLatencyFaultEnable is enabled
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
    
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // No suitable queue was found above: fall back to the incrementing modulo
            return tpInfo.selectOneMessageQueue();
        }
        // If sendLatencyFaultEnable is not enabled, pick the queue by incrementing modulo
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
  • Consumer side subscription message load balancing: it can be divided into broadcast mode and cluster mode according to consumption mode

    • In broadcast mode: first obtain all messageQueue according to topic, then traverse all messageQueue and pull the message
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // Get messageQueue for topic
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    // Go through all messageQueue and pull messages one by one
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            // Cluster mode omitted here
        }
    }
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;
        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            // Loop body omitted in this excerpt; on the first call processQueueTable is empty, so the loop is not entered
        }
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }
                // Remove the stale (dirty) offset
                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                // Decide where to start consuming according to the configuration: the first offset, the last offset, or a given timestamp
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        // Build the pull request
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
        // Pull the message
        this.dispatchPullRequest(pullRequestList);
        return changed;
    }
    • Cluster mode: messageQueue is assigned to each consumer based on the configured load balancing policy and the ClientId of the current consumer
     case CLUSTERING: {
           // Get all queues for the topic
           Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
           // Get all consumers
           List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
           ...
           if (mqSet != null && cidAll != null) {
               List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
               mqAll.addAll(mqSet);
               Collections.sort(mqAll);
               Collections.sort(cidAll);
               AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
               List<MessageQueue> allocateResult = null;
               try {
                   // Assign messageQueue based on the current consumer's ClientId and rules
                   allocateResult = strategy.allocate(
                       this.consumerGroup,
                       this.mQClientFactory.getClientId(),
                       mqAll,
                       cidAll);
               } catch (Throwable e) {
                   ...
               }
               Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
               if (allocateResult != null) {
                   allocateResultSet.addAll(allocateResult);
               }
               boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
               ...
           }
           break;
       }

    Cluster load balancing policies:

    • AllocateMachineRoomNearby: nearby allocation. Prefers to assign a Consumer the queues of Brokers in the same machine room.
    • AllocateMessageQueueAveragely: average allocation. Distributes all MessageQueues evenly among the consumers.
    • AllocateMessageQueueAveragelyByCircle: round-robin allocation. Assigns MessageQueues to the consumers one at a time in turn.
    • AllocateMessageQueueByConfig: no computed allocation; a messageQueue list is specified directly.
    • AllocateMessageQueueByMachineRoom: allocation according to the logical machine room concept.
    • AllocateMessageQueueConsistentHash: allocates MessageQueues according to a consistent hashing algorithm.
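
The average and round-robin policies listed above can be sketched in a few lines. This is an independent illustration of the two ideas, not the RocketMQ classes themselves: `AllocateSketch` and its method names are hypothetical, queues are represented as plain strings, and both lists are assumed to be sorted the same way on every consumer (as the `Collections.sort` calls in the cluster-mode code do).

```java
import java.util.ArrayList;
import java.util.List;

/** Two ways of splitting a sorted queue list among a sorted consumer list. */
class AllocateSketch {
    /** Average allocation: each consumer gets one contiguous block of queues. */
    static List<String> averagely(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0) {
            return result; // unknown consumer: nothing assigned
        }
        int base = mqAll.size() / cidAll.size();
        int extra = mqAll.size() % cidAll.size(); // the first `extra` consumers get one more
        int count = base + (index < extra ? 1 : 0);
        int start = index * base + Math.min(index, extra);
        for (int i = 0; i < count; i++) {
            result.add(mqAll.get(start + i));
        }
        return result;
    }

    /** Round-robin allocation: queues are dealt out one at a time in turn. */
    static List<String> byCircle(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0) {
            return result;
        }
        for (int i = index; i < mqAll.size(); i += cidAll.size()) {
            result.add(mqAll.get(i));
        }
        return result;
    }
}
```

Because every consumer runs the same deterministic function over the same sorted inputs, each one can compute its own share locally without any coordination between consumers.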

A redelivery/retry mechanism for messages

Producer: message resend

When a producer sends a message and an exception occurs, the message does not reach the broker. Synchronous and asynchronous sends both have a retry mechanism; one-way sends have no delivery guarantee.

  • retryTimesWhenSendFailed: the number of retries when a synchronous send fails. The default value is 2. The producer makes at most retryTimesWhenSendFailed+1 attempts and does not select the broker that failed last time.
  • retryTimesWhenSendAsyncFailed: the number of retries when an asynchronous send fails. An asynchronous retry does not choose another broker; it retries on the same broker.
  • retryAnotherBrokerWhenNotStoreOK: whether to try another broker when message persistence times out or slave synchronization fails. The default is false.
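
The synchronous retry rule (at most retryTimes+1 attempts, skipping the broker that failed last) can be sketched as a plain loop. This is a simplified model, not the client's code: `SendRetrySketch`, `sendWithRetry`, and the `trySend` predicate are hypothetical, and brokers are represented by their names.

```java
import java.util.List;
import java.util.function.Predicate;

/** Retry loop that avoids the broker that failed on the previous attempt. */
class SendRetrySketch {
    /**
     * Makes up to retryTimes + 1 attempts. Returns the name of the broker that
     * accepted the message, or null if every attempt failed.
     */
    static String sendWithRetry(List<String> brokers, int retryTimes, Predicate<String> trySend) {
        String lastFailed = null;
        int cursor = 0;
        for (int attempt = 0; attempt < retryTimes + 1; attempt++) {
            // Walk the list round-robin until a broker other than the last failed one is found.
            String broker = null;
            for (int i = 0; i < brokers.size(); i++) {
                broker = brokers.get(cursor++ % brokers.size());
                if (!broker.equals(lastFailed)) {
                    break;
                }
            }
            if (trySend.test(broker)) {
                return broker;
            }
            lastFailed = broker;
        }
        return null;
    }
}
```

With the default of 2 retries, a send that keeps failing is attempted three times in total before the failure is reported to the caller.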

Consumer: message retry

Once the producer has delivered the message to the broker normally, a message whose consumption fails is pulled and consumed again by the consumer.

RocketMQ sets up a retry queue named “%RETRY%+consumerGroup” for each consumer group. Retry intervals follow the delay levels 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h; by default retries start at the 10s level, and the more times a message has been retried, the higher the level.

Principle: When message consumption fails, RocketMQ sends the failed message to the delay queue whose topic is “SCHEDULE_TOPIC_XXXX”. A background scheduled task moves the message from the delay queue back to the corresponding retry topic when its delay expires.

If consumption still fails after the maximum number of retries (16 by default), RocketMQ sends the failed message to the dead-letter queue.
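
How a retry count maps onto a delay level can be sketched as a small lookup. The constants below are assumptions of this example rather than statements from the text above: retries are taken to start at the third level (10s) and to be capped at 16, after which the message goes to the dead-letter queue. `RetryDelaySketch` and its members are hypothetical names.

```java
/** Maps the n-th retry (1-based) to a delay level; constants are assumptions. */
class RetryDelaySketch {
    static final String[] DELAY_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");
    static final int FIRST_RETRY_LEVEL = 3; // assumption: first retry uses the 10s level
    static final int MAX_RETRIES = 16;      // assumption: retry cap before the dead-letter queue

    /** Returns the delay before the given retry, or "DLQ" once retries are exhausted. */
    static String delayBeforeRetry(int retry) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry is 1-based");
        }
        if (retry > MAX_RETRIES) {
            return "DLQ";
        }
        int level = Math.min(FIRST_RETRY_LEVEL + retry - 1, DELAY_LEVELS.length);
        return DELAY_LEVELS[level - 1];
    }
}
```

Under these assumptions the intervals grow from 10s on the first retry to 2h on the last, so a consumer that is briefly unavailable is retried quickly while a persistently failing message backs off.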

Idempotency of messages

Message duplication is inevitable when ensuring message reliability, and if the business is sensitive to message duplication, then duplicate messages need to be filtered at the business layer.

This makes it important to ensure that each message has a unique key, either the msgId or a unique identifying field from the business, such as an order ID. In practice, prefer the business's unique identifier where possible, because the msgId can differ while the message content is the same, for example when a message is retried.
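
Filtering duplicates at the business layer can be as simple as remembering which business keys have already been handled. The sketch below keeps them in memory; that choice, along with the names `IdempotentConsumerSketch`, `onMessage`, and `appliedCount`, is an assumption for illustration. A real deployment would more likely rely on a database unique constraint or a shared cache so that the record survives restarts.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Deduplicates messages by a business key such as an order ID. */
class IdempotentConsumerSketch {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger applied = new AtomicInteger();

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean onMessage(String businessKey, String body) {
        // add() returns false when the key is already present, and is atomic,
        // so two concurrent deliveries of the same key cannot both pass.
        if (!processedKeys.add(businessKey)) {
            return false;
        }
        applied.incrementAndGet(); // stand-in for the real business logic
        return true;
    }

    int appliedCount() {
        return applied.get();
    }
}
```

Keying on the order ID rather than the msgId means a retried message with a new msgId is still recognised as a duplicate.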

Does RocketMQ support high availability? How to achieve high availability?

RocketMQ itself does not provide automatic failover. However, a RocketMQ cluster can be built on DLedger to enable automatic disaster-recovery failover.

  1. Download RocketMQ version 4.5 or later
  2. Prepare at least three nodes
  3. Start the nameServer as usual
  4. Start the broker using the configuration files under [../conf/dledger]

Message loss problem

In which situations can messages be lost?

From the above, we know that there are roughly four steps from sending a message to consuming it:

  1. The producer sends a message to the broker on the MQ master node
  2. The broker synchronizes the message from master to slave
  3. The broker persists the message to disk
  4. The consumer pulls the message and consumes it

How to prevent message loss

Message loss is possible in each of the four steps above, so here is how to prevent it at each point:

  1. Producers use transactional messages to ensure reliability, which guarantees consistency between the producer's local transaction and the message sent to the broker. [Avoids loss in step 1]

  2. Configure RocketMQ with synchronous flush plus a DLedger master/slave architecture to ensure that MQ itself does not lose messages.

  3. The consumer pulls the message synchronously and returns the consume-success status only after local business processing is complete.