Preface

Apache RocketMQ, a widely used open source messaging middleware, was created at Alibaba and donated to Apache in 2016. From RocketMQ 4.0 to the latest version, v4.7.1, it has won widespread attention and praise both inside Alibaba and in the wider community.

This article analyzes how RocketMQ sends transactional messages by reading the RocketMQ Producer source code, from the sender's perspective.

Note that the code quoted in this article comes from the RocketMQ 4.7.1 source. In this article, "sending" refers only to the Producer sending messages to the Broker, not to the Broker delivering messages to consumers.

Macro overview

RocketMQ transaction message sending process:

The sendMessageInTransaction method of TransactionMQProducer actually delegates to the sendMessageInTransaction method of DefaultMQProducerImpl. Stepping into that method, the entire transactional send flow is clearly visible.

First, it performs the pre-send checks and fills in the required properties, including marking the message as a prepared (half) transaction message.

Source listing -1

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

Next, it enters the send flow:

Source code Listing 2

    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

Whether the local transaction runs depends on the result returned by the broker. It is executed only if the half message was sent successfully:

Source code Listing -3

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            // When the send status shows the broker is not fully available, the half message
            // is rolled back and the local transaction is not executed.
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

After the local transaction has run, the second phase is carried out according to the local transaction state:

Source listing -4

    try {
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    // Assemble the send result
    // ...
    return transactionSendResult;
}
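To see the control flow of Listings 1 to 4 in one place, here is a minimal, self-contained sketch. It is a simplified model, not RocketMQ code: the enum constants mirror the names in the listings, while `TwoPhaseFlowSketch`, `decide`, and the `Supplier` standing in for the local transaction are hypothetical names introduced only for illustration.

```java
import java.util.function.Supplier;

/** Simplified model of the decision made in sendMessageInTransaction (not RocketMQ code). */
final class TwoPhaseFlowSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /**
     * Maps the phase-one send status to the state that phase two reports.
     * The local transaction (a hypothetical Supplier here) runs only on SEND_OK.
     */
    static LocalTransactionState decide(SendStatus status, Supplier<LocalTransactionState> localTx) {
        switch (status) {
            case SEND_OK:
                try {
                    LocalTransactionState state = localTx.get();
                    // A null result is treated as UNKNOW, as in Listing 3.
                    return state == null ? LocalTransactionState.UNKNOW : state;
                } catch (RuntimeException e) {
                    // On an exception the state keeps its initial value, UNKNOW.
                    return LocalTransactionState.UNKNOW;
                }
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // Roll back the half message without running the local transaction.
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.UNKNOW;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(SendStatus.SEND_OK, () -> LocalTransactionState.COMMIT_MESSAGE));
        System.out.println(decide(SendStatus.SLAVE_NOT_AVAILABLE, () -> LocalTransactionState.COMMIT_MESSAGE));
    }
}
```

In the real method the resulting state is then passed to endTransaction, whatever its value.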

Next, let's dig into the code for each phase.

A deeper look

Phase I: sending

Let's focus on the send method. Stepping into it, we find that RocketMQ uses SYNC mode for the first phase of a transaction message:

Source listing -5

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

This is easy to understand: a transaction message relies on the result of the phase-one send to decide whether to run the local transaction, so the producer must block and wait for the broker's ACK.

We now step into DefaultMQProducerImpl.java and look at the implementation of the sendDefaultImpl method. Reading this method shows how the producer behaves during phase one of a transactional send.

Note that this method is not specific to transaction messages, or even to SYNC mode, so reading it gives a fairly complete picture of how RocketMQ sends messages in general.

The logic of this code flows so smoothly that I could not bear to cut it into pieces. To save space, the longer but less informative parts are replaced with comments, preserving the overall flow as much as possible. The parts I consider important or easy to overlook are marked with comments and explained in detail afterwards.

Source code Listing -6

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    // 1. Verify message validity (see below).
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Get the route info for the current topic; if it is not found locally, fetch it from the name server.
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // 2. Send retry mechanism (see below).
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // On the first attempt mq == null; on retries it carries the previous broker's info.
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 3. How RocketMQ selects a queue when sending: broker fault avoidance (see below).
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }

                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // Fault avoidance for broker selection; takes effect only when sendLatencyFaultEnable == true.
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    // 4. RocketMQ's three communication modes (see below).
                    switch (communicationMode) {
                        case ASYNC: // asynchronous mode
                            return null;
                        case ONEWAY: // one-way mode
                            return null;
                        case SYNC: // synchronous mode
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // ...
                } catch (MQClientException e) {
                    // ...
                } catch (MQBrokerException e) {
                    // ...
                    // Retried automatically when the response code is NOT_IN_CURRENT_UNIT (205).
                } catch (InterruptedException e) {
                    // ...
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        // Assemble the error info; it is finally thrown as an MQClientException.
        // ...

        // In the timeout case, a RemotingTooMuchRequestException is thrown instead.
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        // Populate the MQClientException with exception details.
        // ...
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

1. Verify message validity

Source code Listing -7

 Validators.checkMessage(msg, this.defaultMQProducer);

This method validates the message, covering both the topic and the message body. The topic name must conform to the naming rules and must not be one of the built-in system topics. The body length must satisfy 0 < length <= 1024 * 1024 * 4 bytes = 4 MB.

Source code Listing -8

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }

    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }

    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

2. Send retry mechanism

If a send fails, the Producer retries automatically. The maximum number of attempts is retryTimesWhenSendFailed + 1, which is 3 by default.

Note that not every failure is retried. From the source above, automatic retries happen in three cases:

  • When a RemotingException or MQClientException occurs
  • When an MQBrokerException occurs and the response code is NOT_IN_CURRENT_UNIT (205)
  • In SYNC mode, when no exception occurs but the send status is not SEND_OK (and, as the listing shows, only if retryAnotherBrokerWhenNotStoreOK is enabled)
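Two of the rules above can be restated as a small sketch. This is a simplified model under stated assumptions rather than RocketMQ code: `RetryRulesSketch` and its method names are hypothetical, and only the attempt count and the SYNC "not SEND_OK" rule visible in Listing 6 are modeled.

```java
/** Simplified model of two retry rules read from sendDefaultImpl (not RocketMQ code). */
final class RetryRulesSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    /** Only SYNC sends loop inside sendDefaultImpl: 1 + retryTimesWhenSendFailed attempts. */
    static int totalAttempts(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    /** A SYNC result other than SEND_OK is retried only if the producer has opted in. */
    static boolean retryOnNotOk(boolean sendOk, boolean retryAnotherBrokerWhenNotStoreOK) {
        return !sendOk && retryAnotherBrokerWhenNotStoreOK;
    }

    public static void main(String[] args) {
        // With retryTimesWhenSendFailed = 2, a SYNC send makes at most 3 attempts.
        System.out.println(totalAttempts(Mode.SYNC, 2));
        System.out.println(totalAttempts(Mode.ASYNC, 2));
    }
}
```

The ASYNC and ONEWAY cases return 1 here because the loop in sendDefaultImpl runs only once for them.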

Before each send attempt, the producer checks whether the preceding steps have already used up the time budget (the default timeout is 3000 ms). If so, it stops sending and returns a timeout directly, without retrying. Two points follow from this:

  • The producer's internal retries are invisible to the application; the send time the application observes includes all retries.
  • Once the budget is exhausted, the send has failed because of the timeout. This is ultimately surfaced as a RemotingTooMuchRequestException.
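The shared time budget can be illustrated with a minimal sketch. It is a model under assumptions, not the producer's implementation: `TimeoutBudgetSketch`, `Attempt`, and the injected clock are hypothetical, and string results stand in for the real return values and exceptions.

```java
import java.util.function.LongSupplier;

/** Simplified model of the time budget shared by all attempts (not RocketMQ code). */
final class TimeoutBudgetSketch {
    /** Hypothetical stand-in for one network send; returns true on success. */
    interface Attempt {
        boolean send(long remainingMillis) throws Exception;
    }

    static String sendWithBudget(int timesTotal, long timeoutMillis, LongSupplier clock, Attempt attempt) {
        long begin = clock.getAsLong();
        for (int times = 0; times < timesTotal; times++) {
            long costTime = clock.getAsLong() - begin;
            if (timeoutMillis < costTime) {
                // Budget used up by earlier attempts: stop without retrying.
                // The real code throws RemotingTooMuchRequestException here.
                return "TIMEOUT";
            }
            try {
                // Each attempt only gets what is left of the overall budget.
                if (attempt.send(timeoutMillis - costTime)) {
                    return "SEND_OK";
                }
            } catch (Exception e) {
                // Treated as a retryable failure in this model: try the next attempt.
            }
        }
        return "FAILED";
    }

    public static void main(String[] args) {
        long[] now = {0L};
        // Every attempt takes 2000 ms and fails, so the 3000 ms budget runs out before attempt 3.
        System.out.println(sendWithBudget(3, 3000L, () -> now[0], remaining -> { now[0] += 2000L; return false; }));
    }
}
```

Injecting the clock keeps the sketch deterministic; the real method reads System.currentTimeMillis() directly.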

Note that the official RocketMQ documentation states that the timeout is 10 seconds (10,000 ms), and many articles online repeat that figure. However, the code clearly says 3000 ms, and after debugging I confirmed that the default timeout is indeed 3000 ms.

3. Broker fault avoidance mechanism

Source code Listing -9

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);  

This line of code selects the queue before each send.

Latency fault tolerance is one of the core mechanisms behind RocketMQ's highly available message sending. It is part of the Producer's load balancing and is controlled by sendLatencyFaultEnable. The default value is false; the Producer can turn it on.

With fault avoidance enabled, queue selection takes the working state of each broker into account and avoids brokers considered unhealthy; such brokers are avoided for a period of time. With it disabled, the next queue is simply chosen in order, except that in a retry a queue on a different broker from the previous attempt is chosen. The broker state is maintained through the updateFaultItem method each time a message is sent.

Source code Listing -10

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // Compute how long to avoid this broker. isolation indicates whether the broker
        // needs to be isolated. If so, 30 s is used as the latency: the largest latency
        // level not exceeding 30 s is looked up, and its index determines the avoidance
        // period, which for 30 s is 10 minutes.
        // Otherwise, the avoidance period is determined by the latency of the last send.
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
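The lookup described in the comment above can be sketched as follows. Treat it as an illustration under assumptions: the two tables are the values I recall from MQFaultStrategy in the 4.x client and should be checked against the source, and the class name `FaultDurationSketch` is hypothetical.

```java
/** Sketch of mapping a send latency to a broker avoidance period (values are assumptions). */
final class FaultDurationSketch {
    // Latency levels in ms, and the avoidance period in ms paired with each level by index.
    // Assumption: recalled from MQFaultStrategy, not quoted in this article.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /** Scans from the largest level down and returns the period of the first level reached. */
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    public static void main(String[] args) {
        // The isolation case passes 30000 ms, which maps to 600000 ms (10 minutes).
        System.out.println(computeNotAvailableDuration(30000L));
        // A fast send below the lowest level is not avoided at all.
        System.out.println(computeNotAvailableDuration(40L));
    }
}
```

Under these assumed tables, a broker is only avoided once a send takes at least 550 ms.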

Now take a look inside the selectOneMessageQueue method:

Source code listing -11

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        // Fault avoidance is enabled
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // Take the next message queue in order as the candidate send queue
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // If the queue's broker is available and (on a retry) is the same broker
                // as the previous attempt, use this queue
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    // Fault avoidance is disabled: pick the queue using a randomly seeded, auto-incrementing index
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
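For the branch where fault avoidance is disabled, the behavior described earlier (round robin, but skipping the broker used by the previous attempt on a retry) can be modeled roughly as below. This is a sketch, not the TopicPublishInfo implementation: `QueueSelectSketch` and its nested `Queue` type are hypothetical, and the counter starts at 0 here for determinism, whereas the listing's comment says the real index is randomly seeded.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of round-robin queue selection that skips the last broker (not RocketMQ code). */
final class QueueSelectSketch {
    /** Hypothetical stand-in for MessageQueue: a broker name plus a queue id. */
    static final class Queue {
        final String broker;
        final int id;

        Queue(String broker, int id) {
            this.broker = broker;
            this.id = id;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    QueueSelectSketch(List<Queue> queues) {
        this.queues = queues;
    }

    /** lastBrokerName is null on the first attempt and names the previous broker on a retry. */
    Queue select(String lastBrokerName) {
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            // floorMod keeps the position non-negative even if the counter overflows.
            Queue q = queues.get(Math.floorMod(index++, queues.size()));
            if (lastBrokerName == null || !q.broker.equals(lastBrokerName)) {
                return q;
            }
        }
        // Every queue lives on the last broker: fall back to plain round robin.
        return queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size()));
    }

    public static void main(String[] args) {
        QueueSelectSketch selector = new QueueSelectSketch(Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1),
            new Queue("broker-b", 0), new Queue("broker-b", 1)));
        Queue first = selector.select(null);
        Queue retry = selector.select(first.broker);
        System.out.println(first.broker + " then " + retry.broker);
    }
}
```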

4. RocketMQ's three communication modes

Source code listing -12

 public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

All three modes concern the stage in which a message travels from the sender to the broker; they do not involve the broker delivering the message to subscribers. The three modes differ as follows:

**One-way mode:** ONEWAY. The sender just sends the message and does not care about the broker's processing result. The processing path is short, each send takes little time, and throughput is high, but message reliability cannot be guaranteed. This mode is typically used for high-volume, low-importance messages such as heartbeats.

**Asynchronous mode:** ASYNC. After sending the message to the broker, the sender does not wait for the broker's result and the call returns null immediately. Instead, an asynchronous thread handles the response and reports the send result to the sender through a callback. If an exception occurs during asynchronous processing, internal retries are performed (3 times by default, invisible to the sender) before the failure result is returned. In this mode the sender waits very little, throughput is high, and delivery is reliable. It suits high-volume, important messages.

**Synchronous mode:** SYNC. The sender waits for the broker to finish processing and return an explicit success or failure. Before the sender sees a failure, internal retries also take place (3 times by default, invisible to the sender). The sender blocks while waiting for the result, so each send takes longer, but delivery is reliable. It suits low-volume, important messages. It is worth stressing that the phase-one half message of a transaction message is sent in synchronous mode.
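The difference between the three modes, as seen by the caller, can be modeled in a few lines. This is a sketch under assumptions rather than the client's implementation: `CommunicationModeSketch` is hypothetical, a `Function` stands in for the broker, and sendAsync returns a future only so that callers and tests can wait for the callback (the real asynchronous send hands back nothing useful to the caller).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/** Simplified model of SYNC, ASYNC and ONEWAY sends (not RocketMQ code). */
final class CommunicationModeSketch {
    /** Hypothetical broker: maps a message body to a send status string. */
    private final Function<String, String> broker;

    CommunicationModeSketch(Function<String, String> broker) {
        this.broker = broker;
    }

    /** SYNC: block until the broker's result is available, then return it. */
    String sendSync(String msg) {
        return broker.apply(msg);
    }

    /** ASYNC: return at once; the result reaches the callback on another thread. */
    CompletableFuture<Void> sendAsync(String msg, Consumer<String> callback) {
        return CompletableFuture.supplyAsync(() -> broker.apply(msg)).thenAccept(callback);
    }

    /** ONEWAY: hand the message off and ignore whatever the broker answers. */
    void sendOneway(String msg) {
        CompletableFuture.runAsync(() -> broker.apply(msg));
    }

    public static void main(String[] args) {
        CommunicationModeSketch producer = new CommunicationModeSketch(msg -> "SEND_OK");
        System.out.println("sync: " + producer.sendSync("half message"));
        producer.sendAsync("order event", status -> System.out.println("async: " + status)).join();
        producer.sendOneway("heartbeat");
    }
}
```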

The implementation differences can also be seen in the sendKernelImpl method. ONEWAY mode is the simplest and does no extra processing. Compared with synchronous mode, the sendMessage call used in asynchronous mode takes additional arguments: the callback, the topicPublishInfo holding the topic's route metadata, the instance holding broker information, the producer holding send-queue information, and the retry count. In addition, in asynchronous mode a compressed message is cloned first.

Source code Listing -13

switch (communicationMode) {
    case ASYNC:
        Message tmpMessage = msg;
        boolean messageCloned = false;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
        }

        if (topicWithNamespace) {
            if (!messageCloned) {
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }

        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
    case ONEWAY:
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}

The official documentation has this diagram, which clearly describes the detailed process of asynchronous communication:

Phase II: sending

Listing 3 shows the execution of the local transaction; localTransactionState links the result of the local transaction to the phase-two send of the transaction message.

Note that if the phase-one send result is SLAVE_NOT_AVAILABLE, that is, even when it is only the slave broker that is unavailable, localTransactionState is still set to ROLLBACK_MESSAGE and the local transaction is not executed. The endTransaction method then takes care of the phase-two commit, as shown in Listing 4. The implementation of endTransaction is as follows:

Source code Listing -14

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // The phase-two message is sent one-way
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
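The switch in the listing above boils down to a three-way mapping from the local transaction state to the flag carried in the end-transaction request. The sketch below restates it in isolation. The numeric flag values are the ones I recall from MessageSysFlag and are an assumption here; `EndTransactionFlagSketch` is a hypothetical name.

```java
/** Sketch of the state-to-flag mapping in endTransaction (flag values are assumptions). */
final class EndTransactionFlagSketch {
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Assumption: values recalled from MessageSysFlag, not quoted in this article.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    static int commitOrRollback(LocalTransactionState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE;
            default:
                // UNKNOW: neither commit nor rollback; the broker will check back later.
                return TRANSACTION_NOT_TYPE;
        }
    }

    public static void main(String[] args) {
        for (LocalTransactionState state : LocalTransactionState.values()) {
            System.out.println(state + " -> " + commitOrRollback(state));
        }
    }
}
```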

Phase two is sent one-way because, as I understand it, transaction messages have a dedicated reliability mechanism: the check-back.

Message check-back

If a certain amount of time passes and the Broker still does not know whether the transaction message should be committed or rolled back in phase two, it has no way of telling what happened on the Producer side. Perhaps the producer sent a commit that was lost to network jitter, or perhaps something else went wrong. So the Broker actively initiates a check-back.

The check-back mechanism for transaction messages is implemented mostly on the broker side. RocketMQ's broker isolates transaction messages at their different stages using three kinds of topics (half messages, op messages, and real messages), so that the Consumer sees only the messages whose commit has finally been confirmed. The detailed implementation logic is out of scope for this article; it could be explained from the Broker's perspective in a separate article.

Back on the Producer side: when it receives a check-back request from the Broker, the Producer checks the state of the local transaction for that message and decides whether to commit or roll back based on the result. This requires the Producer to register a check-back implementation in advance. Under normal circumstances, deliberately returning the UNKNOW state is not recommended, because it imposes extra check-back overhead on the broker. Check-backs are best reserved for unexpected failures.
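As an illustration of what such a check-back implementation has to decide, here is a minimal sketch. It is not the RocketMQ listener interface: `CheckBackSketch`, `recordOutcome`, and `check` are hypothetical, and an in-memory map stands in for wherever the application records the outcome of its local transactions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model of answering a broker check-back from recorded outcomes (not RocketMQ code). */
final class CheckBackSketch {
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Hypothetical store: transaction id -> whether the local transaction committed. */
    private final Map<String, Boolean> outcomes = new ConcurrentHashMap<>();

    /** Called by the application once its local transaction has finished. */
    void recordOutcome(String transactionId, boolean committed) {
        outcomes.put(transactionId, committed);
    }

    /** Called when the broker asks about a half message that was never resolved. */
    LocalTransactionState check(String transactionId) {
        Boolean committed = outcomes.get(transactionId);
        if (committed == null) {
            // Outcome not known yet: answer UNKNOW so the broker asks again later.
            return LocalTransactionState.UNKNOW;
        }
        return committed ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    public static void main(String[] args) {
        CheckBackSketch checker = new CheckBackSketch();
        checker.recordOutcome("tx-1", true);
        System.out.println(checker.check("tx-1"));
        System.out.println(checker.check("tx-missing"));
    }
}
```

In a real application the outcome would normally be read from the same database the local transaction wrote to, so that the answer survives a producer restart.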

In addition, in 4.7.1 the transaction check-back is not unlimited; a message is checked at most 15 times:

Source code listing -15

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 15;
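The effect of this limit can be shown with a tiny model. It only illustrates the counting; the broker's actual bookkeeping is not covered in this article, and `CheckLimitSketch`, `shouldDiscard`, and `checksUntilDiscard` are hypothetical names.

```java
/** Simplified model of the check-back limit (not broker code). */
final class CheckLimitSketch {
    static final int TRANSACTION_CHECK_MAX = 15;

    /** True once a half message has already been checked the maximum number of times. */
    static boolean shouldDiscard(int checkTimes) {
        return checkTimes >= TRANSACTION_CHECK_MAX;
    }

    /** Counts the check-backs for a producer that keeps answering UNKNOW. */
    static int checksUntilDiscard() {
        int checkTimes = 0;
        while (!shouldDiscard(checkTimes)) {
            checkTimes++; // one more check-back that did not resolve the transaction
        }
        return checkTimes;
    }

    public static void main(String[] args) {
        System.out.println(checksUntilDiscard());
    }
}
```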

Appendix

The official default parameters of the Producer are as follows (the timeout parameter was discussed earlier in this article; debugging shows that the default is 3000 ms, not 10000 ms):

