Review

Here is a recap of the Spring Cloud version of the RocketMQ sequential-message example demonstrated above:

Sequential messages are divided into partitioned sequential messages and global sequential messages.

Global sequential messages are a special case of partitioned sequential messages: when a topic has only one partition and only one consumer thread consumes it at a time, the ordering becomes global.

When RocketMQ creates a topic, the default number of queues (partitions) is 8, and this default applies to all topics.

To set the number of queues (partitions) for a single topic, you can configure it in Spring Cloud Alibaba like this:

spring:
  application:
    name: mq-example
  cloud:
    stream:
      bindings:
        # Consumer binding named input
        input:
          content-type: application/json
          destination: test-topic3
          group: consumer-group
        # Producer binding named output-order
        output-order:
          content-type: application/json
          destination: test-topic3
          # Producer configuration items correspond to the ProducerProperties class
          producer:
            partitionCount: 1  # Number of partitions

Note partitionCount: when it is set to 1, every message produced through this binding is sent to the same partition (queue).
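To make the effect of partitionCount concrete, here is a small self-contained simulation. It is not Spring Cloud Stream or RocketMQ code; PartitionRouting and the "key:seq" message format are hypothetical. With one partition the whole stream keeps its global order; with several partitions only messages that share a key keep their relative order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not Spring Cloud Stream code.
// Messages are strings of the form "key:seq"; each partition is a FIFO list.
final class PartitionRouting {
    static String keyOf(String msg) {
        return msg.substring(0, msg.indexOf(':'));
    }

    // Same key -> same partition; floorMod keeps the index non-negative.
    static int queueFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Appends every message to its partition in send order (FIFO storage).
    static List<List<String>> route(List<String> sent, int partitionCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (String msg : sent) {
            queues.get(queueFor(keyOf(msg), partitionCount)).add(msg);
        }
        return queues;
    }
}
```

For example, routing "A:1", "B:1", "A:2" with partitionCount 1 yields a single list in exactly that order, while with partitionCount 4 the "A" messages still appear as "A:1", "A:2" inside their own partition.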

How it works

This article focuses on the implementation of sequential messages in RocketMQ.

In the MQ model, ordering must be guaranteed in three phases:

  • Messages are sent in order
  • Messages are stored in the same order as they were sent
  • Messages are consumed in the same order as they are stored

RocketMQ's approach rests on two points: the producer sends sequential messages synchronously, so that a group of related messages always lands in the same partition queue in order, and the consumer ensures that each queue is consumed by only one thread at a time.

Sending in order

The following code (sendDefaultImpl in DefaultMQProducerImpl, shown in decompiled form) is the producer's send path:

private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        long invokeID = this.random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];

            while(true) {
                label122: {
                    String info;
                    if (times < timesTotal) {
                        info = null == mq ? null : mq.getBrokerName();
                        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
                        if (mqSelected != null) {
                            mq = mqSelected;
                            brokersSent[times] = mqSelected.getBrokerName();

                            long endTimestamp;
                            try {
                                beginTimestampPrev = System.currentTimeMillis();
                                long costTime = beginTimestampPrev - beginTimestampFirst;
                                if (timeout >= costTime) {
                                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                                    endTimestamp = System.currentTimeMillis();
                                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                                    switch(communicationMode) {
                                    case ASYNC:
                                        return null;
                                    case ONEWAY:
                                        return null;
                                    case SYNC:
                                        if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                            return sendResult;
                                        }
                                    default:
                                        break label122;
                                    }
                                }

                                callTimeout = true;
                            } catch (RemotingException var26) {
                                endTimestamp = System.currentTimeMillis();
                                this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                                this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var26);
                                this.log.warn(msg.toString());
                                exception = var26;
                                break label122;
                            } catch (MQClientException var27) {
                                endTimestamp = System.currentTimeMillis();
                                this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                                this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var27);
                                this.log.warn(msg.toString());
                                exception = var27;
                                break label122;
                            } catch (MQBrokerException var28) {
                                endTimestamp = System.currentTimeMillis();
                                this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                                this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var28);
                                this.log.warn(msg.toString());
                                exception = var28;
                                switch(var28.getResponseCode()) {
                                case 1:
                                case 14:
                                case 16:
                                case 17:
                                case 204:
                                case 205:
                                    break label122;
                                default:
                                    if (sendResult != null) {
                                        return sendResult;
                                    }

                                    throw var28;
                                }
                            } catch (InterruptedException var29) {
                                endTimestamp = System.currentTimeMillis();
                                this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                                this.log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var29);
                                this.log.warn(msg.toString());
                                this.log.warn("sendKernelImpl exception", var29);
                                this.log.warn(msg.toString());
                                throw var29;
                            }
                        }
                    }

                    if (sendResult != null) {
                        return sendResult;
                    }

                    info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
                    info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
                    MQClientException mqClientException = new MQClientException(info, (Throwable)exception);
                    if (callTimeout) {
                        throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                    }

                    if (exception instanceof MQBrokerException) {
                        mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());
                    } else if (exception instanceof RemotingConnectException) {
                        mqClientException.setResponseCode(10001);
                    } else if (exception instanceof RemotingTimeoutException) {
                        mqClientException.setResponseCode(10002);
                    } else if (exception instanceof MQClientException) {
                        mqClientException.setResponseCode(10003);
                    }

                    throw mqClientException;
                }

                ++times;
            }
        } else {
            List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
            if (null != nsList && !nsList.isEmpty()) {
                throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
            } else {
                throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
            }
        }
}
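Stripped of logging and fault bookkeeping, the retry structure of sendDefaultImpl can be condensed into the sketch below. SendRetrySketch, Mode and Attempt are hypothetical names, and the queue choice on retry is reduced to "the next queue", whereas the real code also tries to avoid the broker that just failed. As in the code above, only SYNC mode gets 1 + retryTimesWhenSendFailed attempts; the other modes get a single attempt here.

```java
// Hypothetical, simplified model of the retry loop in sendDefaultImpl.
final class SendRetrySketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    interface Attempt {
        // Returns true when the send to queueIndex succeeded.
        boolean send(int queueIndex);
    }

    // Returns the number of attempts used, or -1 if every attempt failed.
    static int sendWithRetry(Mode mode, int retryTimesWhenSendFailed, int queueCount, Attempt attempt) {
        int timesTotal = mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
        int queueIndex = 0;
        for (int times = 0; times < timesTotal; times++) {
            if (attempt.send(queueIndex)) {
                return times + 1;
            }
            // On failure, move on to another queue before retrying.
            queueIndex = (queueIndex + 1) % queueCount;
        }
        return -1;
    }
}
```

Note that a retry lands on a different queue. That is harmless for ordinary messages but would break the order of a sequential message.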

The main steps of this send path are:

When a message is sent, the producer looks up the TopicPublishInfo for the topic, refreshing it from the NameServer when it is missing. TopicPublishInfo contains all the MessageQueues of the topic.

 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

private TopicPublishInfo tryToFindTopicPublishInfo(String topic) {
    TopicPublishInfo topicPublishInfo = (TopicPublishInfo)this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = (TopicPublishInfo)this.topicPublishInfoTable.get(topic);
    }

    if (!topicPublishInfo.isHaveTopicRouterInfo() && !topicPublishInfo.ok()) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = (TopicPublishInfo)this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    } else {
        return topicPublishInfo;
    }
}

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;

    public TopicPublishInfo() {}
    // ...
}

Select a destination queue:

 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
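For ordinary messages, selectOneMessageQueue in its default configuration essentially rotates through the topic's queues and, on a retry, skips queues on the broker that just failed. The sketch below reproduces that idea with hypothetical types: Queue stands in for MessageQueue, and a shared AtomicInteger replaces RocketMQ's per-thread ThreadLocalIndex. Because consecutive messages land on different queues, this selection alone cannot keep related messages in order; RocketMQ's native producer API offers MessageQueueSelector for choosing the queue by key instead.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of round-robin queue selection in the spirit of selectOneMessageQueue(lastBrokerName).
final class RoundRobinSelector {
    // Hypothetical stand-in for MessageQueue.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "#" + queueId;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    RoundRobinSelector(List<Queue> queues) {
        this.queues = queues;
    }

    Queue select(String lastBrokerName) {
        int size = queues.size();
        if (lastBrokerName != null) {
            // On a retry, prefer a queue on a different broker than the one that just failed.
            for (int i = 0; i < size; i++) {
                Queue q = queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), size));
                if (!q.brokerName.equals(lastBrokerName)) {
                    return q;
                }
            }
        }
        // First attempt (or no alternative broker): plain round robin.
        return queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), size));
    }
}
```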

Then the core send method, sendKernelImpl, is called to deliver the message to the broker:

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

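Synchronous sending must be used: with asynchronous or one-way sending, the next message may be sent before the broker has stored the previous one, so there is no guarantee that messages are written to the queue in order.

Inside sendKernelImpl, the code branches on the communication mode; for sequential messages the relevant branch is case SYNC (ONEWAY falls through to the same call):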

case ASYNC:
    Message tmpMessage = msg;
    if (msgBodyCompressed) {
        tmpMessage = MessageAccessor.cloneMessage(msg);
        msg.setBody(prevBody);
    }

    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
    if (timeout < costTimeAsync) {
        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
    }

    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this);
    break;
case ONEWAY:
case SYNC:
    long costTimeSync = System.currentTimeMillis() - beginStartTime;
    if (timeout < costTimeSync) {
        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
    }

    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this);
    break;
default:
    assert false;

The producer ensures that messages with the same ShardingKey are sent to the same queue. Taking Spring Cloud Stream as an example, see the determinePartition method of the PartitionHandler class:

public int determinePartition(Message<?> message) {
    Object key = this.extractKey(message);
    int partition;
    if (this.producerProperties.getPartitionSelectorExpression() != null) {
        partition = (Integer)this.producerProperties.getPartitionSelectorExpression().getValue(this.evaluationContext, key, Integer.class);
    } else {
        partition = this.partitionSelectorStrategy.selectPartition(key, this.partitionCount);
    }

    return Math.abs(partition % this.partitionCount);
}

The partition key is obtained by extractKey(), for example from a partition-key-expression configured on the producer:

 producer:
   partition-key-expression: payload['id']

If a partition selector expression is configured, its value is used as the partition; otherwise the default strategy is used, which takes the hash code of the key:

private static class DefaultPartitionSelector implements PartitionSelectorStrategy {
    private DefaultPartitionSelector() {}

    public int selectPartition(Object key, int partitionCount) {
        int hashCode = key.hashCode();
        // -2147483648 is Integer.MIN_VALUE, whose absolute value would overflow
        if (hashCode == -2147483648) {
            hashCode = 0;
        }

        return Math.abs(hashCode);
    }
}

Finally, the queue is chosen by taking the partition value modulo the total number of queues (partitions). Because the same key always yields the same hash, messages with the same ShardingKey are always sent to the same queue.
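The two snippets above combine into a small, runnable check. PartitionSelection is a hypothetical re-creation for illustration, not the Spring Cloud Stream class. It shows that the same key always maps to the same partition, and why the Integer.MIN_VALUE special case exists: Math.abs(Integer.MIN_VALUE) is still negative.

```java
// Hypothetical re-creation of the key -> partition logic shown above.
final class PartitionSelection {
    // Mirrors DefaultPartitionSelector.selectPartition: hash the key, take its absolute value.
    static int selectPartition(Object key, int partitionCount) {
        int hashCode = key.hashCode();
        if (hashCode == Integer.MIN_VALUE) {
            // Math.abs(Integer.MIN_VALUE) overflows and stays negative, hence the special case.
            hashCode = 0;
        }
        return Math.abs(hashCode);
    }

    // Mirrors the last step of determinePartition: reduce modulo the partition count.
    static int determinePartition(Object key, int partitionCount) {
        return Math.abs(selectPartition(key, partitionCount) % partitionCount);
    }
}
```

For instance, an Integer key of -5 hashes to -5, becomes 5, and lands in partition 5 of 8.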

The overall process is shown as follows:

Once sent, the messages are stored on the broker in order, thanks to the FIFO nature of the queue itself.

Consumption in order

By default, multiple threads consume the same MessageQueue concurrently, so consumption order is not guaranteed even if the messages arrive in order.

So how does RocketMQ guarantee sequential message consumption?

As with the producer, let's walk through the consumer code.

When the consumer starts, the start method of the MQClientInstance class launches, among other services, the rebalance service:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

The key line is rebalanceService.start(), which assigns MessageQueues to this consumer. To ensure that a queue is consumed by only one consumer, for sequential messages the consumer must request a lock on the queue from the broker before it starts pulling messages from it. If the lock is granted, messages are pulled; otherwise the queue is skipped and the lock is retried in the next rebalance cycle (20 s).

For the lock request, see the updateProcessQueueTableInRebalance and lock methods of RebalanceImpl:
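Which queues a consumer is assigned depends on the allocation strategy; RocketMQ's default is AllocateMessageQueueAveragely. The sketch below is a simplified re-creation of that averaging idea using plain strings for queue and client IDs (AverageAllocation is a hypothetical name). Every consumer sorts the same lists and computes its own contiguous slice, so the assignment itself needs no coordination; the broker-side lock is what stops two consumers from working on the same ordered queue while an assignment changes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified "average" allocation: sorted queues are split into contiguous ranges,
// one per consumer; earlier consumers take one extra queue when the split is uneven.
// Inputs are assumed to be sorted identically on every consumer.
final class AverageAllocation {
    static List<String> allocate(List<String> mqAll, List<String> cidAll, String currentCid) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result;
        }
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size() ? 1
                : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // range is <= 0 for surplus consumers when there are fewer queues than consumers.
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}
```

With 8 queues and 3 consumers, the first consumer gets q0 to q2, the second q3 to q5, and the third q6 and q7.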

// If the message is sequential, request a lock on the queue from the Broker.
for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }

        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        // ...
    }
}
public boolean lock(final MessageQueue mq) {
    // Find the Broker Master host address
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (findBrokerResult != null) {
        // Build the request body
        LockBatchRequestBody requestBody = new LockBatchRequestBody();
        requestBody.setConsumerGroup(this.consumerGroup); // consumer group
        requestBody.setClientId(this.mQClientFactory.getClientId()); // client instance ID
        requestBody.getMqSet().add(mq); // the queues to lock

        try {
            // Send the request; the Broker returns the set of queues it locked
            Set<MessageQueue> lockedMq =
                this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
            for (MessageQueue mmqq : lockedMq) {
                ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                if (processQueue != null) {
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
            }

            // The lock succeeded if the target queue is in the returned set
            boolean lockOK = lockedMq.contains(mq);
            log.info("the message queue lock {}, {} {}",
                     lockOK ? "OK" : "Failed", this.consumerGroup,
                     mq);
            return lockOK;
        } catch (Exception e) {
            log.error("lockBatchMQ exception, " + mq, e);
        }
    }

    return false;
}

This lock is held by the broker, so it applies across all consumer instances of the group rather than within a single process.
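Conceptually, the broker keeps a lock table per consumer group. The following is a hypothetical, simplified model of that idea; QueueLockTable, its string keys, and the explicit clock parameter are assumptions made so the sketch is deterministic. A lock is granted if the queue is free, already held by the same client (which renews it), or its holder has not renewed it within maxLiveMillis. Expiry is what lets another consumer take over the queues of a consumer that has crashed; the actual expiry value is a broker setting and is not modelled here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a broker-side queue lock table, keyed by consumer group + queue.
final class QueueLockTable {
    private static final class Entry {
        final String clientId;
        final long lastUpdateMillis;

        Entry(String clientId, long lastUpdateMillis) {
            this.clientId = clientId;
            this.lastUpdateMillis = lastUpdateMillis;
        }
    }

    private final long maxLiveMillis;
    private final Map<String, Entry> locks = new HashMap<>();

    QueueLockTable(long maxLiveMillis) {
        this.maxLiveMillis = maxLiveMillis;
    }

    // Returns true if clientId holds the lock on (group, queue) after the call.
    synchronized boolean tryLock(String group, String queue, String clientId, long now) {
        String key = group + "@" + queue;
        Entry e = locks.get(key);
        if (e == null || e.clientId.equals(clientId) || now - e.lastUpdateMillis > maxLiveMillis) {
            // Free, renewed by the same client, or expired: grant and refresh the timestamp.
            locks.put(key, new Entry(clientId, now));
            return true;
        }
        return false;
    }

    synchronized void unlock(String group, String queue, String clientId) {
        String key = group + "@" + queue;
        Entry e = locks.get(key);
        if (e != null && e.clientId.equals(clientId)) {
            locks.remove(key);
        }
    }
}
```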

Once the lock is acquired, a PullRequest is built and handed to PullMessageService, which pulls the messages. After a successful pull, the PullCallback puts the messages into the ProcessQueue and submits a consume request, and ConsumeMessageOrderlyService starts consuming them.

When consuming, the consumer first obtains the lock object for the MessageQueue and then synchronizes on it with the synchronized keyword, so that only one thread consumes that queue at a time.

For sequential messages, when consumption fails, RocketMQ automatically retries the message again and again, at an interval of one second by default. The maximum number of retries defaults to Integer.MAX_VALUE:
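The per-queue synchronization can be sketched as follows. OrderlyConsumerSketch is hypothetical: queueLocks plays the role of the per-queue lock objects, and pending stands in for the ProcessQueue's messages in offset order. A consume request takes the next messages only while holding its queue's lock, so even when a thread pool runs many requests at once, each queue's messages are consumed in order, while different queues still proceed in parallel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one lock object per queue plus a synchronized block,
// so at most one thread consumes a given queue at any moment.
final class OrderlyConsumerSketch {
    private final ConcurrentHashMap<String, Object> queueLocks = new ConcurrentHashMap<>();
    // Pulled-but-unconsumed messages per queue, in offset order.
    private final ConcurrentHashMap<String, ArrayDeque<String>> pending = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, List<String>> consumed = new ConcurrentHashMap<>();

    private Object lockFor(String queue) {
        return queueLocks.computeIfAbsent(queue, q -> new Object());
    }

    // Called after a pull: append messages to the queue's pending list.
    void pulled(String queue, List<String> msgs) {
        synchronized (lockFor(queue)) {
            pending.computeIfAbsent(queue, q -> new ArrayDeque<>()).addAll(msgs);
        }
    }

    // A consume request: whichever pool thread runs it takes the next messages under the queue lock.
    void consumeRequest(String queue, int batchSize) {
        synchronized (lockFor(queue)) {
            ArrayDeque<String> q = pending.get(queue);
            List<String> out = consumed.computeIfAbsent(queue, k -> new ArrayList<>());
            for (int i = 0; i < batchSize && q != null && !q.isEmpty(); i++) {
                out.add(q.pollFirst()); // the "business logic" here just records the message
            }
        }
    }

    List<String> consumedFrom(String queue) {
        synchronized (lockFor(queue)) {
            return new ArrayList<>(consumed.getOrDefault(queue, List.of()));
        }
    }
}
```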

case SUSPEND_CURRENT_QUEUE_A_MOMENT:
    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
    // Check the retry count against the maximum (default Integer.MAX_VALUE)
    if (checkReconsumeTimes(msgs)) {
        // Put the messages back so they are consumed again
        consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);
        // Resubmit the consume request after a delay
        this.submitConsumeRequestLater(
            consumeRequest.getProcessQueue(),
            consumeRequest.getMessageQueue(),
            context.getSuspendCurrentQueueTimeMillis());
        continueConsume = false;
    } else {
        commitOffset = consumeRequest.getProcessQueue().commit();
    }
    break;
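The retry behaviour can be simulated without a broker. In this hypothetical sketch (OrderlyRetrySketch, Status and Listener are illustrative names), a failed message is retried before anything after it is consumed, and maxReconsumeTimes plays the role of the retry limit. The one-second suspension between attempts is omitted so the example runs instantly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the SUSPEND_CURRENT_QUEUE_A_MOMENT branch.
final class OrderlyRetrySketch {
    enum Status { SUCCESS, SUSPEND_CURRENT_QUEUE_A_MOMENT }

    interface Listener {
        Status consume(String msg, int reconsumeTimes);
    }

    // Consumes msgs strictly in order and returns a log of "msg#attempt" entries.
    static List<String> run(List<String> msgs, Listener listener, int maxReconsumeTimes) {
        List<String> log = new ArrayList<>();
        for (String msg : msgs) {
            int reconsumeTimes = 0;
            while (true) {
                log.add(msg + "#" + reconsumeTimes);
                if (listener.consume(msg, reconsumeTimes) == Status.SUCCESS) {
                    break;
                }
                if (reconsumeTimes >= maxReconsumeTimes) {
                    // Retry budget exhausted: this sketch simply moves on to the next message.
                    break;
                }
                reconsumeTimes++; // a real client would wait before this retry
            }
        }
        return log;
    }
}
```

With the default limit, a message that fails twice produces the log m1#0, m2#0, m2#1, m2#2, m3#0: m3 is never touched while m2 is still failing.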

Finally, during consumption the ProcessQueue is locked, so that messages already being processed are finished before another consumer can take over the queue after a rebalance.

For example, suppose queue Q3 is assigned to consumer C2, which has pulled 32 messages and is processing them in its thread pool. The consumer group is then scaled out, and Q3 is reassigned from C2 to C3. Because C2 has processed only some of the messages and has not yet committed its consumption offset, C3 would consume some messages a second time if it started on Q3 immediately. Therefore, while C2 is still consuming messages from Q3, rebalancing cannot take the queue away from C2, the lock on the broker is not released, and no other consumer can consume from the queue. This avoids duplicate consumption as far as possible and preserves the ordering semantics.
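The interplay between an in-flight batch and rebalancing can be modelled with a ReentrantLock. ProcessQueueSketch and tryDrop are hypothetical names: the consuming thread holds consumeLock while processing a batch, and rebalancing may drop the queue (and then release the broker-side lock) only if it can acquire the same lock within a short timeout; otherwise it keeps the queue and tries again in the next rebalance cycle.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of "do not give up a queue while a batch is still being consumed".
final class ProcessQueueSketch {
    // Held by the consuming thread for the duration of each batch.
    final ReentrantLock consumeLock = new ReentrantLock();
    volatile boolean dropped = false;

    // Called by rebalance when the queue has been reassigned to another consumer.
    boolean tryDrop(long timeoutMillis) throws InterruptedException {
        if (consumeLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            try {
                dropped = true; // no batch in flight, so it is safe to give the queue up
                return true;    // the caller would now unlock the queue on the broker
            } finally {
                consumeLock.unlock();
            }
        }
        return false; // a batch is still running: keep the queue and retry at the next rebalance
    }
}
```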

Consumption Summary:

  1. When the message pull task is created, the consumer client asks the broker to lock the MessageQueue, so that a MessageQueue is consumed by only one consumer client at a time.
  2. During consumption, threads that want to consume the same MessageQueue must first acquire its exclusive lock via synchronized, and only the thread holding the lock can consume, so that a MessageQueue is consumed by only one thread within one client at a time.
  3. In RocketMQ, each consumer group has its own thread pool that consumes pulled messages concurrently, so the consumer side is multithreaded. For sequential consumption, however, the effective concurrency equals the number of queues assigned to the consumer. Since the number of MessageQueues can be adjusted, this is not a serious limit on parallelism in principle.
  4. During consumption, the ProcessQueue is locked so that in-flight messages are fully consumed before the queue can be reassigned.
  5. If a sequential message fails to be consumed, the default is to keep retrying rather than skip it, because skipping would lose the semantics of sequential messages.

Possible problems with sequential messages

Message blocking

Sequential semantics require that the next message cannot be consumed until the current one has been consumed successfully; skipping a failed message to consume later ones violates sequential consumption.

When using sequential messages, make sure applications can detect and handle consumption failures promptly to avoid blocking. One mitigation is a policy that lets the user decide, based on the type of error, whether to skip a message, combined with a feature such as a retry queue so that the skipped message can later be re-consumed “elsewhere.”
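One way to implement such a policy is sketched below. Nothing here is a RocketMQ API: SkipPolicySketch, the Outcome classification, and the parked list (standing in for a retry queue) are assumptions. Transient errors are retried in place, which preserves order; permanent errors, or transient ones that exceed the retry budget, are parked so later messages are not blocked, at the cost of strict ordering for the parked message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical skip policy: retry transient failures, park permanent ones.
final class SkipPolicySketch {
    enum Outcome { OK, TRANSIENT_ERROR, PERMANENT_ERROR }

    interface Handler {
        Outcome handle(String msg, int attempt);
    }

    final List<String> processed = new ArrayList<>();
    final List<String> parked = new ArrayList<>(); // stand-in for a retry queue consumed "elsewhere"

    void consumeInOrder(List<String> msgs, Handler handler, int maxTransientRetries) {
        for (String msg : msgs) {
            for (int attempt = 0; ; attempt++) {
                Outcome o = handler.handle(msg, attempt);
                if (o == Outcome.OK) {
                    processed.add(msg);
                    break;
                }
                if (o == Outcome.PERMANENT_ERROR || attempt >= maxTransientRetries) {
                    // Skipping gives up strict ordering for this message by the user's choice.
                    parked.add(msg);
                    break;
                }
                // TRANSIENT_ERROR within budget: retry the same message before later ones.
            }
        }
    }
}
```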

No failover

Sequential messages cannot benefit from the cluster's failover feature, because a failed send cannot be retried on a different MessageQueue without breaking the order.
