Starting the check task

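The check service thread: `broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageCheckService.java` (note that it calls `getTransactionalMessageService().check(...)`, so this code does not live in `TransactionalMessageServiceImpl` itself).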

@Override
public void run(a) {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();  // The default value is 60000, i.e. 60s
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

@Override
protected void onWaitEnd(a) {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // Call chekc
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
Copy the code

`run()` and `onWaitEnd()` override hook methods of the `ServiceThread` base class (the template method pattern), and `check()` is called from `onWaitEnd()`. Where is `onWaitEnd()` called? In the `waitForRunning()` method of `common/src/main/java/org/apache/rocketmq/common/ServiceThread.java`:

protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true.false)) {
        // Call the logic overridden by subclasses
        this.onWaitEnd();
        return;
    }

    //entry to wait
    waitPoint.reset();

    try {
        / / wait for
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        this.onWaitEnd(); }}Copy the code

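`waitForRunning()` calls `onWaitEnd()` every time a wait finishes, whether it timed out or was woken early. So `check()` runs periodically, by default every 60 seconds (`transactionCheckInterval`). In other words, `check()` is a timed callback.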

The check process

`check()` lives in `broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java`. First, the complete code:

@Override
    public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
        try {
            // Define topic as "RMQ_SYS_TRANS_HALF_TOPIC"
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            // Get the half queue
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                / / get opQueue
                MessageQueue opQueue = getOpQueue(messageQueue);
                // Get the consumption offset of the half queue
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                // Get the offset of the OP queue
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset);
                    continue;
                }

                List<LongOpQueue offset*/> doneOpOffset = new ArrayList<>();
                HashMap<Long/*halfQueue offset*/, Long/*opQueue offset*/> removeMap = new HashMap<>();  // Stores messages that have been processed but have not been added to doneOpOffset
                // Save processed but not updated messages to removeMap for later determination
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset);
                    continue;
                }
                int getMessageNullCount = 1;  // The number of times to get an empty message
                long newOffset = halfOffset;  // The latest progress in processing the half queue
                long i = halfOffset; // The half queue offset of the message being processed
                while (true) {
                    // Check whether the current task time slice is used up
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    If the half message has already been processed, proceed to the next message
                    if (removeMap.containsKey(i)) {
                        log.debug("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        doneOpOffset.add(removedOpOffset);
                    } else {
                        // Get the message from the half queue consuming offset
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            // Indicates the maximum number of retries
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue; }}// If the storage time exceeds (3 days by default) or the check time exceeds (15 times by default)
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }

                        // If the storage time is longer than the start time, no processing is performed
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp()));
                            break;
                        }

                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();  // The time the message has been stored
                        long checkImmunityTime = transactionTimeout; // Detect the time of the transaction message immediately
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); // The latest check time to get the message
                        if (null! = checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);// If the check time is not reached, no operation is performed
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    newOffset = i + 1;
                                    i++;
                                    continue; }}}else {
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break; }}// Get the list of messages
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        // Check whether a rollback is required
                        boolean isNeedCheck = (opMsg == null&& valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg ! =null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1);
                        if (isNeedCheck) {
                            // The message is put back into halfQueue
                            if(! putBackHalfMsgQueue(msgExt, i)) {continue;
                            }
                            // Perform a rollback
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult);
                            continue;
                        }
                    }
                    newOffset = i + 1;
                    i++;
                }
                if(newOffset ! = halfOffset) {// Update the halfQueue consumption progress
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if(newOpOffset ! = opOffset) {// Update the opQueue consumption progresstransactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset); }}}catch (Throwable e) {
            log.error("Check error", e); }}Copy the code

That is a lot of code, so let's break it down piece by piece:

Get the half queues

// Excerpt from check(): find every queue of the half-message topic.
// Define topic as "RMQ_SYS_TRANS_HALF_TOPIC"
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
// Get the half queues of that topic
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
// No queues means nothing to check: log and end this check pass
if (msgQueues == null || msgQueues.size() == 0) {
    log.warn("The queue of topic is empty :" + topic);
    return;
}
Copy the code

RocketMQ keeps transaction messages in two internal topics: `RMQ_SYS_TRANS_HALF_TOPIC` (the half queue, halfQueue) and `RMQ_SYS_TRANS_OP_HALF_TOPIC` (the op queue, opQueue).

  • `RMQ_SYS_TRANS_HALF_TOPIC`: stores half (prepared) messages.
  • `RMQ_SYS_TRANS_OP_HALF_TOPIC`: when a half message is committed or rolled back, an op record for it is saved here.

Define the offsets

// Start time
long startTime = System.currentTimeMillis();
/ / get opQueue
MessageQueue opQueue = getOpQueue(messageQueue);
// Get the consumption offset of the half queue
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// Get the offset of the OP queue
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
Copy the code
  • startTime: the current time. It is used to enforce the per-queue time limit and to recognize messages stored after the scan began.
  • opQueue: the `RMQ_SYS_TRANS_OP_HALF_TOPIC` queue paired with this half queue. It holds op records for messages that have already been committed or rolled back.
  • halfOffset: the consume offset of the half queue. Messages before it have already been handled, so only messages at or after halfOffset can need a check-back.
  • opOffset: the consume offset of the op queue. Op records at or after opOffset have not yet been fully accounted for by a previous check run.

Mark processed messages

List<LongOpQueue offset*/> doneOpOffset = new ArrayList<>();
HashMap<Long/*halfQueue offset*/, Long/*opQueue offset*/> removeMap = new HashMap<>();  // Stores messages that have been processed but have not been added to doneOpOffset
// Save processed but not updated messages to removeMap for later determination
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
Copy the code
  • doneOpOffset: op-queue offsets of op records that have been fully handled. It is used to compute the new op-queue consume offset.
  • removeMap: maps a half-queue offset to the op-queue offset of its op record. It covers messages that already have an op record (committed/rolled back) but are still at or after halfOffset in the half queue. While scanning, such messages are skipped instead of being checked back.

fillOpRemoveMap()

`fillOpRemoveMap()` pulls up to 32 op messages and uses them to populate `removeMap` and `doneOpOffset` with messages that have already been processed:

private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap, MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
    // step1: get op message, pull 32 message
    PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
    if (null == pullResult) {
        return null;
    }
    // step2: check the status
    if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
        log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
        transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
        return pullResult;
    } else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) { // No new message
        log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
        return pullResult;
    }
    List<MessageExt> opMsg = pullResult.getMsgFoundList();
    if (opMsg == null) {
        log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
        return pullResult;
    }
    // step3: Iterate over all messages
    for (MessageExt opMessageExt : opMsg) {
        // Get the opQueue offset
        Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
        log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(), opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
        if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
            if (queueOffset < miniOffset) {
                // Normally, the message offset in opQueue is smaller than that in halfQueue
                doneOpOffset.add(opMessageExt.getQueueOffset());
            } else {
                // The message is stored in opQueue, but the offset in halfQueue has not been updated
                // Save the file to removeMap to prevent repeated follow-up operationsremoveMap.put(queueOffset, opMessageExt.getQueueOffset()); }}else {
            log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
        }
    }
    log.debug("Remove map: {}", removeMap);
    log.debug("Done op list: {}", doneOpOffset);
    return pullResult;
}
Copy the code

Define variables

// Excerpt from check(): per-queue scan state.
int getMessageNullCount = 1;  // Counts half-queue reads that returned no message
long newOffset = halfOffset;  // Half-queue consume offset to persist when the scan of this queue ends
long i = halfOffset; // Half-queue offset of the message currently being examined
Copy the code
  • getMessageNullCount: counts how many reads of the half queue returned no message. Once it exceeds `MAX_RETRY_COUNT_WHEN_HALF_NULL`, scanning of this queue stops.

Check whether the check conditions are met

// Check whether the current task time slice is used up
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
    break;
}
// If the half message has already been processed, proceed to the next message
if (removeMap.containsKey(i)) {
    log.debug("Half offset {} has been committed/rolled back", i);
    Long removedOpOffset = removeMap.remove(i);
    doneOpOffset.add(removedOpOffset);
} else {
    // Read the half message at offset i
    GetResult getResult = getHalfMsg(messageQueue, i);
    MessageExt msgExt = getResult.getMsg();
    if (msgExt == null) {
        // Give up on this queue after too many empty reads
        if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
            break;
        }
        if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
            log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
            break;
        } else {
            log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
            // Jump to the next offset suggested by the pull result
            i = getResult.getPullResult().getNextBeginOffset();
            newOffset = i;
            continue;
        }
    }
    // Discard if the storage time is exceeded (3 days by default) or the check count is exceeded (15 times by default)
    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
        listener.resolveDiscardMsg(msgExt);
        newOffset = i + 1;
        i++;
        continue;
    }

    // Stored after this scan started: leave it for the next pass
    if (msgExt.getStoreTimestamp() >= startTime) {
        log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp()));
        break;
    }

    // Milliseconds elapsed since the message's born timestamp
    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
    // Minimum age before a check-back; transactionTimeout unless the message overrides it
    long checkImmunityTime = transactionTimeout;
    // Optional per-message immunity time, read from a user property
    String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
    if (null != checkImmunityTimeStr) {
        checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
        // Still inside the immunity window: skip the message if checkPrepareQueueOffset() allows it
        if (valueOfCurrentMinusBorn < checkImmunityTime) {
            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                newOffset = i + 1;
                i++;
                continue;
            }
        }
    } else {
        // Too young to check back yet: stop scanning and retry in a later pass
        if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
            log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp()));
            break;
        }
    }
    // (the else-branch opened above continues in the next excerpt)

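Time-slice mechanism: a common RocketMQ design is to give a task a time budget (here `MAX_PROCESS_TIME_LIMIT` per queue) and stop once it is used up. Progress is saved, and the remaining messages are handled in the next run.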

Perform the check-back

// Op messages from the most recent op-queue pull
List<MessageExt> opMsg = pullResult.getMsgFoundList();
// Decide whether this half message needs a check-back
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
    || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
    || (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
    // Put a copy of the message back into the half queue; if that fails, retry the same offset
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    // Trigger the transaction status check-back for this half message
    listener.resolveHalfMsg(msgExt);
} else {
    // Not enough op information yet: pull the next op batch and re-examine offset i
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult);
    continue;
}
Copy the code

The two most important parts are putBackHalfMsgQueue() and resolveHalfMsg().

putBackHalfMsgQueue()

Here the half message is not modified in place. Instead, a copy is appended to the commitlog again (and therefore to the half queue). This is a nice design:

  1. The check-back request is sent asynchronously, so the message is first re-appended to the half queue; that way the next check run can still find it. Once a result comes back (commit or rollback), an op record is saved to opQueue, and `fillOpRemoveMap()` filters the message out.
  2. Each check-back updates the message's check-count property. Appending a new copy is a sequential write, which performs better than modifying the stored message in place.
private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
    // Store the message to the Commitlog again
    PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);
    // The attributes of the message were successfully modified
    if(putMessageResult ! =null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        // Set the new consumption queue logical offset
        msgExt.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
        // Set the offset of the new commitlog
        msgExt.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset());
        // Set the new msgId
        msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        log.debug("Send check message, the offset={} restored in queueOffset={} " + "commitLogOffset={} " + "newMsgId={} realMsgId={} topic={}", offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(), msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX), msgExt.getTopic());
        return true;
    } else {
        log.error("PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, " + "msgId: {}", msgExt.getTopic(), msgExt.getQueueId(), msgExt.getMsgId());
        return false; }}Copy the code

Update the consume offsets

if (newOffset != halfOffset) {
    // Update the halfQueue consumption progress
    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
// Compute the new opQueue offset from the processed op offsets in doneOpOffset
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
    // Update the opQueue consumption progress
    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
Copy the code

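`calculateOpOffset()` does not simply return the last value in the list. It sorts `doneOffset` and, starting from the old op offset, advances by one for each consecutive offset present in the list, stopping at the first gap. The op queue's consume offset therefore only moves past an unbroken run of processed op records.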

/**
 * Computes the next consume offset for the op queue.
 *
 * <p>Sorts {@code doneOffset} in place, then walks it from the smallest value. Starting at
 * {@code oldOffset}, the result is bumped by one for each entry equal to the current result,
 * and the walk stops at the first entry that differs.
 *
 * @param doneOffset op-queue offsets already processed; sorted in place as a side effect
 * @param oldOffset  the op queue's current consume offset
 * @return the advanced offset; {@code oldOffset} when the list is empty or its smallest entry
 *         differs from {@code oldOffset}
 */
private long calculateOpOffset(List<Long> doneOffset, long oldOffset) {
    Collections.sort(doneOffset);
    long expected = oldOffset;
    for (long done : doneOffset) {
        if (done != expected) {
            // First gap: the op record at 'expected' has not been processed yet
            break;
        }
        expected++;
    }
    return expected;
}
Copy the code