Chapter 4 RocketMQ applications

4. Transaction messages

1 Problem introduction

Here is a requirement scenario: ICBC user A transfers 10,000 yuan to CCB user B.

We can use synchronous messages to handle this requirement scenario:

  1. The ICBC system sends a synchronous message M to the Broker, instructing that 10,000 yuan be added to user B

  2. After the Broker successfully receives the message, it sends a success ACK to the ICBC system

  3. After receiving the success ACK, the ICBC system deducts 10,000 yuan from user A

  4. CCB system obtains message M from the Broker

  5. CCB system consumes message M, that is, 10,000 yuan is added to user B

    There is a problem here: the deduction in step 3 may fail even though the message has already been sent to the Broker. For MQ, a message can be consumed as soon as it is written successfully, so user B in the CCB system still gains 10,000 yuan while nothing has been deducted from user A. The data is now inconsistent.
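The failure mode above can be illustrated with a small in-memory simulation. This is only a sketch: the class and method names (NaiveTransferDemo, sendThenDeduct, consumeAll) are hypothetical, no RocketMQ API is involved, and the "Broker" is just a queue.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of the naive flow; not RocketMQ code.
class NaiveTransferDemo {
    final Map<String, Long> balances = new HashMap<>();
    final Queue<Long> broker = new ArrayDeque<>(); // stands in for the Broker

    NaiveTransferDemo(long balanceA, long balanceB) {
        balances.put("A", balanceA);
        balances.put("B", balanceB);
    }

    // Steps 1-3: the message is sent first, then the deduction from A is attempted.
    // Returns true only if the deduction succeeded.
    boolean sendThenDeduct(long amount, boolean deductionFails) {
        broker.add(amount);                      // steps 1-2: message stored and ACKed
        if (deductionFails) {
            return false;                        // step 3 fails, but the message is already consumable
        }
        balances.merge("A", -amount, Long::sum); // step 3 succeeds
        return true;
    }

    // Steps 4-5: the CCB side consumes every stored message and credits B.
    void consumeAll() {
        Long amount;
        while ((amount = broker.poll()) != null) {
            balances.merge("B", amount, Long::sum);
        }
    }

    long total() {
        return balances.get("A") + balances.get("B");
    }

    public static void main(String[] args) {
        NaiveTransferDemo demo = new NaiveTransferDemo(50_000, 0);
        demo.sendThenDeduct(10_000, true);
        demo.consumeAll();
        System.out.println("A=" + demo.balances.get("A") + " B=" + demo.balances.get("B"));
    }
}
```

When the deduction fails, B is still credited, so the combined balance grows by the transferred amount, which is exactly the inconsistency described.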

2 Solution

The idea is to make steps 1, 2, and 3 atomic: either they all succeed or they all fail. That is, once the message has been sent successfully, the deduction must also succeed; if the deduction fails, the message that was sent is rolled back. This is what transaction messages provide, and it is a distributed transaction solution.

Use transaction messages to handle this requirement scenario:

  1. The transaction manager (TM) sends an instruction to the transaction coordinator (TC) to start a global transaction

  2. The ICBC system sends a transaction message M to the TC, instructing that 10,000 yuan be added to user B

  3. The TC sends a half-transaction message (prepareHalf) to the Broker, pre-committing message M there. At this point message M in the Broker is not visible to the CCB system

  4. The Broker reports the result of the pre-commit to the TC

  5. If the pre-commit fails, the TC reports the failure to the TM and the global transaction ends. If the pre-commit succeeds, the TC invokes the callback of the ICBC system, which performs the pre-deduction of 10,000 yuan from ICBC user A

  6. The ICBC system sends the result of the pre-deduction, that is, the execution status of the local transaction, to the TC

  7. After receiving the pre-deduction result, the TC reports it to the TM

    There are three possible pre-deduction results:

    // Describes the execution status of a local transaction
    public enum LocalTransactionState {
        COMMIT_MESSAGE,   // The local transaction was executed successfully
        ROLLBACK_MESSAGE, // The local transaction failed
        UNKNOW,           // Undetermined: a check-back is needed to determine the result of the local transaction
    }

  8. The TM sends a different confirmation instruction to the TC depending on the reported result

  • If the pre-deduction succeeded (local transaction status COMMIT_MESSAGE), the TM sends a Global Commit instruction to the TC
  • If the pre-deduction failed (local transaction status ROLLBACK_MESSAGE), the TM sends a Global Rollback instruction to the TC
  • If the status is unknown (local transaction status UNKNOW), a check-back of the ICBC system's local transaction status is triggered. The check-back reports COMMIT_MESSAGE or ROLLBACK_MESSAGE to the TC, the TC reports that result to the TM, and the TM then sends the final confirmation instruction, Global Commit or Global Rollback, to the TC

  9. After receiving the instruction, the TC sends a confirmation instruction to the Broker and the ICBC system

  • If the TC receives a Global Commit instruction, it sends a Branch Commit instruction to the Broker and the ICBC system. Only then does message M in the Broker become visible to the CCB system, and only then is the deduction from ICBC user A actually confirmed

  • If the TC receives a Global Rollback instruction, it sends a Branch Rollback instruction to the Broker and the ICBC system. Message M in the Broker is discarded, and the deduction from ICBC user A is rolled back

    The above scheme ensures that message delivery and the deduction are carried out in one transaction: either both succeed or both are rolled back.

    The above scheme is not a typical XA pattern: branch transactions in the XA pattern run asynchronously, whereas the message pre-commit and the pre-deduction in the transaction message scheme run synchronously, one after the other.
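The essence of the flow, a message that is stored but hidden until the local transaction is resolved, can be sketched in memory as follows. This is a simplified model under stated assumptions: HalfMessageDemo, State and sendInTransaction are hypothetical names rather than RocketMQ APIs, the TM and TC roles are collapsed into one method, and an unresolved status is simply dropped instead of being checked repeatedly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory model of a half message; none of these names are RocketMQ APIs.
class HalfMessageDemo {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, String> halfMessages = new HashMap<>(); // stored but hidden
    private final List<String> visible = new ArrayList<>();           // consumable

    // Pre-commit the message, run the local transaction, then resolve the half message.
    State sendInTransaction(String id, String body,
                            Function<String, State> localTransaction,
                            Function<String, State> checkBack) {
        halfMessages.put(id, body);               // pre-commit: consumers cannot see it yet
        State state = localTransaction.apply(id); // callback, e.g. the deduction from user A
        if (state == State.UNKNOWN) {
            state = checkBack.apply(id);          // check-back: look the outcome up again
        }
        String pending = halfMessages.remove(id);
        if (state == State.COMMIT) {
            visible.add(pending);                 // commit: the message becomes consumable
        }
        // Simplification: anything other than COMMIT drops the half message here.
        return state;
    }

    List<String> visibleMessages() {
        return visible;
    }

    public static void main(String[] args) {
        HalfMessageDemo broker = new HalfMessageDemo();
        broker.sendInTransaction("t1", "add 10000 to B", id -> State.COMMIT, id -> State.UNKNOWN);
        broker.sendInTransaction("t2", "add 10000 to B", id -> State.ROLLBACK, id -> State.UNKNOWN);
        System.out.println("visible messages: " + broker.visibleMessages().size());
    }
}
```

Only the committed transfer ever reaches the consumer side; the rolled-back one never becomes visible.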

3 Basics

Distributed transaction

In plain terms, a distributed transaction is a single operation made up of several branch operations that belong to different applications and run on different servers. A distributed transaction requires that these branch operations either all succeed or all fail. Like an ordinary transaction, it guarantees the consistency of the operation's result.

Transaction message

RocketMQ provides a distributed transaction capability similar to X/Open XA; transaction messages achieve eventual consistency for distributed transactions. XA is a distributed transaction solution, a distributed transaction processing pattern.

Half-transaction message

A half-transaction message is one that temporarily cannot be delivered: the sender has successfully sent it to the Broker, but the Broker has not yet received the final confirmation instruction, so the message is marked "temporarily undeliverable" and consumers cannot see it.

Local transaction status

The result of the Producer callback is the local transaction status, which is sent to the TC, which in turn forwards it to the TM. The TM decides the global transaction confirmation instruction based on the local transaction status it receives from the TC.

// Describes the execution status of a local transaction
public enum LocalTransactionState {
	COMMIT_MESSAGE,  // The local transaction was successfully executed
	ROLLBACK_MESSAGE,  // Local transaction execution failed
	UNKNOW,  // Indeterminate: a check is required to determine the execution result of the local transaction
}

Message check-back

A message check-back queries the execution status of the local transaction again. In this example, it checks the DB again to see whether the pre-deduction succeeded.

Note that a message check-back is not the callback operation. The callback performs the pre-deduction; the check-back only looks up the result of the pre-deduction.

The two most common causes of a message check-back are:

1) The callback operation returns UNKNOW

2) The TC did not receive the final global transaction confirmation instruction from the TM
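One way to make a check-back answerable is for the local transaction to record its outcome under a transaction id, so that the later query only reads that record. The sketch below assumes such a record exists; TransactionLogDemo, record and check are hypothetical names, the map stands in for a DB table, and the nested enum merely mirrors the three status names used in the text.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a check-back that only reads a previously recorded outcome.
class TransactionLogDemo {
    // Local stand-in that mirrors the three status names used in the text.
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Stands in for a DB table holding the outcome of each local transaction.
    private final Map<String, Boolean> log = new ConcurrentHashMap<>();

    // Called by the callback operation once the deduction has finished.
    void record(String transactionId, boolean succeeded) {
        log.put(transactionId, succeeded);
    }

    // Called by the check-back: it never performs the deduction, it only reads the result.
    State check(String transactionId) {
        Boolean succeeded = log.get(transactionId);
        if (succeeded == null) {
            return State.UNKNOW; // no record yet, so the outcome is still undetermined
        }
        return succeeded ? State.COMMIT_MESSAGE : State.ROLLBACK_MESSAGE;
    }

    public static void main(String[] args) {
        TransactionLogDemo demo = new TransactionLogDemo();
        demo.record("tx-1", true);
        System.out.println(demo.check("tx-1"));
        System.out.println(demo.check("tx-2"));
    }
}
```

The separation mirrors the distinction drawn above: the callback writes the outcome, and the check-back is a read-only lookup.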

Message check-back settings in RocketMQ

There are three common properties for message check-back. They are all set in the configuration file loaded by the Broker, for example:

  • transactionTimeout=20: the TM should send the final confirmation status to the TC within 20 seconds; otherwise a message check-back is triggered. The default value is 60 seconds
  • transactionCheckMax=5: a message is checked back at most five times. Beyond that, the message is discarded and an error log is recorded. The default value is 15
  • transactionCheckInterval=10: the interval between repeated check-backs is 10 seconds. The default value is 60 seconds
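How the three settings might interact can be sketched as a small decision function. This is one reading of the descriptions above and not the Broker's actual implementation: CheckBackPolicy, Action and decide are hypothetical names, and the assumption that the n-th check becomes due at timeout + n × interval is made purely for illustration.

```java
// Hypothetical model of the three check-back settings; not the Broker's real logic.
class CheckBackPolicy {
    enum Action { WAIT, CHECK, DISCARD }

    private final long timeoutSeconds;  // plays the role of transactionTimeout
    private final int checkMax;         // plays the role of transactionCheckMax
    private final long intervalSeconds; // plays the role of transactionCheckInterval

    CheckBackPolicy(long timeoutSeconds, int checkMax, long intervalSeconds) {
        this.timeoutSeconds = timeoutSeconds;
        this.checkMax = checkMax;
        this.intervalSeconds = intervalSeconds;
    }

    // Decide what to do with an unresolved half message of the given age.
    Action decide(long ageSeconds, int checksDone) {
        if (checksDone >= checkMax) {
            return Action.DISCARD; // too many check-backs: give up on the message
        }
        // Assumption: the first check is due at the timeout, later ones one interval apart.
        long nextCheckDue = timeoutSeconds + checksDone * intervalSeconds;
        return ageSeconds >= nextCheckDue ? Action.CHECK : Action.WAIT;
    }

    public static void main(String[] args) {
        CheckBackPolicy policy = new CheckBackPolicy(20, 5, 10);
        System.out.println(policy.decide(5, 0));
        System.out.println(policy.decide(20, 0));
        System.out.println(policy.decide(100, 5));
    }
}
```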

4 The three components of XA mode

The XA protocol

XA (eXtended Architecture) is a distributed transaction solution, a distributed transaction processing pattern, based on the XA protocol. The XA protocol was first proposed by Tuxedo (Transactions for Unix, Extended for Distributed Operations) and handed over to the X/Open organization as the interface standard between resource managers and transaction managers.

There are three important components in XA mode: TC, TM, and RM.

TC

A Transaction Coordinator. Maintains the state of global and branch transactions and drives global transaction commit or rollback.

The Broker in RocketMQ acts as a TC.

TM

Transaction Manager. Defines the scope of a global transaction: it starts, commits, or rolls back the global transaction. It is in effect the initiator of the global transaction.

The Producer of the transaction messages in RocketMQ acts as TM.

RM

Resource Manager. Manages the resources that branch transactions work on, communicates with the TC to register branch transactions and report their status, and drives the commit or rollback of branch transactions.

In RocketMQ, both Producer and Broker of transaction messages are RM.

5 XA architecture

XA mode is a typical 2PC, and it works as follows:

  1. The TM sends an instruction to the TC to start a global transaction.

  2. Based on business requirements, each RM registers its branch transaction with the TC, and the TC sends a pre-execution instruction to each RM in turn.

  3. After receiving the instruction, each RM pre-executes its local transaction.

  4. Each RM reports its pre-execution result to the TC. The result may be success or failure.

  5. After receiving the reports from all RMs, the TC reports the aggregated result to the TM, and the TM sends a confirmation instruction to the TC based on that result.

  • If every result is a success, the TM sends a Global Commit instruction to the TC.
  • If any result is a failure, the TM sends a Global Rollback instruction to the TC.

  6. After receiving the instruction, the TC sends the confirmation instruction to each RM.

    The transaction message scheme is not a typical XA pattern: branch transactions in the XA pattern run asynchronously, whereas the message pre-commit and the pre-deduction in the transaction message scheme run synchronously, one after the other.
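The two-phase flow above can be sketched as a single coordinating method. This is a minimal illustration, not a real XA implementation: TwoPhaseCommitDemo, ResourceManager and RecordingRm are hypothetical names, the TM and TC are merged into the run method, and the RMs are called sequentially rather than in parallel.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of two-phase commit; TM and TC are collapsed into run().
class TwoPhaseCommitDemo {
    interface ResourceManager {
        boolean prepare(); // phase 1: pre-execute and report success or failure
        void commit();     // phase 2, global commit
        void rollback();   // phase 2, global rollback
    }

    // Simple RM that votes as configured and records what it was told to do.
    static class RecordingRm implements ResourceManager {
        private final boolean vote;
        String outcome = "pending";

        RecordingRm(boolean vote) { this.vote = vote; }

        @Override public boolean prepare() { return vote; }
        @Override public void commit() { outcome = "committed"; }
        @Override public void rollback() { outcome = "rolled back"; }
    }

    // Returns true if the global transaction was committed.
    static boolean run(List<? extends ResourceManager> rms) {
        boolean allPrepared = true;
        for (ResourceManager rm : rms) {
            allPrepared &= rm.prepare(); // collect every report before deciding
        }
        for (ResourceManager rm : rms) {
            if (allPrepared) {
                rm.commit();
            } else {
                rm.rollback();
            }
        }
        return allPrepared;
    }

    public static void main(String[] args) {
        RecordingRm broker = new RecordingRm(true);
        RecordingRm bank = new RecordingRm(false);
        System.out.println("committed: " + run(Arrays.asList(broker, bank)));
        System.out.println(broker.outcome + ", " + bank.outcome);
    }
}
```

A single failure report is enough to roll back every branch, including the ones that prepared successfully.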

6 Notes

  • Transaction messages do not support delayed messages

  • Consumers should perform idempotency checks on transaction messages, because a transaction message may be consumed more than once (for example, after a rollback followed by a resubmission)
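An idempotency check on the consumer side can be as simple as remembering which business keys have already been applied. The sketch below is illustrative only: IdempotentCreditDemo and credit are hypothetical names, and the in-memory set stands in for a durable store that a real system would update in the same local transaction as the balance.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of an idempotent consumer; the set stands in for a durable dedup table.
class IdempotentCreditDemo {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final AtomicLong balanceB = new AtomicLong();

    // Applies the credit once per business key.
    // Returns true if the credit was applied, false if the key was a duplicate.
    boolean credit(String businessKey, long amount) {
        if (!processedKeys.add(businessKey)) {
            return false; // already handled: ignore the redelivered message
        }
        balanceB.addAndGet(amount);
        return true;
    }

    long balance() {
        return balanceB.get();
    }

    public static void main(String[] args) {
        IdempotentCreditDemo ccb = new IdempotentCreditDemo();
        ccb.credit("transfer-42", 10_000);
        ccb.credit("transfer-42", 10_000); // redelivery of the same message
        System.out.println("balance of B: " + ccb.balance());
    }
}
```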

7 Code Examples

Define the ICBC transaction listener

// StringUtils is assumed here to come from Apache Commons Lang 3
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class ICBCTransactionListener implements TransactionListener {
    // Callback method.
    // Triggered once the message has been pre-committed successfully; it executes the local transaction
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("Pre-commit message successful: " + msg);
        // Simulated outcomes: a TAGA message means the deduction succeeded,
        // a TAGB message means it failed, and a TAGC message means the result
        // is unclear and a message check-back is required
        if (StringUtils.equals("TAGA", msg.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("TAGB", msg.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if (StringUtils.equals("TAGC", msg.getTags())) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.UNKNOW;
    }

    // Message check-back method.
    // The two common causes of a message check-back are:
    // 1) The callback returns UNKNOW
    // 2) The TC did not receive the final global transaction confirmation instruction from the TM
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("Perform message check-back: " + msg.getTags());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

Define the transaction message producer

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("tpg");
        producer.setNamesrvAddr("rocketmqOS:9876");

        /*
         * Define a thread pool. ThreadPoolExecutor constructor arguments:
         *   corePoolSize    number of core threads in the pool
         *   maximumPoolSize maximum number of threads in the pool
         *   keepAliveTime   how long excess idle threads survive once the pool
         *                   holds more threads than corePoolSize
         *   unit            time unit of keepAliveTime
         *   workQueue       queue that temporarily holds tasks; its argument is the queue length
         *   threadFactory   factory that creates the threads
         */
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        // Specify a thread pool for the producer
        producer.setExecutorService(executorService);
        // Add a transaction listener for the producer
        producer.setTransactionListener(new ICBCTransactionListener());
        producer.start();

        String[] tags = {"TAGA", "TAGB", "TAGC"};
        for (int i = 0; i < 3; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("TTopic", tags[i], body);
            // Send the transaction message.
            // The second parameter is the business argument passed to the local transaction
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.println("Send result: " + sendResult.getSendStatus());
        }
    }
}

Define the consumer

Simply reuse SomeConsumer from the ordinary-message example as the consumer.

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class SomeConsumer {
    public static void main(String[] args) throws MQClientException {
        // Define a pull consumer
        // DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
        // Define a push consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        // Specify the NameServer
        consumer.setNamesrvAddr("rocketmqOS:9876");
        // Start consuming from the first message
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Specify the topic and tag to consume
        consumer.subscribe("TTopic", "*");
        // Use broadcast mode for consumption. The default is cluster mode.
        // consumer.setMessageModel(MessageModel.BROADCASTING);
        // Register a message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // Triggered as soon as the Broker has a message for this subscription.
            // The return value is the consumption status of the current consumer
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // Consume the messages one by one
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                // Return the consumption status: consumption succeeded
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.println("Consumer Started");
    }
}

5. Batch messages

1 Send messages in batches

Sending restrictions

A Producer can send multiple messages in a single call, which greatly improves its sending efficiency. However, note the following restrictions:

  • Messages sent in a batch must have the same Topic
  • Messages sent in a batch must have the same flush policy
  • Batch messages cannot be delayed messages or transaction messages

Batch send size

By default, the total size of the messages sent in one batch cannot exceed 4 MB. If you need to exceed this value, there are two options:

  • Option 1: Split the batch into several smaller message sets, none larger than 4 MB, and send them in several batches
  • Option 2: Modify properties on both the Producer side and the Broker side
    • On the Producer side, set the Producer's maxMessageSize property before sending
    • On the Broker side, modify the maxMessageSize property in the configuration file it loads

Size of the message sent by the producer

Rather than serializing the Message object directly and sending it over the network, the Producer's send() method builds a string from the Message and sends that. The string consists of four parts: the Topic, the message Body, the message log (20 bytes), and a set of key-value attributes that describe the message, such as the producer address, the production time, and the QueueId to send to. The data ultimately written to a message unit in the Broker comes from these attributes.

2 Consuming messages in batches

Modifying batch attributes

The first parameter of the consumeMessage() method of the Consumer's MessageListenerConcurrently listener interface is a list of messages, but by default each call receives only one message. To consume several messages per call, set the Consumer's consumeMessageBatchMaxSize property. Its value cannot exceed 32, because by default a Consumer pulls at most 32 messages at a time. To change the maximum number pulled at a time, set the Consumer's pullBatchSize property.

Existing problems

Is it better to set the Consumer's pullBatchSize and consumeMessageBatchMaxSize properties as large as possible? Of course not.

  • The larger pullBatchSize is, the longer each pull takes and the more likely a transmission problem occurs on the network. If a problem occurs during the pull, all messages in that batch must be pulled again.
  • The larger consumeMessageBatchMaxSize is, the lower the Consumer's concurrent consumption capacity, and all messages in the batch share the same consumption result. The batch of messages specified by consumeMessageBatchMaxSize is processed by a single thread, and if any one message in it raises an exception, the whole batch must be consumed again.
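The trade-off can be made concrete by looking at how one pulled batch might be divided into consume tasks. The sketch below is a simplified model, not the client's actual code: ConsumeBatchDemo and partition are hypothetical names, and it assumes each resulting sublist is handed to one thread and succeeds or fails as a unit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: splitting one pulled batch into consume tasks.
class ConsumeBatchDemo {
    // Split the pulled messages into tasks of at most maxSize messages each.
    static <T> List<List<T>> partition(List<T> pulled, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> tasks = new ArrayList<>();
        for (int i = 0; i < pulled.size(); i += maxSize) {
            int end = Math.min(i + maxSize, pulled.size());
            tasks.add(new ArrayList<>(pulled.subList(i, end)));
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<Integer> pulled = new ArrayList<>();
        for (int i = 0; i < 32; i++) {
            pulled.add(i); // stands in for 32 pulled messages
        }
        // A batch size of 1 gives 32 independent tasks; a batch size of 10 gives only 4.
        System.out.println(partition(pulled, 1).size() + " tasks vs " + partition(pulled, 10).size() + " tasks");
    }
}
```

With a larger batch size there are fewer tasks to run concurrently, and under the stated assumption one failing message forces its entire task, up to maxSize messages, to be consumed again.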

3 Code Examples

The requirement for this batch-sending example is not to change the default 4 MB maximum, but to make sure that no batch sent exceeds the 4 MB limit.

Define a message list splitter

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.rocketmq.common.message.Message;

// Message list splitter: it only handles the case where no single message exceeds 4 MB.
// If a single message is larger than 4 MB, the splitter cannot split it;
// it returns that message on its own as a sublist, without further partitioning
public class MessageListSplitter implements Iterator<List<Message>> {
    // Size limit of 4 MB
    private static final int SIZE_LIMIT = 4 * 1024 * 1024;
    // All messages to be sent
    private final List<Message> messages;
    // Start index of the next small batch to be sent
    private int currIndex;

    public MessageListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        // There is more to send while the current index is less than the total number of messages
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        // Accumulated size of the small batch currently being assembled
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            // Get the message currently being traversed
            Message message = messages.get(nextIndex);
            // Estimate its size: topic + body + properties + 20 bytes of message log
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20;
            // A single message larger than 4 MB: send it alone if the batch is
            // still empty, otherwise close the current batch just before it
            if (tmpSize > SIZE_LIMIT) {
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            // Adding this message would exceed the limit: close the current batch
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        } // end-for
        // Sublist of messages covering the index range [currIndex, nextIndex)
        List<Message> subList = messages.subList(currIndex, nextIndex);
        // Start index for the next iteration
        currIndex = nextIndex;
        return subList;
    }
}

Define the batch message producer

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class BatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        // Specifies the maximum size of a message to be sent. The default is 4 MB.
        // Changing this property alone is not enough: the maxMessageSize property
        // in the configuration file loaded by the Broker must be changed as well
        // producer.setMaxMessageSize(8 * 1024 * 1024);
        producer.start();

        // Define the set of messages to be sent
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("someTopic", "someTag", body);
            messages.add(msg);
        }

        // Define a message list splitter that splits the list into several small lists, none larger than 4 MB
        MessageListSplitter splitter = new MessageListSplitter(messages);
        while (splitter.hasNext()) {
            try {
                List<Message> listItem = splitter.next();
                producer.send(listItem);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

Define the batch message consumer

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class BatchConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("rocketmqOS:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to the topic that BatchProducer sends to
        consumer.subscribe("someTopic", "*");
        // Consume up to 10 messages per call. The default is 1
        consumer.setConsumeMessageBatchMaxSize(10);
        // Pull up to 40 messages from the Broker at a time. The default is 32
        consumer.setPullBatchSize(40);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                // Return value for successful consumption
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                // Return value when consumption raises an exception
                // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}

6. Message filtering

When subscribing to messages, in addition to specifying the Topic to which messages are to be subscribed, messages in a given Topic can also be filtered according to specified criteria, that is, to subscribe to more fine-grained message types than Topic.

There are two types of filtering for specific Topic messages: Tag filtering and SQL filtering.

1 Tag filtering

The Tags to subscribe to are specified through the Consumer's subscribe() method. To subscribe to messages with several Tags, join the Tags with the OR operator (double vertical bar, ||).

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
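The semantics of such a Tag expression can be sketched with plain string handling. This is only an illustration of the matching rule, not how RocketMQ evaluates subscriptions internally: TagExpressionDemo, parse and matches are hypothetical names, and treating "*" as "match every Tag" is assumed from its use as a subscribe-to-all expression.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of Tag expression matching; not RocketMQ's internal logic.
class TagExpressionDemo {
    // Split "TAGA || TAGB" into the set {TAGA, TAGB}.
    static Set<String> parse(String expression) {
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    // A message matches if its Tag is listed, or if the expression is the wildcard "*".
    static boolean matches(String expression, String tag) {
        Set<String> tags = parse(expression);
        return tags.contains("*") || tags.contains(tag);
    }

    public static void main(String[] args) {
        String expression = "TAGA || TAGB || TAGC";
        System.out.println(matches(expression, "TAGB"));
        System.out.println(matches(expression, "TAGD"));
    }
}
```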

2 SQL filtering

SQL filtering filters messages by evaluating a specific expression against the user properties embedded in each message. It enables complex filtering of messages. However, only consumers using PUSH mode can use SQL filtering.

Multiple constant types and operators are supported in SQL filter expressions.

Supported constant types:

  • Numeric: such as 123, 3.1415
  • Character: must be enclosed in single quotes, such as 'abc'
  • Boolean: TRUE or FALSE
  • NULL: a special constant representing null

The supported operators are:

  • Numeric comparison: >, >=, <, <=, BETWEEN, =

  • Character comparison: =, <>, IN

  • Logical operations: AND, OR, NOT

  • NULL judgment: IS NULL or IS NOT NULL
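To show what evaluating such an expression against user properties means, the sketch below hand-codes a single predicate of the form `age BETWEEN low AND high`. It is an illustration only and parses no SQL: BetweenFilterDemo and between are hypothetical names, the bounds are treated as inclusive as in standard SQL, and a missing or non-numeric property is assumed not to match.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of one BETWEEN predicate over message user properties.
class BetweenFilterDemo {
    // True if the named property is a number within [low, high], bounds included.
    static boolean between(Map<String, String> userProperties, String name, long low, long high) {
        String raw = userProperties.get(name);
        if (raw == null) {
            return false; // assumption: a missing property never matches
        }
        try {
            long value = Long.parseLong(raw.trim());
            return value >= low && value <= high;
        } catch (NumberFormatException e) {
            return false; // assumption: a non-numeric value never matches
        }
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("age", "6"); // user properties are stored as strings
        System.out.println(between(properties, "age", 0, 6));
    }
}
```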

SQL filtering is not enabled on the Broker by default. To enable it, add the following property to the configuration file loaded by the Broker:

enablePropertyFilter = true

This modified configuration file must be specified when starting the Broker. For example, to start a single Broker whose modified configuration file is conf/broker.conf, run the following command:

sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

3 Code Examples

Define the Tag filtering Producer

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class FilterByTagProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        producer.start();
        String[] tags = {"myTagA", "myTagB", "myTagC"};
        for (int i = 0; i < 10; i++) {
            byte[] body = ("Hi," + i).getBytes();
            // Cycle through the three tags
            String tag = tags[i % tags.length];
            Message msg = new Message("myTopic", tag, body);
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}

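Define the Tag filtering Consumer

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class FilterByTagConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");
        consumer.setNamesrvAddr("rocketmqOS:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Only messages tagged myTagA or myTagB are delivered
        consumer.subscribe("myTopic", "myTagA || myTagB");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt me : msgs) {
                    System.out.println(me);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}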

Define the SQL filtering Producer

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class FilterBySQLProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            try {
                byte[] body = ("Hi," + i).getBytes();
                Message msg = new Message("myTopic", "myTag", body);
                // Attach the user property that the SQL expression filters on
                msg.putUserProperty("age", i + "");
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

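Define the SQL filtering Consumer

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class FilterBySQLConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");
        consumer.setNamesrvAddr("rocketmqOS:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Only messages whose "age" user property lies between 0 and 6 are delivered
        consumer.subscribe("myTopic", MessageSelector.bySql("age between 0 and 6"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt me : msgs) {
                    System.out.println(me);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}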