RocketMQ uses transaction messages to solve distributed transactions.


1. Overview of distributed transactions

A transaction, as you probably know, is a logical set of operations that either all succeed or all fail. A transaction should satisfy the four ACID properties.

  • A, Atomicity: A transaction is an indivisible unit of work; either all of its operations are performed or none of them are.
  • C, Consistency: A transaction should take the database from one consistent state to another.
  • I, Isolation: When multiple transactions are executed concurrently, the execution of one transaction should not affect the execution of the others.
  • D, Durability: Changes made to the database by committed transactions should be stored permanently in the database.

In a distributed system, a single process may be completed jointly by multiple systems. These systems are deployed on different servers and may each use a different database. In this case, if one system fails, the entire process should fail and every update already made should be rolled back. Only if all systems succeed can the operation be persisted.

Distributed transactions exist to ensure data consistency in a distributed system.

2. Causes of distributed transactions

Let’s start with an example to illustrate why distributed transactions occur.

Suppose you have two sub-services in a distributed system, one for placing orders and one for adding user points. When a user selects a product, places an order, and pays successfully, the user's points are increased by an amount based on the price of the product. Adding points is not essential to the order process itself, so executing it asynchronously through a message queue makes sense.


In the process described above, four situations may occur, based on the two steps of placing the order and adding points.

  1. The order succeeds and the points are added.
  2. The order fails and the points are not added.
  3. The order succeeds but adding points fails.
  4. The order fails but the points are added.

Cases 1 and 2 need no further explanation, but cases 3 and 4 need further analysis.

First, consider case 3, where the order succeeds but adding points fails. After the user places an order, the order system sends a message to the message queue, and the points system consumes the message when it arrives. If that consumption fails, we end up in case 3.

In this case, the message sent by the order system has not been acknowledged, so it only needs to be consumed again.

What if consumption fails again? Retry, and keep retrying. After a certain number of failures, the message is moved to a dead-letter queue, and you need to build a manual handling process for it.
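The retry-then-dead-letter idea can be sketched in plain Java. This is a toy model, not RocketMQ code: the class `RetryThenDeadLetter`, the method `consumeWithRetry`, and the `maxAttempts` parameter are names assumed for illustration, and the "queue" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class RetryThenDeadLetter {
    /** Messages that exhausted their retries and now wait for manual handling. */
    public static final List<String> deadLetters = new ArrayList<>();

    /**
     * Tries to consume a message up to maxAttempts times.
     * Returns the number of attempts actually made.
     */
    public static int consumeWithRetry(String message, Predicate<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (handler.test(message)) {
                return attempt; // consumed successfully, stop retrying
            }
        }
        deadLetters.add(message); // retries exhausted: park the message
        return maxAttempts;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical points handler that fails twice, then succeeds.
        int attempts = consumeWithRetry("order-1", m -> ++calls[0] >= 3, 5);
        System.out.println("order-1 consumed after " + attempts + " attempts");

        // A handler that always fails ends up in the dead-letter list.
        consumeWithRetry("order-2", m -> false, 5);
        System.out.println("dead letters: " + deadLetters);
    }
}
```

In a real deployment the retry count and the dead-letter queue are managed by the message queue itself; the sketch only shows the control flow.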

Next comes the fourth case: the order fails but the points are added. The user gets points without placing an order and must feel great, but the boss will have a headache. As the developer, you can forget about this month's performance bonus; in serious cases you might even be dismissed or held legally responsible.


This happens because placing the order and sending the message to the message queue are not atomic: one can succeed while the other fails. For example, suppose placing an order requires deducting the user's balance, and the program sends the message in the middle of the order process. If the message has already been sent when the balance deduction finds that the balance is insufficient, the order fails but the points are still added.

In this case, we need to treat the entire ordering process as a single transaction and send the message that adds the user's points only after every ordering step has succeeded, so that the whole process either succeeds or fails as a unit.
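To make the ordering problem concrete, here is a small plain-Java sketch. Everything in it is assumed for illustration (the class `OrderThenNotify`, the methods `sendFirst` and `sendLast`, and an in-memory list standing in for the message queue); it only contrasts sending the message before the balance deduction with sending it afterwards.

```java
import java.util.ArrayList;
import java.util.List;

public class OrderThenNotify {
    /** Stand-in for the message queue: every entry is an "add points" request. */
    public static final List<String> queue = new ArrayList<>();

    /** Deducts the price from the balance, failing when the balance is too low. */
    static int deduct(int balance, int price) {
        if (balance < price) {
            throw new IllegalStateException("insufficient balance");
        }
        return balance - price;
    }

    /** Buggy order: the message is sent before the deduction is known to succeed. */
    public static boolean sendFirst(String orderId, int balance, int price) {
        queue.add(orderId);
        try {
            deduct(balance, price);
            return true;
        } catch (IllegalStateException e) {
            return false; // order failed, but the message is already out (case 4)
        }
    }

    /** Safer order: the message is sent only after the local steps succeeded. */
    public static boolean sendLast(String orderId, int balance, int price) {
        try {
            deduct(balance, price);
        } catch (IllegalStateException e) {
            return false; // order failed, nothing was sent
        }
        queue.add(orderId);
        return true;
    }

    public static void main(String[] args) {
        System.out.println(sendFirst("order-1", 10, 50) + " " + queue); // false [order-1]
        queue.clear();
        System.out.println(sendLast("order-2", 10, 50) + " " + queue);  // false []
    }
}
```

Note that `sendLast` still leaves a gap: if the process crashes after the local steps succeed but before the message is sent, the order exists and the points are never added. Closing that gap is what a two-phase approach is for.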

Transactional messages are one way to do this.

3. Transaction message execution process

The official RocketMQ Transaction Example introduces transaction messages. A transaction message can be thought of as a two-phase commit message built on a "half message". It guarantees eventual consistency across a distributed system: sending the message and executing the local transaction either both succeed or both fail.

The execution flow of a transaction message looks like this.

Figure: the execution process of a transaction message (image from Baidu).

  1. The message producer sends a half message to the Broker. If the half message is sent successfully, the Broker returns a success response and the producer begins executing the local transaction. If the half message fails to be sent, the whole process stops at this first step and the message is not sent.
  2. In special cases, such as when the producer is still executing the local transaction or has been disconnected from the network, the Broker does not receive the final status of the local transaction. The Broker then periodically initiates a message check to query the execution status of the local transaction.
  3. When the producer completes the local transaction, it sends a second confirmation to the Broker. If it is Commit, the local transaction succeeded, the half message is marked as deliverable, and the consumer will receive the message. If it is Rollback, the local transaction failed, the half message is deleted, and the consumer will not receive the message.
  4. The consumer eventually receives the message that has been marked as deliverable and consumes it.
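The four steps above can be modelled with a toy in-memory broker. This is a simplified sketch under my own assumptions, not RocketMQ's implementation: `ToyBroker`, `sendHalf`, `end`, and `checkPending` are hypothetical names, and the real Broker persists half messages in a commit log rather than in a map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class HalfMessageFlow {
    public enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** Toy broker: half messages are stored apart from deliverable ones. */
    public static class ToyBroker {
        private final Map<String, String> halfMessages = new HashMap<>();
        private final List<String> deliverable = new ArrayList<>();

        /** Phase 1: store the half message; consumers cannot see it yet. */
        public void sendHalf(String id, String body) {
            halfMessages.put(id, body);
        }

        /** Phase 2: apply the producer's final decision for one half message. */
        public void end(String id, State state) {
            if (state == State.UNKNOWN) {
                return; // keep the half message and wait for a later check
            }
            String body = halfMessages.remove(id);
            if (body != null && state == State.COMMIT) {
                deliverable.add(body);
            }
        }

        /** Check-back: ask the producer about every half message still pending. */
        public void checkPending(Function<String, State> checker) {
            for (String id : new ArrayList<>(halfMessages.keySet())) {
                end(id, checker.apply(id));
            }
        }

        public List<String> deliverable() { return deliverable; }

        public int pending() { return halfMessages.size(); }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.sendHalf("t1", "add points for order 1");
        broker.sendHalf("t2", "add points for order 2");
        broker.sendHalf("t3", "add points for order 3");

        broker.end("t1", State.COMMIT);    // local transaction succeeded
        broker.end("t2", State.ROLLBACK);  // local transaction failed
        broker.end("t3", State.UNKNOWN);   // result never reached the broker

        // Later the broker checks back and learns that t3 was committed.
        broker.checkPending(id -> State.COMMIT);
        System.out.println(broker.deliverable());
    }
}
```

Only the messages for orders 1 and 3 become deliverable; the rolled-back message for order 2 is never seen by a consumer.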

4. Source code analysis

4.1 Example of sending a transaction Message

According to the official RocketMQ Transaction example, a transaction message is sent through the TransactionMQProducer#sendMessageInTransaction method.

// Imports as of RocketMQ 4.x
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
        // 1. Create the transaction listener
        TransactionListener transactionListener = new TransactionListenerImpl();
        // 2. Create the producer
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        // 3. Create the thread pool that runs transaction status checks asynchronously
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        // 4. Start the producer
        producer.start();
        // 5. Send transaction messages
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
            Thread.sleep(10);
        }
        // 6. Keep the process alive while the Broker checks back on pending transactions
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        // Close the producer
        producer.shutdown();
    }
}

  1. Create a transaction listener, which is a class that implements the TransactionListener interface. Its main functions are executing the local transaction (TransactionListener#executeLocalTransaction) and checking the local transaction status (TransactionListener#checkLocalTransaction).
  2. Create a producer instance for transaction messages. Note that the producer of transaction messages is an instance of the TransactionMQProducer class.
  3. Create a thread pool, which is mainly used to run transaction status check-backs asynchronously.
  4. Start the producer and send transaction messages through the TransactionMQProducer#sendMessageInTransaction method.

4.2 Brief analysis of transaction listeners

Unlike normal messages, sending a transaction message also requires setting up a transaction listener. Let's look at the code.

// Imports as of RocketMQ 4.x
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    // Execute the local transaction and return its status
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // Assign an increasing integer to each message; the check-back logic uses it later
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        // Return the unknown state so that every message goes through the check-back logic
        return LocalTransactionState.UNKNOW;
    }

    // Transaction check-back
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // Look up the integer stored for this transaction ID during executeLocalTransaction
        // and map it to a final transaction status
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

The executeLocalTransaction method executes the local transaction. Its return value is the LocalTransactionState enumeration, whose values COMMIT_MESSAGE, ROLLBACK_MESSAGE, and UNKNOW (the spelling used in the RocketMQ source) represent commit (the local transaction succeeded), rollback (the local transaction failed), and unknown (a check-back is required).

The checkLocalTransaction method handles the check-back that determines the execution status of the local transaction, and it also returns a LocalTransactionState value. For messages whose local transaction status is unknown, the Broker periodically initiates a check, and the producer runs this method on the check thread pool created earlier to confirm the final status of the local transaction.
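The official listener only cycles through fixed states for demonstration. As a rough idea of how the two methods might wrap real business logic, here is a self-contained sketch. It does not use the RocketMQ API: `TxState` is a local stand-in for LocalTransactionState, the method signatures are simplified, and the `placeOrder` callback and the in-memory `outcomes` map are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

public class OrderListenerSketch {
    /** Local stand-in for RocketMQ's LocalTransactionState enum. */
    public enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Outcome of each local transaction, keyed by transaction id. */
    private final Map<String, TxState> outcomes = new ConcurrentHashMap<>();

    /** Runs the local transaction (hypothetical placeOrder callback) and reports its outcome. */
    public TxState executeLocalTransaction(String transactionId, BooleanSupplier placeOrder) {
        TxState state;
        try {
            state = placeOrder.getAsBoolean() ? TxState.COMMIT_MESSAGE : TxState.ROLLBACK_MESSAGE;
        } catch (RuntimeException e) {
            state = TxState.ROLLBACK_MESSAGE; // this sketch treats an exception as a failed order
        }
        outcomes.put(transactionId, state);
        return state;
    }

    /** Check-back: answer from the recorded outcome, or UNKNOW if none exists yet. */
    public TxState checkLocalTransaction(String transactionId) {
        return outcomes.getOrDefault(transactionId, TxState.UNKNOW);
    }

    public static void main(String[] args) {
        OrderListenerSketch listener = new OrderListenerSketch();
        System.out.println(listener.executeLocalTransaction("tx-1", () -> true));
        System.out.println(listener.executeLocalTransaction("tx-2", () -> {
            throw new IllegalStateException("insufficient balance");
        }));
        System.out.println(listener.checkLocalTransaction("tx-3"));
    }
}
```

An in-memory map is lost when the producer restarts, so a real service would more likely answer the check-back by querying durable state such as the order table.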

That’s the process of sending transaction messages, and it’s time for an exciting source code analysis.


I used to think that as long as you can use a framework well, there is no need to read its source code. Later I learned that reading source code is not only a chance to learn from the code of those who came before, but also preparation for the pitfalls you may run into in the future. After all, it is open source...

4.3 How to mark it as a transaction Message

The method that sends transaction messages, TransactionMQProducer#sendMessageInTransaction, eventually delegates to DefaultMQProducerImpl#sendMessageInTransaction. Before sending the message, this method sets a property on the Message entity.

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");

This attribute identifies the message as transactional and determines that the message is not visible to the consumer.

As for why it is not visible: DefaultMQProducerImpl#sendKernelImpl first checks the TRAN_MSG property that was just set and, if it is true, sets the MessageSysFlag.TRANSACTION_PREPARED_TYPE bit in the message's system flag sysFlag.

int sysFlag = 0;
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
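To see what the `|=` on sysFlag does at the bit level, here is a standalone sketch. The constants and the two helper methods mirror MessageSysFlag as I understand it from the RocketMQ 4.x source; treat the exact values as an assumption and verify them against the version you use.

```java
public class SysFlagSketch {
    // Assumed to mirror MessageSysFlag; the transaction type occupies bits 2 and 3.
    public static final int TRANSACTION_NOT_TYPE = 0;
    public static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    public static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    public static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    /** Extracts the two transaction bits from a flag. */
    public static int getTransactionValue(int flag) {
        return flag & TRANSACTION_ROLLBACK_TYPE;
    }

    /** Replaces the two transaction bits with the given type, keeping all other bits. */
    public static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    public static void main(String[] args) {
        int sysFlag = 0x1;                     // some unrelated low bit already set
        sysFlag |= TRANSACTION_PREPARED_TYPE;  // mark the message as a half message
        System.out.println(getTransactionValue(sysFlag) == TRANSACTION_PREPARED_TYPE); // true

        // Clearing the transaction bits leaves the unrelated bit untouched.
        sysFlag = resetTransactionValue(sysFlag, TRANSACTION_NOT_TYPE);
        System.out.println(sysFlag); // 1
    }
}
```

Because ROLLBACK is `0x3 << 2`, it doubles as the mask for both transaction bits, which is why the helpers use it to extract and clear the type.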

Finally, on the Broker side, the message-handling method SendMessageProcessor#sendMessage checks whether the message carries the PROPERTY_TRANSACTION_PREPARED property. If so, it calls the prepareMessage method to store the transaction message.

String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Check whether the message carries the transaction flag
if (traFlag != null && Boolean.parseBoolean(traFlag)
    && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) {
    // Check whether this Broker is configured to reject transaction messages
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
            + "] sending transaction message is forbidden");
        return response;
    }
    // Store the transaction message as a half message
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
}
// omit other code...

Let’s follow up with the prepareMessage method, whose entire call chain looks like this:

TransactionalMessageService#prepareMessage
-> TransactionalMessageServiceImpl#prepareMessage
-> TransactionalMessageBridge#putHalfMessage

public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    return store.putMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // Back up the original topic and queue ID of the message
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));
    // Reset the transaction bits in sysFlag to TRANSACTION_NOT_TYPE
    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // Change the topic to the half-message topic
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    // Change the consumption queue to queue 0
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

// TransactionalMessageUtil.buildHalfTopic()
public static String buildHalfTopic() {
    return MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
}

The putHalfMessage method rewraps the transaction message. It first backs up the original topic and consumption queue ID of the message into the message's property map (Message#properties), then changes the topic to RMQ_SYS_TRANS_HALF_TOPIC, the topic dedicated to storing half messages, and sets the queue ID to 0. Consumers do not subscribe to this topic, which is why the half message is invisible to them. Once the status of the local transaction is determined to be COMMIT_MESSAGE, either directly or through a check-back, the message is restored to its original topic, making it available for consumers.
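The backup-and-restore idea can be shown with a small model. This is a sketch, not RocketMQ code: `Msg` is a hypothetical stand-in for the message class, the property keys are chosen for the example, and the `restore` method is my own illustration of what "restoring the original topic" means rather than a copy of the Broker's commit logic.

```java
import java.util.HashMap;
import java.util.Map;

public class HalfTopicSketch {
    public static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Hypothetical property keys for this sketch; RocketMQ defines its own in MessageConst.
    public static final String REAL_TOPIC = "REAL_TOPIC";
    public static final String REAL_QUEUE_ID = "REAL_QID";

    /** Minimal stand-in for a message with a topic, a queue id, and properties. */
    public static class Msg {
        public String topic;
        public int queueId;
        public final Map<String, String> properties = new HashMap<>();

        public Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    /** Hides the message: back up topic and queue id, then point it at the half topic. */
    public static Msg toHalf(Msg msg) {
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = HALF_TOPIC;
        msg.queueId = 0;
        return msg;
    }

    /** Reveals the message on commit: read the backup and restore topic and queue id. */
    public static Msg restore(Msg msg) {
        msg.topic = msg.properties.remove(REAL_TOPIC);
        msg.queueId = Integer.parseInt(msg.properties.remove(REAL_QUEUE_ID));
        return msg;
    }

    public static void main(String[] args) {
        Msg msg = toHalf(new Msg("TopicTest1234", 3));
        System.out.println(msg.topic + " / " + msg.queueId); // RMQ_SYS_TRANS_HALF_TOPIC / 0
        restore(msg);
        System.out.println(msg.topic + " / " + msg.queueId); // TopicTest1234 / 3
    }
}
```

Storing the backup inside the message itself means the Broker needs no separate lookup table to find where a committed message belongs.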

5. Summary

This article described why distributed transactions arise, how RocketMQ executes transaction messages, the official Transaction example, and, through source code analysis, how a message is marked as a transaction message.

Copyright notice: this article was created by Planeswalker23. Please include the original link when reprinting. Thanks.

6. Reference materials

  • Inside RocketMQ technology
  • Transaction example
  • Database transaction
