RocketMQ Source Code Series 1: Transactional Messages from the Producer's Perspective

1. Introduction

Apache RocketMQ, a widely used open source messaging middleware, was born at Alibaba and donated to Apache in 2016. From RocketMQ 4.0 to V4.7.1, the latest release at the time of writing, RocketMQ has won wide attention and praise both inside Alibaba and in the wider community. Out of both personal interest and work needs, I recently read through part of the RocketMQ 4.7.1 code. It raised a number of questions and gave me even more insight.

This article reads the RocketMQ Producer source code to analyze, from the sender's perspective, how RocketMQ sends transactional messages. All code quoted here is from RocketMQ 4.7.1. "Sending" in this article refers only to the path from the Producer to the Broker. It does not include the delivery of messages from the Broker to the Consumer.

2. The big picture

The overall process of sending a transactional message in RocketMQ:

Figure 1

Turning to the source code: calling sendMessageInTransaction on RocketMQ's TransactionMQProducer actually invokes the sendMessageInTransaction method of DefaultMQProducerImpl. Stepping into that method, the whole process of sending a transactional message is clearly visible.

First, the method performs pre-send checks and fills in the necessary properties, including marking the message as a prepared (half) transactional message.

Source Listing 1

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);
    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
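To make the property handling above concrete, here is a minimal sketch that models message properties as a plain Map. The class HalfMessagePrep is hypothetical, and the string keys "DELAY", "TRAN_MSG" and "PGROUP" are assumed to be the values behind the MessageConst constants used in Source Listing 1.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the property handling in Source Listing 1.
// Assumed keys: "DELAY" = PROPERTY_DELAY_TIME_LEVEL,
// "TRAN_MSG" = PROPERTY_TRANSACTION_PREPARED, "PGROUP" = PROPERTY_PRODUCER_GROUP.
final class HalfMessagePrep {
    static final String DELAY = "DELAY";
    static final String TRAN_MSG = "TRAN_MSG";
    static final String PGROUP = "PGROUP";

    // Returns a copy of the properties, prepared for a half (prepared) message.
    static Map<String, String> prepare(Map<String, String> props, String producerGroup) {
        Map<String, String> out = new HashMap<>(props);
        out.remove(DELAY);              // delay levels are ignored for transactional messages
        out.put(TRAN_MSG, "true");      // mark the message as a prepared (half) message
        out.put(PGROUP, producerGroup); // record which producer group sent the message
        return out;
    }
}
```

The copy keeps the caller's map untouched, whereas the real code mutates the Message in place; that difference only exists to keep the sketch easy to test.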

Next, the half message is sent:

Source Listing 2

 try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

Whether the local transaction is executed depends on the result returned by the broker. If the half message is sent successfully, the local transaction starts executing:

Source Listing 3

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            if (sendResult.getTransactionId() != null) {
                msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
            }
            String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                msg.setTransactionId(transactionId);
            }
            if (null != localTransactionExecuter) {
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }
            if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                log.info("executeLocalTransactionBranch return {}", localTransactionState);
                log.info(msg.toString());
            }
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        // The broker did not fully store the half message: roll it back and do not run the local transaction
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}
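The branching in Source Listing 3 can be reduced to a small decision function. The sketch below is a simplified model, not RocketMQ code: PhaseOneDecision is a hypothetical name, the enums are redeclared locally so the example compiles on its own, and the local transaction is passed in as a Supplier.

```java
import java.util.function.Supplier;

// Hypothetical model of Source Listing 3: the phase-one send status decides
// whether the local transaction runs at all.
final class PhaseOneDecision {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    static LocalTransactionState decide(SendStatus status, Supplier<LocalTransactionState> localTx) {
        switch (status) {
            case SEND_OK:
                try {
                    LocalTransactionState s = localTx.get();
                    // a null result is treated as UNKNOW, as in the original code
                    return s == null ? LocalTransactionState.UNKNOW : s;
                } catch (RuntimeException e) {
                    // the original records the Throwable; here we simply fall back to UNKNOW
                    return LocalTransactionState.UNKNOW;
                }
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // the half message is rolled back; the local transaction never runs
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.UNKNOW;
        }
    }
}
```

An UNKNOW result is not a failure: it leaves the final decision to the broker's later check-back.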

Once the local transaction completes, the second phase is carried out according to its state:

Source Listing 4

try {
    this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
// Assemble the send result
// ...
return transactionSendResult;
}

Next, we dive into the code for each phase.

3. Digging deeper

3.1 First-Phase Sending

This phase is mainly about the send method. Stepping into send, we find that RocketMQ uses SYNC (synchronous) mode for the first phase of a transactional message:

Source Listing 5

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

This is easy to understand: the result of the first-phase send determines whether the local transaction should be executed, so the producer must block and wait for the broker's acknowledgement.

Next we open DefaultMQProducerImpl.java and read its sendDefaultImpl method to understand how the producer behaves during the first phase of sending a transactional message. Note that this method is not specific to transactional messages, or even to SYNC mode, so reading it gives a fairly complete picture of RocketMQ's message sending mechanism.

The logic of this method is tightly connected and hard to split. To save space, the more complex but less informative parts are replaced with comments, keeping the overall flow as intact as possible. Parts I consider important or easy to overlook are marked with comments, and some details are explained later.

Source Listing 6

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    // Check message validity
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Get the routing info (mainly the brokers) of the topic; fetched from namesrv if not found locally
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Send retry mechanism: SYNC mode allows 1 + retryTimesWhenSendFailed attempts, other modes only 1
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // mq is null on the first attempt; on retries it holds the queue used last time
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // Broker fault avoidance mechanism
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // Update the broker's fault state (effective only when sendLatencyFaultEnable == true)
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) { // RocketMQ's three communication modes
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // ...
                    // retried automatically
                } catch (MQClientException e) {
                    // ...
                    // retried automatically
                } catch (MQBrokerException e) {
                    // ...
                    // retried only for certain response codes, e.g. NOT_IN_CURRENT_UNIT == 205
                } catch (InterruptedException e) {
                    // ...
                    // not retried
                }
            } else {
                break;
            }
        }
        if (sendResult != null) {
            return sendResult;
        }
        // Assemble the error info, which is finally thrown as an MQClientException
        // ...
        // On timeout, a RemotingTooMuchRequestException is thrown instead
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }
        // Populate the MQClientException information
        // ...
    }
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

3.1.1 Message validity verification

Source Listing 7

Validators.checkMessage(msg, this.defaultMQProducer);

This method verifies the message's topic and body. The topic must follow the naming rules and must not be one of the built-in system topics. The body length must satisfy 0 < length <= maxMessageSize, which defaults to 1024 * 1024 * 4 bytes = 4 MB.

Source Listing 8

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // topic
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());
    // body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}
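A minimal sketch of the body checks, assuming the default maxMessageSize of 4 MB and using IllegalArgumentException in place of MQClientException. BodyCheck is a hypothetical name, and the topic checks are omitted.

```java
// Hypothetical model of the body checks in Validators.checkMessage.
final class BodyCheck {
    static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 * 4; // 4,194,304 bytes

    static void checkBody(byte[] body, int maxMessageSize) {
        if (body == null) {
            throw new IllegalArgumentException("the message body is null");
        }
        if (body.length == 0) {
            throw new IllegalArgumentException("the message body length is zero");
        }
        // the limit is inclusive: a body of exactly maxMessageSize bytes is accepted
        if (body.length > maxMessageSize) {
            throw new IllegalArgumentException("the message body size over max value, MAX: " + maxMessageSize);
        }
    }
}
```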

3.1.2 Send retry mechanism

If a send fails, the producer retries automatically. In SYNC mode the maximum number of attempts is retryTimesWhenSendFailed + 1, which is 3 with the default retryTimesWhenSendFailed = 2.

Note that not every exception triggers a retry. The source code above shows that a retry happens automatically in three cases: 1) a RemotingException or an MQClientException is thrown; 2) an MQBrokerException is thrown with a retryable response code, such as NOT_IN_CURRENT_UNIT (205); 3) in SYNC mode, no exception is thrown but the send status is not SEND_OK and retryAnotherBrokerWhenNotStoreOK is enabled.

Before each attempt, the producer checks whether the time already spent since the first attempt exceeds the send timeout (3000 ms by default). If it does, the producer stops and reports a timeout instead of retrying. This explains two things: 1) the producer's internal retries are invisible to the business application, and the send latency the application observes includes the time spent on all retries; 2) once the timeout is reached, the send has failed for good, with the timeout as the cause, and this is surfaced as a RemotingTooMuchRequestException.
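The interplay between the retry count and the timeout budget can be sketched as follows. This is a simplified, hypothetical model of the SYNC loop in sendDefaultImpl: RetryBudget is not a RocketMQ class, the clock and the send attempt are injected so the behaviour is deterministic, and the exceptions are collapsed into an Outcome enum.

```java
import java.util.function.IntPredicate;
import java.util.function.LongSupplier;

// Hypothetical model of the SYNC retry loop with a shared time budget.
final class RetryBudget {
    enum Outcome { SUCCESS, TIMEOUT, FAILED }

    // attempt.test(times) returns true when attempt number `times` succeeds.
    static Outcome send(int retryTimesWhenSendFailed, long timeoutMillis,
                        LongSupplier clock, IntPredicate attempt) {
        int timesTotal = 1 + retryTimesWhenSendFailed; // default 1 + 2 = 3 attempts
        long beginFirst = clock.getAsLong();
        for (int times = 0; times < timesTotal; times++) {
            long costTime = clock.getAsLong() - beginFirst;
            if (timeoutMillis < costTime) {
                return Outcome.TIMEOUT; // RocketMQ throws RemotingTooMuchRequestException here
            }
            if (attempt.test(times)) {
                return Outcome.SUCCESS;
            }
            // failed with a retryable error: loop and try again
        }
        return Outcome.FAILED;
    }
}
```

With a 3000 ms budget and attempts that each take 2000 ms, the third attempt is never made: the elapsed time (4000 ms) already exceeds the budget, which corresponds to the timeout case described above.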

Note that the RocketMQ documentation states that the send timeout is 10 s (10000 ms), and many people online take RocketMQ's default timeout to be 10 s. However, the code clearly uses 3000 ms, and debugging confirmed that the default timeout is indeed 3000 ms. I suggest that the RocketMQ team check the documentation and correct it as soon as possible.

Figure 2

3.1.3 Broker fault avoidance mechanism

Source Listing 8

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

This line selects the message queue before sending.

This involves a core high-availability mechanism of RocketMQ: latency fault tolerance. It is part of the producer's load balancing and is controlled by sendLatencyFaultEnable. The default value, false, leaves the broker fault-delay mechanism disabled; the producer can enable it by setting the value to true.

When fault avoidance is enabled, queue selection takes each broker's recent state into account and avoids brokers that are currently unhealthy. When it is not enabled, queues are selected one after another in order, but on a retry the producer tries to pick a queue on a broker different from the one used in the last attempt. After each send, the broker's status is updated through the updateFaultItem method.

Source Listing 9

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // Compute how long the broker should be avoided. isolation == true means the broker must be
        // isolated: the latency is treated as 30 s, the latency thresholds are scanned from the largest
        // down to find the first one the latency reaches, and that index gives the avoidance period,
        // so 30 s maps to 10 min of avoidance.
        // Otherwise, the avoidance period is determined by the latency of this send.
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
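The mapping from latency to avoidance period can be sketched like this. The two threshold tables are assumed to be the defaults in MQFaultStrategy (latencyMax and notAvailableDuration); check them against the version you run. FaultDuration is a hypothetical wrapper.

```java
// Hypothetical model of computeNotAvailableDuration; the tables are assumed defaults.
final class FaultDuration {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    static long computeNotAvailableDuration(long currentLatency) {
        // scan from the largest threshold down; the first threshold reached picks the window
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    // isolation == true (the send threw an exception) is treated as a 30 s latency
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000L : currentLatency);
    }
}
```

For example, a send that took 600 ms would mark the broker unavailable for 30 s, while an isolated broker is avoided for 600000 ms, i.e. the 10 minutes mentioned in the comment.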

Now let's look into the selectOneMessageQueue method:

Source Listing 10

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        // Fault avoidance enabled
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // Take the message queues in order as the candidate sending queue
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // Use this queue if its broker is available and either this is the first attempt
                // or the broker is the same as the one used in the previous attempt
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    // Fault avoidance disabled: pick the queue with a self-incrementing index, avoiding lastBrokerName if possible
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
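When fault avoidance is disabled, the last line delegates to TopicPublishInfo.selectOneMessageQueue(lastBrokerName). A simplified sketch of that round-robin behaviour follows. RoundRobinSelector is hypothetical, queues are modeled only by the name of the broker hosting them, and the counter starts at 0 here for determinism, whereas RocketMQ's sendWhichQueue starts from a random value.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of round-robin queue selection that avoids the last broker.
final class RoundRobinSelector {
    private final List<String> queueBrokers; // one entry per queue: the broker hosting it
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    RoundRobinSelector(List<String> queueBrokers) {
        this.queueBrokers = queueBrokers;
    }

    // Plain round-robin; returns the index of the chosen queue.
    int selectOne() {
        int pos = Math.abs(sendWhichQueue.getAndIncrement()) % queueBrokers.size();
        return pos < 0 ? 0 : pos; // Math.abs(Integer.MIN_VALUE) is still negative
    }

    // Round-robin that prefers a queue on a broker other than lastBrokerName.
    int selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queueBrokers.size(); i++) {
            int pos = Math.abs(index++) % queueBrokers.size();
            if (pos < 0) pos = 0;
            if (!queueBrokers.get(pos).equals(lastBrokerName)) {
                return pos;
            }
        }
        return selectOne(); // every queue is on lastBrokerName: fall back to plain round-robin
    }
}
```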

3.1.4 RocketMQ's three communication modes

Source Listing 11

public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

All three modes concern only the stage in which a message travels from the sender to the broker. They do not cover how the broker delivers the message to subscribers.

The three sending modes differ as follows:

One-way mode (ONEWAY): the sender sends the message and does not care about the result of the broker's processing. Because almost no processing is involved, sending takes very little time and throughput is high, but reliability is not guaranteed and messages may be lost. It is typically used for high-volume but non-critical messages, such as heartbeats.

Asynchronous mode (ASYNC): the sender does not wait for the broker to process the message, and the internal send path returns null right away. The result is delivered later to the sender through a callback, driven by an asynchronous thread. If an exception occurs during asynchronous processing, the send is retried internally (by default up to 2 retries, i.e. 3 attempts in total, invisible to the sender) before the failure is reported. In this mode the sender waits very little, throughput is high, and messages are reliable. It suits high-volume, important messages.

Synchronous mode (SYNC): the sender waits until the broker finishes processing and returns an explicit success or failure. Failures are also retried internally (by default up to 2 retries, i.e. 3 attempts in total, invisible to the sender) before the sender receives the failed result. In this mode the sender blocks while waiting for the result, so the waiting time is long, and messages are reliable. It suits low-volume but important messages. Note in particular that the first phase of a transactional message, sending the half message, uses synchronous mode.
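The practical difference between the three modes is who waits for the broker's answer. The toy model below is not RocketMQ's API: ModesDemo is hypothetical, the "broker" is any function that stores a message and returns a status string, and a single-threaded executor stands in for the I/O thread. SYNC joins the future, ASYNC hands the result to a callback, and ONEWAY discards it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical toy model of the three communication modes.
final class ModesDemo {
    private final Function<String, String> broker;
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    ModesDemo(Function<String, String> broker) {
        this.broker = broker;
    }

    // SYNC: the caller blocks until the broker's result is available.
    String sendSync(String msg) {
        return CompletableFuture.supplyAsync(() -> broker.apply(msg), io).join();
    }

    // ASYNC: returns immediately; the result is delivered to the callback later.
    void sendAsync(String msg, Consumer<String> onResult) {
        CompletableFuture.supplyAsync(() -> broker.apply(msg), io).thenAccept(onResult);
    }

    // ONEWAY: fire and forget; the broker's result is never observed.
    void sendOneway(String msg) {
        io.execute(() -> broker.apply(msg));
    }

    // Waits for queued work (and callbacks) to finish, then stops the I/O thread.
    void shutdownAndWait() {
        io.shutdown();
        try {
            io.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```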

The implementation differences are also visible in the sendKernelImpl method. ONEWAY is the simplest: it shares the SYNC branch and does no extra work. Compared with SYNC, the ASYNC branch additionally passes the callback, the TopicPublishInfo holding the topic's routing metadata, the client instance (mQClientFactory) used to look up broker information, and the number of asynchronous retries. In addition, in ASYNC mode a message whose body was compressed is cloned first, and the original message's body is restored.

Source Listing 12

switch (communicationMode) {
    case ASYNC:
        Message tmpMessage = msg;
        boolean messageCloned = false;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
        }
        if (topicWithNamespace) {
            if (!messageCloned) {
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync,
            communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this);
        break;
    case ONEWAY:
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync,
            communicationMode, context, this);
        break;
    default:
        assert false;
        break;
}

The official documentation includes a diagram that clearly describes the asynchronous communication process in detail:

Figure 3

3.2 Second-Phase Sending

Source Listing 3 shows the execution of the local transaction. LocalTransactionState links the outcome of the local transaction to the second-phase send of the transactional message.

Note that if the first-phase send returns FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, or SLAVE_NOT_AVAILABLE, meaning the broker could not fully store the half message, LocalTransactionState is set to ROLLBACK_MESSAGE and the local transaction is not executed. The endTransaction method is then responsible for the second phase, as shown in Source Listing 4. The endTransaction implementation is as follows:

Source Listing 13

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // The second phase is sent in ONEWAY mode
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
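The switch in endTransaction translates the local transaction state into a flag on the request header. A minimal sketch, assuming the MessageSysFlag values below, in which the transaction type occupies bits 2 and 3 of the system flag. EndTxFlag is hypothetical, and the enum is redeclared locally.

```java
// Hypothetical model of the commitOrRollback flag chosen in endTransaction.
final class EndTxFlag {
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Assumed to match MessageSysFlag in RocketMQ 4.x.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // 4: used for the half message itself
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;   // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // 12

    static int commitOrRollback(LocalTransactionState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE;
            default:
                return TRANSACTION_NOT_TYPE; // UNKNOW: the decision is left to a later check-back
        }
    }
}
```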

The second phase is sent in ONEWAY mode. As I understand it, this is acceptable because transactional messages have a dedicated reliability mechanism: the transaction check-back.

3.3 Transaction Check-back

If, after a certain period, the Broker still has no definite information on whether the second phase of a transactional message should commit or roll back, it cannot tell what happened on the Producer side: the Producer may have crashed, it may have sent a commit that was lost to network jitter, or something else may have gone wrong. So the Broker actively initiates a check-back.

The check-back mechanism is implemented mostly on the broker side. The RocketMQ Broker isolates transactional messages at different stages of sending by using three different topics, for the Half Message, the OP Message, and the Real Message, so that consumers can only see messages that were finally committed. The detailed implementation is beyond the scope of this article and may be covered from the Broker's perspective in a future article.

Back on the Producer side: when it receives the Broker's check-back request, the Producer checks the state of the local transaction for that message and, based on the result, decides whether to commit or roll back. The Producer therefore needs to provide a check-back implementation for the cases where one is needed.
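A check-back implementation typically answers from a record of the local transaction's outcome. The sketch below is hypothetical and simplified: OrderTxListener does not implement RocketMQ's TransactionListener (whose methods take a Message or MessageExt); it keys outcomes by a transaction id string and keeps them in memory, whereas a real implementation would read durable state.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a listener with an execute step and a check-back step.
final class OrderTxListener {
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    private final Map<String, Boolean> txLog = new ConcurrentHashMap<>(); // txId -> committed?

    // Phase one succeeded: run the local transaction and record its outcome.
    LocalTransactionState executeLocalTransaction(String txId, boolean businessOk) {
        txLog.put(txId, businessOk);
        return businessOk ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    // Broker check-back: answer from the recorded outcome, UNKNOW if nothing is recorded yet.
    LocalTransactionState checkLocalTransaction(String txId) {
        Boolean committed = txLog.get(txId);
        if (committed == null) {
            return LocalTransactionState.UNKNOW;
        }
        return committed ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }
}
```

Answering from a record rather than re-running business logic keeps the check-back idempotent: the broker may ask about the same message several times.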

However, deliberately returning the UNKNOW state is not recommended, because it imposes extra check-back costs on the Broker. It is more reasonable to rely on the check-back mechanism only when unexpected exceptions occur.

In addition, in version 4.7.1 the transaction check-back is not unlimited: a message is checked at most 15 times, as set by transactionCheckMax in the broker configuration:

Source Listing 14

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 15;
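The effect of the limit can be sketched as a per-message counter. This is a hypothetical model of the behaviour the comment above describes, not the Broker's actual code: CheckBudget is an invented name, each check-back increments the count, and once transactionCheckMax checks have been made the message is discarded instead of being checked again.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a bounded number of check-backs per half message.
final class CheckBudget {
    private final int transactionCheckMax;
    private final Map<String, Integer> checkTimes = new HashMap<>();

    CheckBudget(int transactionCheckMax) {
        this.transactionCheckMax = transactionCheckMax;
    }

    // Returns true if the producer should be checked again, false if the message is discarded.
    boolean tryCheck(String msgId) {
        int done = checkTimes.getOrDefault(msgId, 0);
        if (done >= transactionCheckMax) {
            return false;
        }
        checkTimes.put(msgId, done + 1);
        return true;
    }
}
```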

Appendix

The default Producer parameters given in the official documentation are shown below. Regarding the send timeout parameter: as mentioned earlier, debugging shows the default is 3000 ms, not 10000 ms.

Figure 4.

RocketMQ is an excellent open source messaging middleware, and many developers have built their own products on top of it. For example, Ant Group's commercial product SOFAStack MQ is a financial-grade messaging middleware redeveloped on the RocketMQ kernel, and it has done a lot of excellent work in areas such as message management and control and transparent operations and maintenance.

We hope that RocketMQ will continue to grow and thrive through the joint efforts of the community's developers.

This article is original content from Aliyun and may not be reproduced without permission.