Small knowledge, big challenge! This article is participating in the creation activity of “Essential Tips for Programmers”.

A theory of

RocketMQ’s transaction message is the event that sends a message and other events that need to succeed or fail simultaneously. A common scenario: an order is paid and a coupon is sent to the user. The order system needs to send a message as a producer and the coupon system needs to receive a message as a consumer. In this scenario, sending the “user paid” message must succeed or fail at the same time as the “user paid” operation.

RocketMQ implements transactional messages in a two-phase commit manner. The process of TransactionMQProducer dealing with the above situation is to first send a semi-message “user paid”, and then perform the user payment operation after successfully sending the message. Based on whether the operation result is successful, determine whether the previous “user paid” message is commit or rollback. The specific process is shown in the figure below:

Strict transaction implementation, ACID needs to be implemented, so is RocketMQ implemented? Also take “send coupons to users after order payment” scenario description:

A (atomicity) : Payment and voucher issue occur at the same time or not, achieving atomicity.

C (consistency) : final consistency is achieved and is inconsistent until steps 4 or 7 above are completed.

I (isolation) : It is possible to read uncommitted, so isolation is not achieved.

D (persistence) : The message will eventually fall, which implements persistence.

Two practical

(1) Create the transactional producer

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2.5.100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                returnthread; }}); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags =new String[] {"TagA"."TagB"."TagC"."TagD"."TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch(MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); }}for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000); } producer.shutdown(); }}Copy the code

(2) Implement the TransactionListener Interface

import.public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null! = status) {switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    returnLocalTransactionState.ROLLBACK_MESSAGE; }}returnLocalTransactionState.COMMIT_MESSAGE; }}Copy the code

Three reference

Transaction example

RocketMQ Combat and Principle Analysis Yang Kaiyuan 3.2.4