We know that for Kafka to guarantee sequential consumption, the messages must be stored in the same partition, and to preserve their order only one consumer may consume that partition. In this case Kafka degrades into a single queue with no concurrency at all, which greatly reduces system performance. So what about RocketMQ, which is more business-friendly? Let's take a step-by-step look at how it implements sequential messages.

Usage scenarios for sequential messages

1. Order status transitions in an e-commerce (mall) scenario.

2. Synchronizing MySQL binlogs.

3. Other messages with sequential dependencies, where a later message depends on the processing result of the previous one.

And so on.

Sequential messages in message oriented middleware

A sequential message (FIFO message) is a type of message provided by MQ that is published and consumed in strict order. A sequential message consists of two parts: sequential publication and sequential consumption.

There are two types of sequential messages:

Partition order: All messages in a Partition are published and consumed in a first-in, first-out (FIFO) order

Global order: All messages within a Topic are published and consumed in a first-in, first-out order. However, the global order greatly reduces the throughput of the system, which is not in line with the design intention of MQ.

The usual middle ground is therefore partition order.

[Local sequential consumption]

How to ensure order

In the MQ model, order must be preserved across three phases:

  1. Messages are sent in order
  2. Messages are stored in the same order they were sent
  3. Messages are consumed in the same order as they are stored

Keeping order when sending means that messages which must stay in order should be sent synchronously from the same thread. Keeping the storage order consistent with the sending order means that if messages A and B are sent from the same thread with A first, A must be stored before B. Keeping consumption consistent with storage means that A and B must be processed in the order in which they arrive at the Consumer.

The first point is sending messages in order. The order of messages sent from multiple threads cannot be guaranteed, so when the business side sends messages with the same business number (for example, the same order), it must send them sequentially from a single thread. In MQ terms, this means using synchronous send; asynchronous send cannot guarantee ordering.
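The effect of synchronous sending can be sketched with a toy model. `ToyBroker` and `SyncSendDemo` below are hypothetical classes, not RocketMQ APIs; the only point is that a send which returns after the message is stored forces the next send from the same thread to land after it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a broker queue; not a RocketMQ API.
class ToyBroker {
    private final List<String> stored = new ArrayList<>();

    // Synchronous send: returns only after the message has been appended, so a
    // later send from the same thread is always stored after this one.
    void sendSync(String msg) {
        stored.add(msg);
    }

    List<String> snapshot() {
        return new ArrayList<>(stored);
    }
}

class SyncSendDemo {
    // Sends every event of one order from the calling thread, one at a time.
    static List<String> sendInOrder(List<String> events) {
        ToyBroker broker = new ToyBroker();
        for (String event : events) {
            broker.sendSync(event); // the next event is sent only after this one is stored
        }
        return broker.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(sendInOrder(List.of("created", "paid", "shipped", "completed")));
        // prints [created, paid, shipped, completed]
    }
}
```

With an asynchronous send, each call would return before the message is stored, so consecutive messages could race each other on the way to the broker, which is why ordered messages need synchronous sending.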

The second point is storing messages in order. An MQ topic has multiple queues, so to store messages in order, messages with the same business number must be sent to the same queue. In MQ, MessageQueueSelector selects the target queue: the business number is hashed, the hash value is taken modulo the number of queues, and the message is sent to the resulting queue.
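Here is a minimal sketch of the hash-then-modulo rule, assuming queues are addressed by index and the business number is a String. `selectQueueIndex` is a hypothetical helper, not RocketMQ code; it uses `Math.floorMod` instead of `%` so that a negative `hashCode()` still yields a valid index.

```java
import java.util.List;

// Sketch of hash-based queue selection; selectQueueIndex is a hypothetical helper.
class QueueSelection {
    static int selectQueueIndex(String businessNo, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the index in [0, queueCount) even when hashCode() is negative
        return Math.floorMod(businessNo.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queues = 4;
        for (String orderNo : List.of("ORDER-1001", "ORDER-1002", "ORDER-1001")) {
            System.out.println(orderNo + " -> queue " + selectQueueIndex(orderNo, queues));
        }
    }
}
```

Because the index depends only on the business number and the queue count, every message of ORDER-1001 goes to the same queue. Changing the number of queues changes the mapping, so ordering can be disturbed while queues are being added or removed.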

Third, sequential consumption of messages. To consume messages in order, a queue may be consumed by only one consumer, so locking the queue on the broker is unavoidable. In addition, within that consumer only one thread may consume the queue. In other words, at any given time a queue is consumed by a single thread of a single consumer.

Implementation of sequence in RocketMQ

[The Producer side]

To keep messages in order, the only thing the Producer side needs to do is route messages that must stay in order (for example, those of the same order) to the same partition. In RocketMQ, partition selection is implemented through MessageQueueSelector.

    public interface MessageQueueSelector {
        /**
         * Select a message queue
         *
         * @param mqs all message queues of the topic
         * @param msg the message
         * @param arg user-supplied argument
         * @return the selected message queue
         */
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
    }
  • List<MessageQueue> mqs: all partitions (message queues) of the Topic being sent to
  • Message msg: the message object
  • Object arg: an additional argument supplied by the user

For example, the following implementation can ensure that messages for the same order are routed to the same partition:

    // Order is the business object passed in as arg; assumes non-negative order IDs
    long orderId = ((Order) arg).getOrderId();
    return mqs.get((int) (orderId % mqs.size()));

[The Consumer side]

Attempt to lock the MessageQueue.

First, how do we ensure that a queue is consumed by only one consumer?

Consumption queues live on the broker. Before a consumer can pull messages from a queue, it must obtain a lock on that queue from the MQ server. The code that requests the queue lock is part of the queue load-balancing logic driven by RebalanceService.

After rebalancing assigns message queues to a consumer, the consumer must send pull requests to the MQ server. This is implemented in RebalanceImpl#updateProcessQueueTableInRebalance, which handles pulls for sequential messages as follows:

    // Add message queues that are in mqSet but not yet in processQueueTable
    List<PullRequest> pullRequestList = new ArrayList<>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            // For orderly consumption, the broker-side lock must be acquired first
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    this.dispatchPullRequest(pullRequestList);

For sequential messages, a PullRequest is created only after the lock on the MessageQueue has been acquired successfully.
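The "no lock, no PullRequest" rule can be isolated in a small sketch. `LockBeforePull`, `PullTask`, and the `tryLock` predicate are hypothetical stand-ins for `RebalanceImpl`, `PullRequest`, and `lock(mq)`; offsets are fixed at 0 for brevity, so this is not RocketMQ's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Simplified sketch of "lock before pull"; all names here are hypothetical.
class LockBeforePull {
    static final class PullTask {
        final String queue;
        final long nextOffset;

        PullTask(String queue, long nextOffset) {
            this.queue = queue;
            this.nextOffset = nextOffset;
        }
    }

    static List<PullTask> createPullTasks(List<String> assignedQueues,
                                          Set<String> alreadyProcessing,
                                          boolean isOrder,
                                          Predicate<String> tryLock) {
        List<PullTask> tasks = new ArrayList<>();
        for (String mq : assignedQueues) {
            if (alreadyProcessing.contains(mq)) {
                continue; // this queue is already being pulled
            }
            if (isOrder && !tryLock.test(mq)) {
                continue; // lock failed: no pull task for this queue in this round
            }
            tasks.add(new PullTask(mq, 0L)); // offset fixed at 0 in this sketch
        }
        return tasks;
    }
}
```

A queue whose lock request fails simply gets no pull task in this round; in the sketch, calling `createPullTasks` again on a later rebalance is the retry.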

    /**
     * Request the Broker to obtain a distributed lock for the specified MessageQueue
     *
     * @param mq the queue
     * @return whether the lock was acquired
     */
    public boolean lock(final MessageQueue mq) {
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
        if (findBrokerResult != null) {
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.getMqSet().add(mq);

            try {
                // Ask the Broker to lock the specified MessageQueue
                Set<MessageQueue> lockedMq = this.mQClientFactory.getMQClientAPIImpl()
                    .lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                // Mark the local ProcessQueue of each locked queue as locked. The lock may succeed
                // before a local ProcessQueue exists; in that case it is marked later in lockAll().
                for (MessageQueue mmqq : lockedMq) {
                    ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                    if (processQueue != null) {
                        processQueue.setLocked(true);
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                    }
                }

                boolean lockOK = lockedMq.contains(mq);
                log.info("the message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
                return lockOK;
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + mq, e);
            }
        }

        return false;
    }

The code logic is clear. The lockBatchMQ method is called to send a lock request. What is the logic of the broker after receiving the lock request?

[Broker implementation]

The broker handles the request in RebalanceLockManager#tryLockBatch. The key fields of RebalanceLockManager are as follows:

    /**
     * Message queue lock expiration time, default 60s
     */
    private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
        "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));

    private final Lock lock = new ReentrantLock();

    private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);

The key properties of the LockEntry object are as follows:

    static class LockEntry {
        /**
         * Client ID of the lock holder
         */
        private String clientId;
        /**
         * Last lock time
         */
        private volatile long lastUpdateTimestamp = System.currentTimeMillis();

        public String getClientId() {
            return clientId;
        }

        public void setClientId(String clientId) {
            this.clientId = clientId;
        }

        public long getLastUpdateTimestamp() {
            return lastUpdateTimestamp;
        }

        public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
            this.lastUpdateTimestamp = lastUpdateTimestamp;
        }

        /**
         * Whether the lock is held by the given client
         *
         * @param clientId client ID
         * @return true if held by clientId and not expired
         */
        public boolean isLocked(final String clientId) {
            boolean eq = this.clientId.equals(clientId);
            return eq && !this.isExpired();
        }

        public boolean isExpired() {
            boolean expired =
                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
            return expired;
        }
    }

By maintaining mqLockTable (a ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>>), the broker records which client holds the lock on each MessageQueue within each consumer group, so that at any given time a MessageQueue can be consumed by only one consumer.
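The broker-side behavior can be sketched as an expiring lease table. `ToyRebalanceLockManager` is a simplified, hypothetical model loosely based on the `mqLockTable`/`LockEntry` fields above, not RocketMQ's actual `tryLockBatch`: queues are plain strings and the current time is passed in so expiry can be tested deterministically. A client may lock a queue if nobody holds it, if the previous lease has expired, or if it already holds it (which renews the lease).

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Simplified, hypothetical expiring lock table keyed by consumer group and queue.
class ToyRebalanceLockManager {
    static final long LOCK_MAX_LIVE_TIME_MS = 60_000;

    static final class LockEntry {
        String clientId;
        long lastUpdateTimestamp;

        boolean isExpired(long now) {
            return now - lastUpdateTimestamp > LOCK_MAX_LIVE_TIME_MS;
        }
    }

    private final Lock lock = new ReentrantLock();
    private final Map<String, Map<String, LockEntry>> mqLockTable = new ConcurrentHashMap<>();

    // Returns the queues now held by clientId (newly locked, taken over, or renewed).
    Set<String> tryLockBatch(String group, Set<String> mqs, String clientId, long now) {
        Set<String> locked = new HashSet<>();
        lock.lock();
        try {
            Map<String, LockEntry> groupTable =
                mqLockTable.computeIfAbsent(group, g -> new ConcurrentHashMap<>());
            for (String mq : mqs) {
                LockEntry entry = groupTable.get(mq);
                if (entry == null || entry.isExpired(now) || entry.clientId.equals(clientId)) {
                    if (entry == null) {
                        entry = new LockEntry();
                        groupTable.put(mq, entry);
                    }
                    entry.clientId = clientId;       // take over or keep ownership
                    entry.lastUpdateTimestamp = now; // renew the lease
                    locked.add(mq);
                }
                // otherwise another client holds a live lock: leave it alone
            }
        } finally {
            lock.unlock();
        }
        return locked;
    }
}
```

Expiry is what lets a queue recover when the holding consumer crashes: its lease simply stops being renewed, and another client can take the queue over once the lease is older than the maximum live time.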

[Back to the Consumer, after getting the lock]

Once the lock is held, a PullRequest is created to pull messages; pulling is implemented in PullMessageService. Pulled messages are then handed to a ConsumeMessageService for consumption. For sequential consumption the implementation is ConsumeMessageOrderlyService, and messages are submitted for consumption via ConsumeMessageOrderlyService#submitConsumeRequest, implemented as follows:

    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }

A ConsumeRequest is built and submitted to a ThreadPoolExecutor, so consume requests run in parallel on the pool's threads. Here is the run method of ConsumeRequest for sequential consumption:

        public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            // Fetch the consumer-local lock object for this message queue
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                // (broadcasting mode) or (clustering mode && the broker-side queue lock is valid)
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    final long beginTime = System.currentTimeMillis();
                    // Loop until told to stop
                    for (boolean continueConsume = true; continueConsume; ) {
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }

                        // Distributed queue lock not held: schedule a delayed lock attempt and re-consume
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {
                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }
                        // Distributed queue lock has expired: schedule a delayed lock attempt and re-consume
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && this.processQueue.isLockExpired()) {
                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

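                        // Continuous consumption in this round exceeded the limit (default 60s): submit a delayed consume request. By default the thread rests 10ms after every minute of consumption.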
                        long interval = System.currentTimeMillis() - beginTime;
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }

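                        // Take messages to consume. Unlike a concurrent consume request, which already carries its messages, an orderly request takes them from the ProcessQueue here.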
                        final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                        if (!msgs.isEmpty()) {
                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                            ConsumeOrderlyStatus status = null;

                            // Hook:before
                            ConsumeMessageContext consumeMessageContext = null;
                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                consumeMessageContext = new ConsumeMessageContext();
                                consumeMessageContext
                                    .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                                consumeMessageContext.setMq(messageQueue);
                                consumeMessageContext.setMsgList(msgs);
                                consumeMessageContext.setSuccess(false);
                                // init the consume context type
                                consumeMessageContext.setProps(new HashMap<String, String>());
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                            }

                            // Invoke the message listener
                            long beginTimestamp = System.currentTimeMillis();
                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                            boolean hasException = false;
                            try {
                                this.processQueue.getLockConsume().lock(); // acquire the queue's consume lock

                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }

                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
                                    RemotingHelper.exceptionSimpleDesc(e), //
                                    ConsumeMessageOrderlyService.this.consumerGroup, //
                                    msgs, //
                                    messageQueue);
                                hasException = true;
                            } finally {
                                this.processQueue.getLockConsume().unlock(); // release the queue's consume lock
                            }

                            if (null == status //
                                || ConsumeOrderlyStatus.ROLLBACK == status//
                                || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                                log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
                                    ConsumeMessageOrderlyService.this.consumerGroup, //
                                    msgs, //
                                    messageQueue);
                            }

                            // Map the consume status to a return type
                            long consumeRT = System.currentTimeMillis() - beginTimestamp;
                            if (null == status) {
                                if (hasException) {
                                    returnType = ConsumeReturnType.EXCEPTION;
                                } else {
                                    returnType = ConsumeReturnType.RETURNNULL;
                                }
                            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                                returnType = ConsumeReturnType.TIME_OUT;
                            } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                                returnType = ConsumeReturnType.FAILED;
                            } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                                returnType = ConsumeReturnType.SUCCESS;
                            }

                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                            }

                            if (null == status) {
                                status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                            }

                            // Hook:after
                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                consumeMessageContext.setStatus(status.toString());
                                consumeMessageContext
                                    .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                            }

                            ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                            // Process the consume result
                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false;
                        }
                    }
                } else {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

After fetching the lock object for the queue, the thread uses synchronized to acquire a thread-level exclusive lock.

Once that lock is held, only one thread consumes messages from the queue at a time.

In clustering mode, if the broker-side lock on the MessageQueue is not held or has expired, the thread calls tryLockLaterAndReconsume: after a delay (100 ms here, 10 ms inside the consume loop) it asks the broker to lock the MessageQueue again and resubmits the consume request once locking succeeds.

This makes the solution to the third key point clear. It comes down to two steps.

When a pull task is created, the consumer client asks the broker to lock the MessageQueue, so that a MessageQueue can be consumed by only one consumer client at a time.

When messages are consumed, threads must first acquire the exclusive lock for that MessageQueue via synchronized and may consume only while holding it, so that a MessageQueue is consumed by only one thread of one consumer client at a time.
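The thread-level half of this scheme can be sketched with plain JDK classes. `OrderlyConsumeDemo` and `QueueLocks` are hypothetical: `fetchLockObject` plays the role of `messageQueueLock.fetchLockObject`, a `Deque` stands in for the `ProcessQueue`, and the broker-side lock is left out. Several pool threads race to consume the same queue, but because each one takes its batch and processes it inside `synchronized`, messages are consumed in their stored order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one lock object per queue, shared by all consume threads.
class OrderlyConsumeDemo {
    static final class QueueLocks {
        private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();

        Object fetchLockObject(String queue) {
            return locks.computeIfAbsent(queue, q -> new Object());
        }
    }

    static List<Integer> consume(int messageCount, int threads, int tasks) {
        Deque<Integer> pending = new ArrayDeque<>(); // stands in for the ProcessQueue
        for (int i = 0; i < messageCount; i++) {
            pending.addLast(i);
        }
        List<Integer> consumed = new ArrayList<>(); // guarded by the queue's lock object
        QueueLocks queueLocks = new QueueLocks();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < tasks; t++) {
            pool.submit(() -> {
                synchronized (queueLocks.fetchLockObject("orders-q0")) {
                    // Take a batch of up to 3 and "process" it while still holding the lock
                    for (int n = 0; n < 3 && !pending.isEmpty(); n++) {
                        consumed.add(pending.pollFirst());
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (queueLocks.fetchLockObject("orders-q0")) {
            return new ArrayList<>(consumed); // read under the same lock for visibility
        }
    }
}
```

What matters is holding the lock while processing, not only while taking messages: if a thread released the lock right after taking its batch, another thread could finish a later batch first.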

[Breaking down the sequential consumption problem]

  1. Ensure that only one process consumes each queue on the Broker, i.e. only one consumer consumes a given queue at a time.
  2. The order of messages delivered from the broker to the consumer must be preserved. This is easy to achieve because messages are transmitted via RPC and their order is unchanged after serialization.
  3. On the consumer, the messages of a queue must be consumed by only one thread at a time.

The common way to solve these problems is to lock access to the resource: a queue on the broker must be locked while a consumer accesses it, and within a single consumer the queue must be locked while multiple threads process its messages. Consider the failure case for broker locks: if a queue on the Broker is locked by a consumer and that consumer crashes, the lock can never be released, so the broker-side lock needs to expire. This is what RocketMQ does: the broker-side lock expires after 60 s by default (REBALANCE_LOCK_MAX_LIVE_TIME), so a consumer has to keep renewing its lock while it is consuming.

Sequential message considerations in RocketMQ

  1. Not every scenario in a real project needs sequential messages, but ordering requirements are often overlooked when solutions are designed.
  2. Message ordering is the result of cooperation between producer and consumer; the consumer cannot guarantee ordering on its own merely by consuming sequentially.
  3. If the consumer uses concurrent mode and only sets the number of messages per consume batch to 1 (the consumeMessageBatchMaxSize configuration parameter), is sequential consumption achieved? No. In concurrent mode, multiple consumer threads run at the same time; the batch size only controls how many messages one thread handles per invocation and has nothing to do with ordering. Interested readers can look at ConsumeMessageConcurrentlyService, which contains the concurrent consumption logic.

When using sequential messages, pay attention to exceptions. For sequential messages, when a consumer fails to consume a message, Message Queue for RocketMQ automatically retries it repeatedly (at 1-second intervals), up to Integer.MAX_VALUE times. While this happens, consumption of that queue is blocked. Therefore, when using sequential messages, make sure the application can detect and handle consumption failures promptly to avoid blocking.
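One way an application can keep a poison message from blocking a queue indefinitely is to bound retries in its own code. The sketch below is hypothetical and is not a RocketMQ setting or API: each message gets at most `maxAttempts` tries, and a message that still fails is parked for alerting or manual handling. Skipping a message trades strict ordering for availability, so whether that is acceptable is a business decision.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical application-level guard: bounded retries, then park the message.
class BoundedOrderlyRetry {
    final List<String> processed = new ArrayList<>();
    final List<String> parked = new ArrayList<>(); // e.g. for alerting or manual handling

    // handler returns true on success; maxAttempts is an assumed application setting
    void consumeInOrder(List<String> messages, Predicate<String> handler, int maxAttempts) {
        for (String msg : messages) {
            boolean ok = false;
            for (int attempt = 1; attempt <= maxAttempts && !ok; attempt++) {
                ok = handler.test(msg); // a real consumer would wait between attempts
            }
            if (ok) {
                processed.add(msg);
            } else {
                parked.add(msg); // give up on this message and move on to the next one
            }
        }
    }
}
```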

Again, pay close attention to exceptions when using sequential messages, and make sure resources are always released!

Summary

From the above, we know the necessary conditions for implementing sequential messages: sequential sending, sequential storage, and sequential consumption. RocketMQ was designed with all three in mind: we simply use its API, without writing extra code to constrain the business logic, which makes sequential messages much easier to implement.