RocketMQ use tutorial related series of directories


Technical work, should be rewarded with praise, to form a good habitCopy the code

Section one: Introduction

Introduction to RocketMQ transaction messages

After version 4.3.0, with transaction messages as a feature, the two-phase commit protocol is still the most common for distributed transactions.

Introduction to transaction message flow

Chinese pictures are friendlier

The figure above illustrates the general scheme of transaction messages, which is divided into two flows:

  • Sending and submitting of normal transaction messages (black-line flow)
  • Compensation flow for transaction messages (black line + red line)

1) Sending and submitting normal transaction messages

(1) Send half messages.

(2) The server responds to the message writing result.

(3) Execute a local transaction based on the sent result (if the write fails, the half message is not visible to the business and the local logic is not executed).

(4) Perform Commit or Rollback according to the local transaction status (Commit generates the message index, and the message is visible to consumers)

2) Compensation process of transaction messages

The compensation phase is used to resolve the timeout or failure of message Commit or Rollback. The process is as follows:

(1) Send half messages.

(2) The server responds to the message writing result.

(3) Execute a local transaction based on the sent result (if the write fails, the half message is not visible to the business and the local logic is not executed).

(4) After the local transaction is executed, no Commit or Rollback operations are performed

(5) Initiate a “Rollback” from the server for transaction messages (pending messages) that are not Commit/Rollback. (The maximum retry time is 15 times (determined by configuration parameters). If the retry time exceeds 15 times, the message will be discarded by default.)

(6) After receiving the backcheck message, Producer checks the status of local transactions corresponding to the backcheck message

(7) Recommit or Rollback based on the local transaction status

The compensation phase is used to resolve the timeout or failure of message Commit or Rollback.

3) Transaction message status

Transaction messages have three states: commit state, rollback state, and intermediate state:

  • TransactionStatus.Com mitTransaction: submit a transaction, it allows consumer spending this message.
  • TransactionStatus. RollbackTransaction: roll back a transaction, it represents the message will be deleted, is not permitted to consume.
  • TransactionStatus. Unknown: intermediate state, it means we need to check the message queue to determine the state.

Use restrictions

1. Transaction messages do not support delayed messages and batch messages.

2. To avoid half-queue message accumulation due to a single message being checked too many times, we limit the number of checks for a single message to 15 by default, but users can change this limit through the ‘transactionCheckMax’ parameter in the Broker configuration file. If a message has been checked more than N times (N = ‘transactionCheckMax’) the Broker discards the message and prints an error log by default. Users can rewrite ` AbstractTransactionCheckListener ` class to modify this behavior.

3. Transaction messages will be checked after a specified length of time such as the transactionMsgTimeout parameter in the Broker configuration file. Users can also change this limit by setting the user attribute CHECK_IMMUNITY_TIME_IN_SECONDS when sending transaction messages, which takes precedence over the ‘transactionMsgTimeout’ parameter.

4. Transactional messages may be checked or consumed more than once.

5. The target topic message submitted to the user may fail, currently depending on logging. Its high availability is guaranteed by RocketMQ’s own high availability mechanism, and if you want to ensure that transaction messages are not lost and transaction integrity is guaranteed, a synchronous dual write mechanism is recommended.

6. The producer ID of the transaction message cannot be shared with the producer ID of other types of messages. Unlike other types of messages, transaction messages allow reverse lookup, and MQ servers can query to consumers through their producer IDS.

Section two: Usage scenarios

Synchronous messages solve the problem that the message must have been delivered successfully (the message is not delivered until the Broker responds to the Send_OK status code)

Transaction messages solve the problem of doing both local transactions and message delivery

Example: Li Si wants to transfer 10,000 yuan to Zhang SAN.

A synchronous message

The bank sent a synchronous message to MQ and added 10,000 yuan to Zhang SAN. 2. MQ ACK feedback was sent successfully.Copy the code

Transaction message

The bank sent a transaction message to MQ, and added 10,000 yuan to Jack 2. The Broker precommit successfully, called back excuteCommit, and executed the 10,000 yuan deductible from Jack 3. The system can consume this message if the system debits an exception, then the message prepareCommit is in MQ, but not visible to the system. In addition, if the ACK network is lost or delayed, MQ sends a retry to the system if it does not receive an ACK due to timeout.Copy the code

Section three: Code practice

Transaction message producer

