Definition

A RocketMQ transactional message binds a local transaction and the sending of a message into one global transaction, so that both succeed or both fail together. RocketMQ's transactional messages provide distributed transaction functionality similar to X/Open XA, which makes it possible to reach eventual consistency across a distributed transaction.

Demo

The following example, again based on the Spring Cloud Stream programming model and implemented with Spring Cloud Alibaba RocketMQ, demonstrates how to use transactional messages.

Process

The transaction message interaction flow is as follows:

Transaction message sending steps are as follows:

  1. The producer sends a half message (a transactional message that is not yet deliverable) to the RocketMQ server.
  2. After the RocketMQ server persists the message successfully, it returns an Ack to the producer to confirm that the message has been sent. At this point the message is still a half message.
  3. The producer starts executing the local transaction logic.
  4. The producer submits a second confirmation (Commit or Rollback) to the server based on the result of the local transaction. After receiving it, the server handles the message as follows:
    • If the second confirmation is Commit, the server marks the half message as deliverable and delivers it to the consumer.
    • If the second confirmation is Rollback, the server rolls back the transaction and does not deliver the half message to the consumer.

The check-back steps for a transactional message are as follows:

  1. If the network is disconnected or the producer application restarts, the second confirmation submitted in Step 4 may never reach the server. After a fixed period of time, the server initiates a check-back request to any producer instance in the producer cluster.
  2. After receiving the check-back request, the producer checks the final result of the local transaction that corresponds to the message.
  3. The producer submits the second confirmation again based on the final status of the local transaction, and the server continues to handle the half message as described in Step 4.
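The two lists above can be sketched as a small state machine. The following is a toy in-memory model, not the RocketMQ API: `ToyBroker`, `TxState`, and every method name are hypothetical and exist only to illustrate how a half message stays invisible until the second confirmation or a check-back resolves it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Outcome reported by the producer (hypothetical; loosely mirrors RocketMQ's three states). */
enum TxState { COMMIT, ROLLBACK, UNKNOWN }

/** Toy in-memory stand-in for the broker. This is NOT the RocketMQ API. */
class ToyBroker {
    private final Map<String, String> halfMessages = new LinkedHashMap<>(); // stored, not yet visible
    private final List<String> delivered = new ArrayList<>();               // visible to consumers

    /** Steps 1-2: persist the half message; returning normally plays the role of the Ack. */
    void sendHalf(String txId, String body) {
        halfMessages.put(txId, body);
    }

    /** Step 4: second confirmation from the producer. */
    void endTransaction(String txId, TxState state) {
        if (state == TxState.UNKNOWN) {
            return; // keep the half message; a later check-back will resolve it
        }
        String body = halfMessages.remove(txId);
        if (body != null && state == TxState.COMMIT) {
            delivered.add(body); // Commit: the message becomes deliverable
        }
        // Rollback: the half message is dropped and never delivered
    }

    /** Check-back: ask the producer about every half message that is still pending. */
    void checkBack(Function<String, TxState> checker) {
        for (String txId : new ArrayList<>(halfMessages.keySet())) {
            endTransaction(txId, checker.apply(txId));
        }
    }

    List<String> delivered() { return delivered; }

    int pendingCount() { return halfMessages.size(); }
}

class HalfMessageFlowDemo {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.sendHalf("tx-1", "order created");
        broker.endTransaction("tx-1", TxState.COMMIT);
        broker.sendHalf("tx-2", "order cancelled");
        broker.endTransaction("tx-2", TxState.ROLLBACK);
        broker.sendHalf("tx-3", "order paid");
        broker.endTransaction("tx-3", TxState.UNKNOWN); // confirmation lost or undecided

        System.out.println(broker.delivered());    // [order created]
        broker.checkBack(txId -> TxState.COMMIT);   // producer reports the final status
        System.out.println(broker.delivered());    // [order created, order paid]
    }
}
```

In this model the check-back is triggered by hand. In a real deployment the server schedules it, and the answer comes from the producer's own record of the local transaction.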

Configuration

As before, the producer and consumer bindings are configured together. Start with the configuration file:

spring:
  application:
    name: mq-example
  cloud:
    stream:
      bindings:

        input-transaction:
          content-type: application/json
          destination: TransactionTopic
          group: transaction-consumer-group
       
        output-transaction:
          content-type: application/json
          destination: TransactionTopic

      rocketmq:
        # RocketMQ binder settings; maps to the RocketMQBinderConfigurationProperties class
        binder:
          # RocketMQ name server address
          name-server: 127.0.0.1:9876
          group: rocketmq-group
        bindings:
          output-transaction:
            # Maps to the RocketMQProducerProperties class
            producer:
              producerType: Trans # send transactional messages
              group: transaction-producer-group # producer group
              transactionListener: myTransactionListener # bean name of the listener
         


Note that producerType is set to Trans, which marks this producer as a transactional-message producer.

The corresponding producer group and consumer group are also configured. As a reminder, here is what the two concepts mean:

Producer group: a set of producers of the same kind that send the same kind of messages with the same sending logic. If a producer sends a transactional message and then crashes, the broker contacts another producer instance in the same producer group to commit or roll back the transaction.

Consumer group: a set of consumers of the same kind that usually consume the same kind of messages with the same consumption logic. Consumer groups make it easy to achieve load balancing and fault tolerance for message consumption. Note that all consumer instances in a consumer group must subscribe to exactly the same topics. RocketMQ supports two consumption modes: clustering and broadcasting.

Implementation

myTransactionListener is our custom TransactionListener implementation. See the code below for details:

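import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

@Component("myTransactionListener")
public class TransactionListenerImpl implements TransactionListener {

    // Called after the broker has stored the half message: run the local transaction here.
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // "test" is the custom header set by the sender.
        String num = msg.getProperty("test");

        if ("1".equals(num)) {
            System.out.println("executer: " + new String(msg.getBody()) + " unknown");
            return LocalTransactionState.UNKNOW;
        } else if ("2".equals(num)) {
            System.out.println("executer: " + new String(msg.getBody()) + " rollback");
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        System.out.println("executer: " + new String(msg.getBody()) + " commit");
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    // Called by the broker on check-back, when the state of a half message is still unknown.
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("check: " + new String(msg.getBody()));
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}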

The code above follows the official demo. It returns a different transaction state depending on the value of num:

  • If num is 1, it returns UNKNOW: the local transaction status is not yet known, so the broker checks back later by calling the checkLocalTransaction method.
  • If num is 2, it returns ROLLBACK_MESSAGE: the local transaction was rolled back, so the broker discards the half message and never delivers it.
  • For any other value (for example 3), it returns COMMIT_MESSAGE: the local transaction was committed, so the broker delivers the message to consumers.

Sending a message is similar to the previous code:

    @GetMapping("/send_transaction")
    public void sendTransaction() {

        String msg = "This is a transaction message";
        // 1 = unknown, 2 = rollback, anything else = commit (see the listener above)
        Integer num = 2;

        MessageBuilder<String> builder = MessageBuilder.withPayload(msg)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
        // Custom header read by the transaction listener
        builder.setHeader("test", String.valueOf(num));
        builder.setHeader(RocketMQConst.USER_TRANSACTIONAL_ARGS, "binder");
        Message<String> message = builder.build();
        mySource.outputTransaction().send(message);
    }

To guarantee eventual consistency with reliable messages, you need a database table that records the state of each transaction.

The UNKNOW state is stored when the transaction begins. If the transaction fails, ROLLBACK_MESSAGE is returned and recorded in the table. If the transaction commits successfully, the state is changed to COMMIT_MESSAGE.

With the transaction message table, the checkLocalTransaction method can query the transaction status against this table.
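A minimal sketch of that idea follows, assuming an in-memory map in place of the database table. `TxStatus`, `TransactionLog`, and `LocalTransactionRunner` are hypothetical names introduced for illustration. The enum is a local copy of the three state names so that the example compiles without the RocketMQ client on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Local copy of the three state names used in this article (hypothetical stand-in). */
enum TxStatus { UNKNOW, COMMIT_MESSAGE, ROLLBACK_MESSAGE }

/** In-memory stand-in for the transaction state table, keyed by transaction id. */
class TransactionLog {
    private final Map<String, TxStatus> rows = new ConcurrentHashMap<>();

    void begin(String txId)    { rows.put(txId, TxStatus.UNKNOW); }
    void commit(String txId)   { rows.put(txId, TxStatus.COMMIT_MESSAGE); }
    void rollback(String txId) { rows.put(txId, TxStatus.ROLLBACK_MESSAGE); }

    /** What a checkLocalTransaction implementation could return for this id. */
    TxStatus check(String txId) {
        // No row yet: report UNKNOW so the caller can ask again later.
        return rows.getOrDefault(txId, TxStatus.UNKNOW);
    }
}

/** What an executeLocalTransaction implementation could do around the business logic. */
class LocalTransactionRunner {
    private final TransactionLog log;

    LocalTransactionRunner(TransactionLog log) {
        this.log = log;
    }

    TxStatus execute(String txId, Runnable businessLogic) {
        log.begin(txId);                 // state starts as UNKNOW
        try {
            businessLogic.run();
            log.commit(txId);            // success: COMMIT_MESSAGE
        } catch (RuntimeException e) {
            log.rollback(txId);          // failure: ROLLBACK_MESSAGE
        }
        return log.check(txId);
    }
}
```

With a real database, the status row would normally be written in the same database transaction as the business data, so that the two cannot disagree. This sketch leaves that part out.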

Of course, if a complete distributed transaction spans two systems A and B as shown in the figure above, and the transaction in system B fails and rolls back, consider whether the transaction in system A also needs to be rolled back. If so, system A should provide a rollback interface for system B to call.
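One possible shape for such a rollback interface is sketched below. Everything here is an assumption made for illustration: `CompensableService`, `SystemA`, and `SystemB` are hypothetical names, and a plain method call stands in for whatever remote call the two systems would actually use.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical rollback interface that system A exposes to system B. */
interface CompensableService {
    void compensate(String txId);
}

/** System A: remembers what it committed so that it can undo it on request. */
class SystemA implements CompensableService {
    private final Set<String> committed = new HashSet<>();

    void commitLocal(String txId) {
        committed.add(txId);
    }

    @Override
    public void compensate(String txId) {
        committed.remove(txId); // idempotent: repeating the call is a no-op
    }

    boolean isCommitted(String txId) {
        return committed.contains(txId);
    }
}

/** System B: consumes the message and asks A to compensate if its own work fails. */
class SystemB {
    private final CompensableService systemA;

    SystemB(CompensableService systemA) {
        this.systemA = systemA;
    }

    /** Returns true if B's local work succeeded, false if A was asked to roll back. */
    boolean consume(String txId, Runnable localWork) {
        try {
            localWork.run();
            return true;
        } catch (RuntimeException e) {
            systemA.compensate(txId);
            return false;
        }
    }
}
```

Keeping the compensation idempotent matters in this design, because a message may be redelivered and system B may then call the rollback interface more than once for the same transaction.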

References

  • www.alibabacloud.com/help/zh/doc…
  • www.iocoder.cn/Spring-Clou…