Preface

Apache RocketMQ, a well-known open-source messaging middleware, was born at Alibaba and donated to Apache in 2016. From RocketMQ 4.0 to the latest version, v4.7.1, it has won wide attention and praise both within Alibaba and in the external community.

This article analyzes how RocketMQ sends transactional messages by reading the RocketMQ Producer source code from the sender's perspective.

Note that all code quoted in this article comes from RocketMQ 4.7.1. Throughout this article, "sending" refers only to the Producer sending messages to the Broker; it does not include the Broker delivering messages to the Consumer.

Macro overview

The overall process of sending a RocketMQ transactional message:

[Figure: RocketMQ transactional message sending process]

In the source code, calling the sendMessageInTransaction method of RocketMQ's TransactionMQProducer actually invokes the sendMessageInTransaction method of DefaultMQProducerImpl. Inside that method, the entire process of sending a transactional message is clearly visible.

First, the method performs pre-send checks and fills in the required properties, including the one that marks the message as a PREPARED (half) transaction message.

Listing 1

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);
    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
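To make the preparation step concrete, here is a minimal sketch that models the message as a plain property map. DraftMessage and HalfMessagePreparer are hypothetical names, the property key strings are assumed to match MessageConst in 4.7.1, and message validation (Validators.checkMessage) is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a RocketMQ Message: just a topic and its properties.
class DraftMessage {
    final String topic;
    final Map<String, String> properties = new HashMap<>();

    DraftMessage(String topic) {
        this.topic = topic;
    }
}

class HalfMessagePreparer {
    // Property keys assumed to match MessageConst in RocketMQ 4.7.1.
    static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
    static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
    static final String PROPERTY_PRODUCER_GROUP = "PGROUP";

    // Follows the order of Listing 1: require a listener, drop any delay level,
    // then mark the message as a prepared (half) message of this producer group.
    static void prepare(DraftMessage msg, String producerGroup, boolean hasListener) {
        if (!hasListener) {
            throw new IllegalStateException("tranExecutor is null");
        }
        // Transactional messages ignore the delay level.
        msg.properties.remove(PROPERTY_DELAY_TIME_LEVEL);
        msg.properties.put(PROPERTY_TRANSACTION_PREPARED, "true");
        msg.properties.put(PROPERTY_PRODUCER_GROUP, producerGroup);
    }
}
```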

Next comes the send itself:

Listing 2

    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

The result returned by the broker decides whether the local transaction runs. If the half message was sent successfully, the local transaction is executed:

Listing 3

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }
                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            // the broker could not confirm reliable storage: roll back the half message
            // and do not execute the local transaction
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }
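The branching in Listing 3 boils down to a small decision function. The sketch below is a simplified model, not the RocketMQ code: the two enums are local stand-ins with only the values used here, PhaseOneDecision is a hypothetical name, and a Supplier plays the role of executeLocalTransaction.

```java
import java.util.function.Supplier;

// Local stand-ins for the RocketMQ enums of the same names (only the values used here).
enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

class PhaseOneDecision {
    // Simplified model of Listing 3. The Supplier stands in for
    // TransactionListener.executeLocalTransaction(msg, arg).
    static LocalTransactionState decide(SendStatus status, Supplier<LocalTransactionState> localTx) {
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        switch (status) {
            case SEND_OK:
                try {
                    LocalTransactionState result = localTx.get();
                    // A null result stays UNKNOW and is left to the backcheck.
                    if (result != null) {
                        state = result;
                    }
                } catch (Throwable e) {
                    // A failed local transaction also stays UNKNOW
                    // (the real code keeps the exception as localException).
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // Half message not confirmed as stored: roll back without running the local transaction.
                state = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        return state;
    }
}
```

Note that the only way to reach ROLLBACK_MESSAGE without the listener saying so is a degraded broker response; a thrown exception or a null answer from the local transaction yields UNKNOW, not a rollback.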

Once the local transaction has finished, phase two is carried out according to its resulting state:

Listing 4

    try {
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
    // assemble the send result
    // ...
    return transactionSendResult;
}

Next, we dive into the code analysis at each stage.

Digging deeper

Phase one: sending the half message

Start with the send method. Inside it, we find that RocketMQ sends phase one of a transactional message in SYNC mode:

Listing 5

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

This makes sense: the result of phase one determines whether the local transaction is executed at all, so the producer must block until the broker acknowledges the message.

Next, open the sendDefaultImpl method in DefaultMQProducerImpl.java. Reading it shows how the producer behaves during phase one of sending a transactional message.

Note that this method is not specific to transactional messages, or even to SYNC mode, so reading it gives a fairly complete picture of how RocketMQ sends messages in general.

The logic of this code flows smoothly, so it is presented in one piece rather than sliced up. To save space, parts that are complex but not very informative are replaced with comments, preserving the overall flow as much as possible. Parts that I consider important or easy to overlook are marked with numbered comments and explained in detail after the listing.

Listing 6

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    // 1. Check the validity of the message
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Get the routing info for the topic; fetch it from the NameServer if it is not cached locally
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // 2. Send retry mechanism
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // mq is null on the first attempt
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 3. How does RocketMQ select a queue when sending? The broker fault-avoidance mechanism
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // Record the broker's latency for the fault-avoidance mechanism
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    // 4. The three communication modes
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // ...
                    // retried automatically
                } catch (MQClientException e) {
                    // ...
                    // retried automatically
                } catch (MQBrokerException e) {
                    // ...
                    // if the response code is NOT_IN_CURRENT_UNIT (205), retry automatically;
                    // otherwise do not retry and throw the exception
                } catch (InterruptedException e) {
                    // ...
                    // not retried; throw the exception
                }
            } else {
                break;
            }
        }
        if (sendResult != null) {
            return sendResult;
        }
        // Assemble the returned info and throw it as an MQClientException
        // ...
        // A timeout surfaces as RemotingTooMuchRequestException
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }
        // Fill in the MQClientException information
        // ...
    }
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

1. Message validity verification

Listing 7

 Validators.checkMessage(msg, this.defaultMQProducer);

This method validates both the topic and the message body. The topic name must conform to the naming rules and must not be one of the built-in system topics. The body must be non-null, and its length must satisfy 0 < length <= maxMessageSize, which defaults to 1024 * 1024 * 4 = 4194304 bytes (4 MB).
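The size arithmetic is easy to get wrong, so here is a small sketch of the body checks from Listing 8. BodyCheck is a hypothetical name; to stay self-contained it returns an error string instead of throwing MQClientException. The 4 MB figure is the default of DefaultMQProducer.maxMessageSize.

```java
class BodyCheck {
    // Default of DefaultMQProducer.maxMessageSize: 1024 * 1024 * 4 bytes (4 MB).
    static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 * 4;

    // The body checks of Listing 8, returning the error text instead of throwing;
    // null means the body is acceptable.
    static String check(byte[] body, int maxMessageSize) {
        if (body == null) {
            return "the message body is null";
        }
        if (body.length == 0) {
            return "the message body length is zero";
        }
        if (body.length > maxMessageSize) {
            return "the message body size over max value, MAX: " + maxMessageSize;
        }
        return null;
    }
}
```

A body of exactly 4194304 bytes passes; one more byte fails.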

Listing 8

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());
    // body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

2. Send retry mechanism

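When a send fails, the producer automatically retries. In SYNC mode the maximum number of attempts is retryTimesWhenSendFailed + 1, which is 3 with the default retryTimesWhenSendFailed of 2; in the other modes, sendDefaultImpl makes a single attempt.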

Not every failure is retried. From the source code above, a retry happens automatically in three cases:

  • When either RemotingException or MQClientException occurs
  • When an MQBrokerException occurs and the ResponseCode is NOT_IN_CURRENT_UNIT (205)
  • In SYNC mode, when no exception occurs but the result status is not SEND_OK, provided retryAnotherBrokerWhenNotStoreOK is enabled (it is false by default)
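The retry rules can be condensed into two small functions. This sketch encodes the three cases exactly as listed above; SendMode, AttemptOutcome, and RetryRules are hypothetical names, and only the response code 205 and the retryAnotherBrokerWhenNotStoreOK flag come from the source.

```java
enum SendMode { SYNC, ASYNC, ONEWAY }

// Hypothetical classification of how one attempt inside the loop of Listing 6 ended.
enum AttemptOutcome { RESULT_RETURNED, REMOTING_EXCEPTION, CLIENT_EXCEPTION, BROKER_EXCEPTION, INTERRUPTED }

class RetryRules {
    static final int NOT_IN_CURRENT_UNIT = 205;

    // timesTotal in Listing 6: retries only apply to SYNC sends.
    static int timesTotal(SendMode mode, int retryTimesWhenSendFailed) {
        return mode == SendMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Whether the loop moves on to another attempt, following the three cases above.
    static boolean shouldRetry(AttemptOutcome outcome, int brokerResponseCode,
                               boolean sendOk, boolean retryAnotherBrokerWhenNotStoreOK) {
        switch (outcome) {
            case REMOTING_EXCEPTION:
            case CLIENT_EXCEPTION:
                return true;
            case BROKER_EXCEPTION:
                return brokerResponseCode == NOT_IN_CURRENT_UNIT;
            case RESULT_RETURNED:
                // SYNC only: a non-SEND_OK status is retried only when the flag allows it.
                return !sendOk && retryAnotherBrokerWhenNotStoreOK;
            default: // INTERRUPTED
                return false;
        }
    }
}
```

With the defaults, a SYNC send that returns FLUSH_DISK_TIMEOUT is handed back to the caller as-is rather than retried, because retryAnotherBrokerWhenNotStoreOK is false.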

Before each attempt, the producer checks whether the time already spent since the first attempt exceeds the timeout (3000 ms by default). If it does, the producer stops sending and reports a timeout. This illustrates two things:

  • Automatic retries inside the producer are transparent to the application, but the send time the application observes includes all retries.
  • Once the timeout is exceeded, the send is considered failed because of the timeout, and this is ultimately reported by throwing RemotingTooMuchRequestException.
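The timeout check can be replayed with a fake clock to see how the 3000 ms budget is shared across attempts. SendTimeBudget is a hypothetical name, and each attempt's duration is supplied by the caller rather than measured.

```java
class SendTimeBudget {
    // Replays the timeout check of Listing 6 with a fake clock.
    // attemptCostsMs[i] is how long attempt i ran before failing.
    // Returns how many attempts were started before the budget ran out.
    static int attemptsStarted(long timeoutMs, long[] attemptCostsMs) {
        long elapsed = 0; // plays the role of beginTimestampPrev - beginTimestampFirst
        int started = 0;
        for (long cost : attemptCostsMs) {
            if (timeoutMs < elapsed) {
                // callTimeout = true; the real code ends with RemotingTooMuchRequestException
                break;
            }
            started++;
            // the real attempt would receive timeoutMs - elapsed as its own timeout
            elapsed += cost;
        }
        return started;
    }
}
```

With attempt durations of 2500 ms, 1000 ms, and 1000 ms, only two attempts start: before the third, 3500 ms have already elapsed. The application waits for the sum of all attempts, not just the last one.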

Note that the official RocketMQ documentation gives the send timeout as 10s (10000 ms), and many people online repeat that figure. However, the code clearly sets it to 3000 ms, and after debugging I confirmed that the default timeout is indeed 3000 ms.

3. Broker fault-avoidance mechanism

Listing 9

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);  

This line of code selects the queue before sending.

This involves latencyFaultTolerance, a core mechanism behind the high availability of RocketMQ message sending. It is part of the Producer's load balancing and is controlled by sendLatencyFaultEnable. The default value is false, which leaves broker fault delay disabled; the Producer can enable it by setting the value to true.

With fault avoidance enabled, queue selection takes each broker's recent behavior into account and steers away from brokers in poor condition: an unhealthy broker is avoided for a period of time. With it disabled, queues are simply taken in turn; on a retry, however, the producer tries to pick a queue on a different broker from the one used in the previous attempt. After each send, the updateFaultItem method updates the broker's status.

Listing 10

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // Compute how long to avoid the broker. isolation means the broker must be isolated:
        // 30s is then used as the latency, the first threshold smaller than 30s is found,
        // and its index gives the avoidance period (10 minutes for 30s).
        // Otherwise, the latency of this send determines the avoidance period.
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
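computeNotAvailableDuration is not shown above. The sketch below reproduces the lookup as far as I can tell from MQFaultStrategy in 4.x; the two arrays are quoted from memory and should be checked against your version, and AvoidanceDuration is a hypothetical name.

```java
class AvoidanceDuration {
    // Latency thresholds and avoidance periods in ms, as recalled from
    // MQFaultStrategy (RocketMQ 4.x); verify against the version you use.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down; the first threshold the latency
    // reaches decides how long the broker is avoided.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    // As in updateFaultItem: isolation substitutes a 30000 ms latency.
    static long avoidanceFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000L : currentLatency);
    }
}
```

With isolation, the 30000 ms stand-in latency reaches the 15000 ms threshold, giving the 10-minute avoidance mentioned in the comment of Listing 10; a successful send that took 600 ms would keep the broker out for 30 s, while anything under 550 ms causes no avoidance.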

Take a look inside the selectOneMessageQueue method:

Listing 11

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        // fault avoidance enabled
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // take the queues in turn as the candidate send queue
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // use the queue if its broker is available and this is either the first
                // attempt or the same broker as the previous attempt
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    // fault avoidance disabled: pick the next queue from a counter that starts at a random
    // value and increments, avoiding lastBrokerName on a retry
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
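When fault avoidance is disabled, the last line of Listing 11 delegates to tpInfo.selectOneMessageQueue(lastBrokerName). The sketch below models that behavior as I understand it: round-robin over the queue list, skipping queues on the previous attempt's broker, with plain round-robin as the fallback. SendQueue and QueueRoundRobin are hypothetical names, and the counter starts at a caller-supplied index instead of a random one so the result is predictable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for MessageQueue: a broker name plus a queue id.
class SendQueue {
    final String brokerName;
    final int queueId;

    SendQueue(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }
}

// Sketch of the path taken when fault avoidance is off.
class QueueRoundRobin {
    private final List<SendQueue> queues;
    private final AtomicInteger sendWhichQueue; // the real counter starts at a random value

    QueueRoundRobin(int startIndex, SendQueue... queues) {
        this.queues = Arrays.asList(queues);
        this.sendWhichQueue = new AtomicInteger(startIndex);
    }

    // Plain round-robin.
    SendQueue select() {
        int pos = Math.abs(sendWhichQueue.getAndIncrement()) % queues.size();
        return queues.get(pos < 0 ? 0 : pos);
    }

    // On a retry, skip queues on the broker used by the previous attempt.
    SendQueue select(String lastBrokerName) {
        if (lastBrokerName == null) {
            return select();
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.abs(index++) % queues.size();
            if (pos < 0) {
                pos = 0; // Math.abs(Integer.MIN_VALUE) is still negative
            }
            SendQueue q = queues.get(pos);
            if (!q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        // Every queue lives on lastBrokerName: fall back to plain round-robin.
        return select();
    }
}
```

With queues a/0, a/1, b/0, b/1 and the counter at 0, the first attempt gets a/0, and a retry after broker-a failed skips a/1 and gets b/0.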

4. RocketMQ's three communication modes

Listing 12

 public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

All three modes concern the stage in which a message travels from the sender to the broker; none of them involves the broker delivering the message to subscribers. The three modes differ as follows:

One-way mode (ONEWAY): the sender just sends the message and does not care what happens on the broker. Because so little processing is involved, send time is very short and throughput is high, but message reliability cannot be guaranteed. This mode is usually used for high-volume, low-importance traffic, such as heartbeats.

Asynchronous mode (ASYNC): after sending the message, the sender does not wait for the broker to process it. The send call returns immediately (sendDefaultImpl returns null), and the result is delivered to the sender later through a callback on an asynchronous thread. If an exception occurs, the producer retries internally before reporting the failure to the sender (three times by default, transparently to the sender). In this mode the sender waits very little, throughput is high, and delivery is reliable. It suits high-volume but important messages.

Synchronous mode (SYNC): the sender waits until the broker finishes processing and returns an explicit success or failure. Before a failure is reported to the sender, the producer also retries internally (up to 3 attempts by default, transparently to the sender). The sender blocks while waiting for the result, so each send takes longer, but delivery is reliable. This mode suits low-volume but important messages. Note in particular that phase one of a transactional message (the half message) is always sent in SYNC mode.
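The three modes differ mainly in when, and whether, the sender looks at the broker's answer. The toy model below uses CompletableFuture to stand in for the network call; ModeSketch and brokerStore are hypothetical, and none of RocketMQ's retry or timeout logic is modeled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Toy model of the three communication modes. brokerStore is a hypothetical
// asynchronous "broker" that stores the message and answers "SEND_OK:<msg>".
class ModeSketch {
    static CompletableFuture<String> brokerStore(String msg) {
        return CompletableFuture.supplyAsync(() -> "SEND_OK:" + msg);
    }

    // SYNC: block until the broker answers (how the half message is sent).
    static String sendSync(String msg) {
        return brokerStore(msg).join();
    }

    // ASYNC: return at once; the outcome arrives later through callbacks.
    static void sendAsync(String msg, Consumer<String> onSuccess, Consumer<Throwable> onException) {
        brokerStore(msg).whenComplete((result, error) -> {
            if (error != null) {
                onException.accept(error);
            } else {
                onSuccess.accept(result);
            }
        });
    }

    // ONEWAY: fire and forget; the broker's answer is never looked at.
    static void sendOneway(String msg) {
        brokerStore(msg);
    }
}
```

Only sendSync can hand its result straight back to the caller, which is why it fits phase one: the local transaction must not start before the half message is acknowledged.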

The implementation differences are also visible in the sendKernelImpl method. ONEWAY is the simplest: it shares the SYNC branch and needs no extra handling. Compared with SYNC, the ASYNC call to sendMessage passes several additional arguments: the send callback, topicPublishInfo (the topic's routing information), the client instance (mQClientFactory), and the number of async retries (retryTimesWhenSendAsyncFailed). In addition, in ASYNC mode a message whose body has been compressed is cloned first.

Listing 13

switch (communicationMode) {
    case ASYNC:
        Message tmpMessage = msg;
        boolean messageCloned = false;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
        }
        if (topicWithNamespace) {
            if (!messageCloned) {
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
    case ONEWAY:
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}

The official documentation has a diagram that clearly describes the detailed flow of asynchronous communication:

[Figure: asynchronous communication flow (from the official RocketMQ documentation)]

Phase two: ending the transaction

Listing 3 shows how the local transaction is executed; localTransactionState carries the result of the local transaction into phase two of the transactional send.

Note that if phase one returns FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, or SLAVE_NOT_AVAILABLE (that is, the broker could not confirm that the half message was stored reliably), localTransactionState is set to ROLLBACK_MESSAGE and the local transaction is not executed. The endTransaction method then handles phase two, as shown in Listing 4. The implementation of endTransaction:

Listing 14

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // phase two is sent in oneway mode
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
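The switch in Listing 14 maps the local outcome onto the commitOrRollback header. The sketch below isolates that mapping; EndTransactionFlags and its nested LocalState enum are hypothetical, and the numeric values are assumed from MessageSysFlag in 4.x.

```java
class EndTransactionFlags {
    // Values assumed from MessageSysFlag in RocketMQ 4.x.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;   // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // 12

    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // The switch of Listing 14: local outcome -> commitOrRollback header value.
    static int commitOrRollback(LocalState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE;
            default:
                // UNKNOW: neither commit nor roll back; the broker keeps the half
                // message and resolves it later through a backcheck.
                return TRANSACTION_NOT_TYPE;
        }
    }
}
```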

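Phase two is sent in oneway mode because transactional messages have a dedicated, reliable fallback mechanism: the backcheck. If the phase-two request is lost, the transaction simply stays undecided on the broker, which later resolves it by checking back with the producer.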

Transaction backcheck

If, after a certain period, the Broker still has no definite information on whether a transaction should be committed or rolled back, it cannot tell what happened on the Producer side (perhaps the Producer crashed, or it sent a COMMIT that was lost to network jitter, or something else went wrong), so the Broker proactively initiates a backcheck.

The backcheck mechanism for transactional messages is implemented mostly on the broker side. The RocketMQ broker keeps transactional messages at different stages under three different topics (half messages, op messages, and real messages), so that consumers only ever see messages whose commit has been confirmed. The detailed logic is beyond the scope of this article and will be covered from the Broker's perspective in a separate article.

Back on the Producer side: when the Broker requests a backcheck, the Producer checks the status of the local transaction for that message and decides whether to commit or roll back. This is why the Producer must provide a backcheck implementation for such cases. That said, deliberately returning the UNKNOW state is not recommended, since it adds backcheck overhead on the broker; it is more reasonable to rely on the backcheck only when something unexpected goes wrong.
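A backcheck handler needs some record of each local transaction's outcome to consult. The sketch below assumes a hypothetical LocalTransactionLog keyed by transaction id; in the real API this logic would sit inside TransactionListener.checkLocalTransaction, and CheckResult is a local stand-in for LocalTransactionState.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in for LocalTransactionState.
enum CheckResult { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical record of local transaction outcomes, keyed by transaction id.
class LocalTransactionLog {
    private final Map<String, Boolean> outcomes = new ConcurrentHashMap<>();

    void markCommitted(String transactionId) {
        outcomes.put(transactionId, Boolean.TRUE);
    }

    void markRolledBack(String transactionId) {
        outcomes.put(transactionId, Boolean.FALSE);
    }

    // What a checkLocalTransaction implementation might answer for a backcheck.
    CheckResult check(String transactionId) {
        Boolean committed = outcomes.get(transactionId);
        if (committed == null) {
            // No record yet: the transaction may still be running, so let the broker ask again.
            return CheckResult.UNKNOW;
        }
        return committed ? CheckResult.COMMIT_MESSAGE : CheckResult.ROLLBACK_MESSAGE;
    }
}
```

Returning UNKNOW for a missing record defers the decision to a later backcheck. Writing the record in the same local database transaction as the business change is one way to keep it consistent with the business data.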

In addition, in version 4.7.1 the transaction backcheck is not repeated indefinitely; a message is checked at most 15 times:

Listing 15

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 15;

Appendix

The official default parameters of the Producer are listed below. (The Producer's send timeout was discussed earlier: debugging shows that it defaults to 3000 ms, not 10000 ms.)

[Table: Producer default parameters (from the official RocketMQ documentation)]