/** * Sends transaction messages */ public class Producer {public static void main(String[] args) throws Exception {// 1. Producer = new TransactionMQProducer("demo_transaction_group"); SetNamesrvAddr ("192.168.88.131:9876"); // 2. TransactionListenerImpl transactionListener = new TransactionListenerImpl(); producer.setTransactionListener(transactionListener); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; }}); / / set the thread pool producer. SetExecutorService (executorService); // 3. Start (); System.out.println(" producer start "); String[] tags = { "TAGA", "TAGB", "TAGC" }; for (int i = 0; i < 3; I++) {// 4. Create a message object and specify the Topic, Tag, and body of the message. Message MSG = new Message("TransactionTopic", tags[I], ("Hello xuzhu" + I).getbytes ()); / / 5. Send the message SendResult result = producer. SendMessageInTransaction (MSG, "hello - xuzhu_transaction"); SendStatus status = result.getsendStatus (); System.out.println(" send result :" + result); System.out.println(" send result status :" + status); Timeunit.seconds.sleep (2); } // 6. Shutdown (); System.out.println(" producer end "); }}Copy the code

Transaction listener

Public class TransactionListenerImpl implements TransactionListener {/** * complete the local transaction logic ** @param message * @param o *@return org.apache.rocketmq.client.producer.LocalTransactionState **/ @Override public LocalTransactionState ExecuteLocalTransaction (Message Message, Object O) {system.out.println (" executing local transaction ----"); if (StringUtils.equals("TAGA", message.getTags())) { return LocalTransactionState.COMMIT_MESSAGE; } else if (StringUtils.equals("TAGB", message.getTags())) { return LocalTransactionState.ROLLBACK_MESSAGE; } else if (StringUtils.equals("TAGC", message.getTags())) { return LocalTransactionState.UNKNOW; } return LocalTransactionState.UNKNOW; } / news back to the * * * * * @ param messageExt * @ return org. Apache. Rocketmq. Client. Producer. * * / @ LocalTransactionState Override Public LocalTransactionState checkLocalTransaction(MessageExt MessageExt) {system.out.println (" Message Tag:" + messageExt.getTags()); return LocalTransactionState.COMMIT_MESSAGE; }}Copy the code

Transaction message consumer

public class Consumer { public static void main(String[] args) throws Exception { // 1. DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer("demo_transaction_group"); // 2. Specify the Nameserver address consumer.setNamesrvaddr ("192.168.88.131:9876"); // 3. Subscribe Topic and Tag consumer. Subscribe ("TransactionTopic", "*"); // 4. Set the callback function, Handle the message consumer. RegisterMessageListener (new MessageListenerConcurrently () {/ / accept the message content @ Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println( "consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); Start consumer consumer.start(); System.out.println(" consumer start "); }}Copy the code

Effect:

Follow from breakpoint, start producer, enter local transaction method

As you can see from the local transaction method, only the first data is successful, we will see later, how many data can be consumed

 

The consumer, will only consume number one, success

Section 4: checkLocalTransaction does not fire

If you are careful, you may notice that there is a method called checkLocalTransaction in the transaction listener, and if you break it, you will notice that you will never execute this method.

The reason is that a rollback is not triggered until a transaction is rolled back or committed

Based on this theory, let’s tweak the code

Adjusted producer

/** * Sends transaction messages */ public class Producer {public static void main(String[] args) throws Exception {// 1. Producer = new TransactionMQProducer("demo_transaction_group"); SetNamesrvAddr ("192.168.88.131:9876"); // 2. TransactionListenerImpl transactionListener = new TransactionListenerImpl(); producer.setTransactionListener(transactionListener); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; }}); / / set the thread pool producer. SetExecutorService (executorService); // 3. Start (); System.out.println(" producer start "); String[] tags = { "TAGC", "TAGA", "TAGB" }; for (int i = 0; i < 3; I++) {// 4. Create a message object and specify the Topic, Tag, and body of the message. Message MSG = new Message("TransactionTopic", tags[I], ("Hello xuzhu" + I).getbytes ()); / / 5. Send the message SendResult result = producer. SendMessageInTransaction (MSG, "hello - xuzhu_transaction"); SendStatus status = result.getsendStatus (); System.out.println(" send result :" + result); System.out.println(" send result status :" + status); Timeunit.seconds.sleep (120); } // 6. Shutdown (); System.out.println(" producer end "); }}Copy the code

Adjustments:

String[] tags = { "TAGC", "TAGA", "TAGB" };

 TimeUnit.SECONDS.sleep(120);
Copy the code

The transaction listener and producer code remain unchanged

Effect:

As you can see, the lookup message has come in

Second question, what are The Times and timing of the checks?

The data that looks for on the net is to say by default 15 times, but regular time does not have data to have explanation

Unconvinced, I downloaded the rocketMQ source code and found the answer in the BrokerConfig class

 

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 15;

/**
 * Transaction message check interval.
 */
@ImportantField
private long transactionCheckInterval = 60 * 1000;
Copy the code

Note:

TransactionCheckMax parameter: The default value is 15 times. After 15 times, rollback is not successful

TransactionCheckInterval parameter: The default checkback interval is 1 minute.

Both of these parameters can be configured in broker.properties

Scientific verification:

Adjustments:

#Producer class timeunit.seconds.sleep (60 * 17); #TransactionListenerImpl class /** * message callback ** @param messageExt *@return org.apache.rocketmq.client.producer.LocalTransactionState **/ @Override public LocalTransactionState CheckLocalTransaction (MessageExt MessageExt) {system.out.println (" Message Tag:" + MessageExt.gettags ()); System.out.println(new Date()); return LocalTransactionState.UNKNOW; }Copy the code

The check interval is 1 minute, which meets the parameter Settings

The number of query times is 15, which also matches the parameter Settings