

The main flow for sending RocketMQ messages has already been covered. This article introduces the source code and flow for consuming messages, including but not limited to the following:

  • Message consumption modes
  • The message consumption flow
  • The consumer load-balancing algorithm
  • How failed consumption is retried
  • How messages are redelivered

Message reception model

When a consumer group consumes messages, each consumer fetches the addresses of the topic's queues on the Broker servers from the registry (the NameServer) and consumes from those queues according to a load-balancing algorithm.

RocketMQ supports both pull and push modes for consuming messages, but push mode is itself implemented on top of pull mode.
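
To make the "push on top of pull" idea concrete, here is a minimal sketch in plain Java. It does not use the RocketMQ client: the class `PushOverPull`, the `MessageListener` interface, and the in-memory queue standing in for a Broker queue are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical illustration: a "push" style consumer driven by a pull loop. */
final class PushOverPull {

    /** Callback registered by the application; the application never pulls by itself. */
    interface MessageListener {
        void onMessages(List<String> batch);
    }

    private final Queue<String> broker;      // stands in for a Broker-side queue
    private final MessageListener listener;
    private final int batchSize;

    PushOverPull(Queue<String> broker, MessageListener listener, int batchSize) {
        this.broker = broker;
        this.listener = listener;
        this.batchSize = batchSize;
    }

    /** One pull round: fetch up to batchSize messages and hand them to the listener. */
    int pullOnce() {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize) {
            String msg = broker.poll();
            if (msg == null) {
                break;                       // nothing left to pull in this round
            }
            batch.add(msg);
        }
        if (!batch.isEmpty()) {
            listener.onMessages(batch);      // this is the "push" the application sees
        }
        return batch.size();
    }

    /** Pulls until the queue is empty; a real client would keep looping instead of stopping. */
    int drain() {
        int total = 0;
        int pulled;
        while ((pulled = pullOnce()) > 0) {
            total += pulled;
        }
        return total;
    }

    public static void main(String[] args) {
        Queue<String> broker = new ConcurrentLinkedQueue<>(Arrays.asList("m1", "m2", "m3"));
        List<String> received = new ArrayList<>();
        PushOverPull consumer = new PushOverPull(broker, received::addAll, 2);
        System.out.println(consumer.drain() + " messages delivered: " + received);
    }
}
```

From the application's point of view only the callback exists, which is why the mode looks like push even though every message arrives through a pull request.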

The consumer group distributes all message queues (MQ) evenly among the consumers in the group. Consumers in the same group should not subscribe to different topics at the same time; otherwise, because of the load-balancing algorithm, some messages may never be consumed.
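
The even distribution described above can be sketched as follows. This is a simplified illustration of an "average" allocation, not RocketMQ's actual implementation; the class `AverageAllocation` and its method are hypothetical names, and queues are represented by plain integer IDs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of average queue allocation within one consumer group. */
final class AverageAllocation {

    /**
     * Returns the queue IDs (0..queueCount-1) assigned to currentId.
     * Every consumer is assumed to run this with the same, identically ordered consumer list.
     */
    static List<Integer> allocate(int queueCount, List<String> consumerIds, String currentId) {
        List<Integer> result = new ArrayList<>();
        int index = consumerIds.indexOf(currentId);
        if (index < 0 || queueCount <= 0) {
            return result;                                   // unknown consumer or no queues
        }
        int consumers = consumerIds.size();
        int base = queueCount / consumers;                   // queues every consumer gets
        int extra = queueCount % consumers;                  // the first `extra` consumers get one more
        int size = base + (index < extra ? 1 : 0);
        int start = index * base + Math.min(index, extra);   // contiguous block for this consumer
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("c1", "c2", "c3");
        for (String id : group) {
            System.out.println(id + " -> " + allocate(8, group, id));
        }
    }
}
```

With 8 queues and 3 consumers the blocks have sizes 3, 3, and 2. With fewer queues than consumers, the trailing consumers receive nothing, which is why adding consumers beyond the queue count does not increase throughput in this scheme.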

If a message fails to be consumed, it is retried, up to 16 times by default for concurrent consumption. If it still fails after that, the message is placed in a dead-letter queue.
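
The retry-then-dead-letter rule can be sketched in a few lines of plain Java. This is only an illustration under simplifying assumptions: the class `RetryThenDeadLetter` is hypothetical, retries happen immediately inside one process, and the retry limit is a constructor parameter, whereas a real Broker redelivers the message later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch: retry a failed message a bounded number of times, then dead-letter it. */
final class RetryThenDeadLetter {

    private final int maxRetries;
    private final List<String> deadLetters = new ArrayList<>();

    RetryThenDeadLetter(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * Delivers the message until the handler returns true or the retries are used up.
     * Returns the number of attempts made (first delivery plus retries).
     */
    int deliver(String message, Predicate<String> handler) {
        int attempts = 0;
        while (attempts <= maxRetries) {
            attempts++;
            if (handler.test(message)) {
                return attempts;             // consumed successfully
            }
        }
        deadLetters.add(message);            // all retries failed
        return attempts;
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        RetryThenDeadLetter delivery = new RetryThenDeadLetter(2);
        int attempts = delivery.deliver("order-1", msg -> false);   // handler always fails
        System.out.println(attempts + " attempts, dead letters: " + delivery.deadLetters());
    }
}
```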





The consumer-side listener model

Listener design pattern

RocketMQ follows the listener design pattern. ConsumeMessageConcurrentlyService is the class that executes the listener.

[Figure: inheritance relationship between the listener classes]


Register listeners

Write a custom consumer and bind the listener code as follows:

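import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        // Create the consumer and set its group name (group names must not contain spaces)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA");
        // Subscribe to the topic target_topic; "*" matches messages with any tag
        consumer.subscribe("target_topic", "*");
        // A new group starts from the first offset; otherwise consumption resumes from the last offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Register the listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // Called with each batch of messages to consume
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.println("Consumer Started");
    }
}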

The steps are as follows:

  1. Create a consumer and specify the group to which it belongs
  2. Subscribe to the topic target_topic and consume all of its messages
  3. Set where consumption starts: a new group starts from the first offset, otherwise consumption continues from the last consumed offset
  4. Register a MessageListenerConcurrently listener, whose consumeMessage method defines the logic for consuming messages
  5. Start the consumer


Starting the consumer

Registering the listener does not by itself start consumption. Only after the start method is called does the Consumer begin preparing to consume.

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                // Check the configuration
                this.checkConfig();

                // Copy the subscription data
                this.copySubscription();

                // In cluster mode, replace the default InstanceName with the process PID
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                // Set load balancing parameters
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                // The default is cluster mode, where each message is consumed by only one consumer in the group
                // There is also broadcast mode, where each message is consumed once by every consumer in the group
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                // The default strategy allocates queues evenly (average allocation)
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                // Encapsulates the pull message API
                this.pullAPIWrapper = new PullAPIWrapper(
                        mQClientFactory,
                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                // Store the consumption progress. In cluster mode, the progress is stored on the Broker and shared by the consumers in the group. In broadcast mode, each consumer stores its progress locally
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();

                // Create services based on sequential or concurrent listening
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();

                // Register the consumer; registration fails if the same group name has already been registered
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                // Set the service status
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
            default:
                break;
        }
        // Get TopicRouteData from the registry (NameServer) and update TopicPublishInfo and the queue list (also called periodically)
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        // Send heartbeats to all Brokers in TopicRouteData to register Consumer/Producer information with them (also called periodically)
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // Wake up the rebalance service, which then starts pulling messages
        this.mQClientFactory.rebalanceImmediately();
    }

The execution flow of the above code is as follows:

1. Check the service state to determine whether the Consumer is already running

  • CREATE_JUST: Consumer has just been created and is not running yet
  • RUNNING: Consumer is running
  • SHUTDOWN_ALREADY: Consumer has been shut down
  • START_FAILED: Consumer failed to start

2. If the Consumer has just been created, run the initialization steps

  • Set the state to START_FAILED first, and set it to RUNNING only after initialization completes
  • Check the configuration related to receiving messages, such as the format of the group name
  • Get the unique identity of the Consumer, in the format ip@instanceName
  • Configure load balancing: set the consumer group, the message model, and the queue allocation strategy
  • Wrap the pull message API
  • Create the consumption progress store. In cluster mode the progress is stored on the Broker and shared by the consumers in the group; in broadcast mode it is stored locally on each Consumer
  • Create the ConsumeMessageService that matches the listener type (orderly or concurrent). Consumption is carried out by instances of this class
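
The state handling in this flow (mark the state as failed first, switch to RUNNING only when initialization completes, and reject a second start) can be sketched in isolation. The class `LifecycleGuard` below is a hypothetical, simplified illustration that borrows only the state names; it is not the RocketMQ implementation.

```java
/** Hypothetical sketch of the start-up state guard described above. */
final class LifecycleGuard {

    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;

    /** Runs the initialization once; any later call is rejected. */
    synchronized void start(Runnable initialization) {
        if (state != State.CREATE_JUST) {
            throw new IllegalStateException("cannot start from state " + state);
        }
        // Pessimistic default: if initialization throws, the state stays START_FAILED.
        state = State.START_FAILED;
        initialization.run();
        state = State.RUNNING;
    }

    synchronized void shutdown() {
        state = State.SHUTDOWN_ALREADY;
    }

    synchronized State state() {
        return state;
    }

    public static void main(String[] args) {
        LifecycleGuard guard = new LifecycleGuard();
        guard.start(() -> System.out.println("initializing"));
        System.out.println("state after start: " + guard.state());
    }
}
```

Setting START_FAILED before doing any work means no separate error path is needed: an exception anywhere in initialization leaves the object in a state from which start cannot be called again.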




The ConsumeMessageConcurrentlyService class

public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
    // Set the message consumer object
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    // Set the register listener object
    this.messageListener = messageListener;
    // Get message consumers
    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    // Get the group of message consumers
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    // Create a consumer queue
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    // Consumer thread pool
    this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));

    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}

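A consumer's messages are ultimately handed to a ConsumeMessageService implementation for consumption. The constructor shown above is that of ConsumeMessageOrderlyService; ConsumeMessageConcurrentlyService sets up its request queue and thread pool in the same way. Which implementation is used depends on the listener type: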

  • MessageListenerOrderly: the orderly (sequential) message listener, which uses a ConsumeMessageOrderlyService object to consume messages
  • MessageListenerConcurrently: the concurrent message listener, which uses a ConsumeMessageConcurrentlyService object to consume messages
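
The thread pool set up in the constructor can be explored with a small stand-alone sketch. It uses only java.util.concurrent; the class `ConsumePoolSketch` and its `BatchListener` interface are hypothetical stand-ins for the consume service and the registered listener, not RocketMQ classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: batches are queued and consumed by a pool that calls the listener. */
final class ConsumePoolSketch {

    /** Stand-in for the registered listener; returns true when the batch was consumed. */
    interface BatchListener {
        boolean consume(List<String> batch);
    }

    private final ThreadPoolExecutor executor;
    private final BatchListener listener;
    private final AtomicInteger succeeded = new AtomicInteger();

    ConsumePoolSketch(int minThreads, int maxThreads, BatchListener listener) {
        this.listener = listener;
        // Same shape as the constructor above: min/max threads, 60 s keep-alive, unbounded queue.
        this.executor = new ThreadPoolExecutor(
                minThreads, maxThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    /** Queues one batch; a pool thread later invokes the listener with it. */
    void submit(List<String> batch) {
        executor.execute(() -> {
            if (listener.consume(batch)) {
                succeeded.incrementAndGet();
            }
        });
    }

    /** Stops accepting work, waits for queued batches, and returns the largest pool size reached. */
    int shutdownAndAwait() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executor.getLargestPoolSize();
    }

    int succeeded() {
        return succeeded.get();
    }

    public static void main(String[] args) {
        ConsumePoolSketch pool = new ConsumePoolSketch(2, 4, batch -> !batch.isEmpty());
        for (int i = 0; i < 10; i++) {
            pool.submit(Arrays.asList("msg-" + i));
        }
        int largest = pool.shutdownAndAwait();
        System.out.println(pool.succeeded() + " batches consumed, largest pool size " + largest);
    }
}
```

One design consequence worth noting: because the work queue is an unbounded LinkedBlockingQueue, a ThreadPoolExecutor never creates more than its core number of threads, so in this sketch the maximum pool size has no effect and extra batches simply wait in the queue.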