Distributed transaction

First, the concept of distributed transactions

A distributed transaction is one in which the transaction participants, the servers supporting the transaction, the resource servers, and the transaction manager are located on different nodes of a distributed system. Put simply, one large operation is composed of several small operations, and these small operations must either all succeed or all fail. Distributed transactions exist to guarantee data consistency across different databases.

Second, the preface

Distributed transactions are still not a completely solved problem. Before choosing an implementation scheme, consider whether the current project really needs strong consistency. According to BASE theory, different service nodes in a distributed system are allowed to lag behind each other during synchronization, as long as the data is repaired after a period of time and reaches eventual consistency.

  • This article relies on an existing code base. The local code has been uploaded to a remote repository: Github.com/BenjaminFya…

Strong consistency

  • Consider the XA protocol, with consistency guaranteed by two-phase commit or three-phase commit. This approach is more intrusive to the code.

Eventual consistency

  • Consider TCC mode, compensation mode, or a message queue-based mode. For the message queue-based mode, RocketMQ can be used; this article describes how to use RocketMQ transaction messages in a distributed system.

  • Important: RocketMQ only addresses eventual consistency of data. After the upstream service commits, the downstream service is expected to succeed eventually; it cannot roll back the upstream data.

For example, suppose there is an order service with a points service, a commodity service, a coupon service, and so on downstream of it. When an order is placed, the points service must be notified to add points, the commodity service to reduce the pre-sale inventory, and the coupon service to check whether coupons are available for the order. With message queues and eventual consistency, the following situation can arise: the points service and the coupon service are called successfully, but the commodity service fails to deduct inventory because the stock is 0. How can the order, points, and coupon data then be rolled back? With a message queue, the downstream services are expected to succeed eventually; they cannot roll back upstream data. Clearly, using message queues for eventual consistency has limitations.

Alibaba's open-source Seata

For such cases I personally recommend Seata, Alibaba's open-source distributed transaction solution, which is committed to providing high-performance, easy-to-use distributed transaction services. Seata provides the AT, TCC, SAGA, and XA transaction modes.

Third, Kylin system architecture diagram

As shown in the figure above, an operation in the Kylin system currently requires interaction across several services. Controlling distributed transactions is important to ensure that the data operations across these systems either all succeed or all fail. The following sections describe the principles and methods of distributed transaction processing with RocketMQ transaction messages.

RocketMQ transaction messages

  • Apache RocketMQ has supported distributed transaction messages since version 4.3.0. RocketMQ commits transaction messages using a 2PC approach and adds compensation logic to handle second-phase messages that time out or fail, as shown in the figure below.

RocketMQ Transaction message flow Overview

The figure above illustrates the general scheme of transaction messages, which is divided into two processes: the sending and submission of normal transaction messages, and the compensation process of transaction messages.

Transaction message sending and submission

  • 1. Send the half message.
  • 2. The server responds with the result of writing the message.
  • 3. Execute the local transaction according to the send result (if the write failed, the half message is not visible to the business and the local logic is not executed).
  • 4. Commit or Rollback according to the local transaction status (Commit generates the message index and makes the message visible to consumers).
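
The four steps above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: the Broker class, the TxState enum, and sendInTransaction are hypothetical names invented for this sketch and are not RocketMQ APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** In-memory sketch of the half-message send and commit flow (not RocketMQ code). */
class HalfMessageFlowSketch {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    /** Hypothetical broker: half messages stay hidden until they are committed. */
    static class Broker {
        private final Map<Long, String> halfMessages = new HashMap<>();
        private final List<String> visibleMessages = new ArrayList<>();
        private long nextId = 0;

        // Steps 1-2: store the half message and acknowledge the write with an id.
        long sendHalf(String body) {
            long id = nextId++;
            halfMessages.put(id, body);
            return id;
        }

        // Step 4: Commit makes the message visible; Rollback just drops it.
        void end(long id, TxState state) {
            if (state == TxState.UNKNOWN) {
                return; // stays pending until a later backcheck decides
            }
            String body = halfMessages.remove(id);
            if (body != null && state == TxState.COMMIT) {
                visibleMessages.add(body);
            }
        }

        List<String> visible() { return visibleMessages; }

        int pending() { return halfMessages.size(); }
    }

    /** Producer side: send the half message, run the local transaction, report the result. */
    static TxState sendInTransaction(Broker broker, String body, Supplier<Boolean> localTx) {
        long id = broker.sendHalf(body); // steps 1-2
        TxState state;
        try {
            state = localTx.get() ? TxState.COMMIT : TxState.ROLLBACK; // step 3
        } catch (RuntimeException e) {
            state = TxState.ROLLBACK;
        }
        broker.end(id, state); // step 4
        return state;
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        sendInTransaction(broker, "order-1", () -> true);  // local transaction succeeds
        sendInTransaction(broker, "order-2", () -> false); // local transaction fails
        System.out.println(broker.visible()); // prints [order-1]
    }
}
```

Only the message whose local transaction succeeded ever becomes visible; the rolled-back half message is dropped without consumers having seen it.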

The compensation process

  • 5. For transaction messages that have been neither committed nor rolled back (messages in the pending state), a scheduled task on the server initiates a “backcheck”.
  • 6. The Producer receives the backcheck message and checks the status of the local transaction that corresponds to it.
  • 7. The Producer commits or rolls back again according to the local transaction status.

The compensation phase resolves cases where the Commit or Rollback of a message timed out or failed.
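
The compensation steps can be illustrated in the same spirit. The sketch below is an assumption-laden model, not RocketMQ's implementation: the Broker class, storeHalf, and runBackcheck are hypothetical, and the scheduled task is reduced to a single method call so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** In-memory sketch of the backcheck (compensation) flow; not RocketMQ code. */
class BackcheckSketch {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    /** Hypothetical broker holding half messages whose final state never arrived. */
    static class Broker {
        private final Map<String, String> pending = new LinkedHashMap<>(); // txId -> body
        private final List<String> delivered = new ArrayList<>();

        void storeHalf(String txId, String body) {
            pending.put(txId, body);
        }

        // Steps 5-7: ask the producer for the local transaction state of every pending message.
        void runBackcheck(Function<String, TxState> producerCheck) {
            Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, String> entry = it.next();
                TxState state = producerCheck.apply(entry.getKey());
                if (state == TxState.COMMIT) {
                    delivered.add(entry.getValue()); // now visible to consumers
                    it.remove();
                } else if (state == TxState.ROLLBACK) {
                    it.remove(); // dropped, never delivered
                }
                // UNKNOWN: keep the message pending for the next round
            }
        }

        List<String> delivered() { return delivered; }

        int pendingCount() { return pending.size(); }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        broker.storeHalf("tx-1", "order-1"); // local transaction committed, confirmation lost
        broker.storeHalf("tx-2", "order-2"); // local transaction failed
        broker.storeHalf("tx-3", "order-3"); // local transaction still running

        broker.runBackcheck(txId -> {
            switch (txId) {
                case "tx-1": return TxState.COMMIT;
                case "tx-2": return TxState.ROLLBACK;
                default: return TxState.UNKNOWN;
            }
        });
        System.out.println(broker.delivered() + " pending=" + broker.pendingCount());
    }
}
```

A message whose state is still unknown simply stays pending, so a later backcheck round can pick it up again.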

RocketMQ transaction message design

Phase 1: Prepared phase. Transaction messages are not visible to users in phase 1.

  • When a half message is sent, the topic and consume queue of the original message are backed up, and the topic is then changed to RMQ_SYS_TRANS_HALF_TOPIC.
  • Consumer groups do not subscribe to this Topic, so consumers cannot consume half messages. RocketMQ then starts a scheduled task that pulls messages from RMQ_SYS_TRANS_HALF_TOPIC, picks a producer from the producer group, and sends it a transaction status backcheck request. The message is committed or rolled back according to the returned transaction state.

Phase 2: Commit and Rollback (confirmation phase)

  • Commit: Phase 1 wrote a message that is invisible to the user. If the phase 2 operation is Commit, the message is made visible to the user.
  • Rollback: The message written in phase 1 needs to be undone. Because the message was never visible to the user, nothing actually has to be undone.

RocketMQ introduces the concept of Op messages, which indicate that a transaction message has a determined state (Commit or Rollback). If a transaction message has no corresponding Op message, the status of the transaction is not yet determined (perhaps phase 2 failed). With Op messages, every Commit or Rollback of a transaction message records an Op operation. The only difference between Commit and Rollback is that Commit builds an index for the Half message (so that consumers can consume it) before writing the Op message.

Op message storage and mapping

  • RocketMQ writes Op messages to one specific global Topic, which the source code obtains through the method TransactionalMessageUtil.buildOpTopic(). This Topic is an internal Topic (like the Half message Topic) and is not consumed by users. The content of an Op message is the storage Offset of the corresponding Half message, so that the Half message can be located from the Op message for subsequent backcheck operations.
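
To make the mapping concrete, here is a minimal sketch of how Half offsets and Op messages can be compared to find transactions whose state is still undetermined. It assumes, purely for illustration, that each Op message body holds exactly one Half message offset as text; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch of matching Op messages against Half messages by offset (not RocketMQ code). */
class OpMessageSketch {

    /** Returns the offsets of Half messages that have no Op message yet. */
    static List<Long> undetermined(List<Long> halfOffsets, List<String> opBodies) {
        Set<Long> resolved = new HashSet<>();
        for (String body : opBodies) {
            // Assumption of this sketch: the Op body is the offset of one Half message.
            resolved.add(Long.parseLong(body));
        }
        List<Long> pending = new ArrayList<>();
        for (Long offset : halfOffsets) {
            if (!resolved.contains(offset)) {
                pending.add(offset); // no Commit or Rollback recorded: needs a backcheck
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        List<Long> half = Arrays.asList(0L, 1L, 2L, 3L);
        List<String> ops = Arrays.asList("0", "2");
        System.out.println(undetermined(half, ops)); // prints [1, 3]
    }
}
```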

Index building for the Half message

When performing the phase 2 Commit operation, an index must be built for the Half message. Since Half messages are written to a special Topic in phase 1, building the index in phase 2 requires reading the Half message, replacing its Topic and Queue with the actual target Topic and Queue, and then generating a message visible to the user through a normal write operation. So phase 2 of a RocketMQ transaction message actually takes the content of the message stored in phase 1, restores a complete normal message from it, and then goes through the message write process again.
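
The topic swap in phase 1 and the restore in phase 2 can be sketched as follows. The Msg class, the property keys REAL_TOPIC and REAL_QID, and the choice of queue 0 for half messages are illustrative assumptions of this sketch, not a statement of RocketMQ's internal layout.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of backing up and restoring the real topic of a half message (not RocketMQ code). */
class HalfTopicSketch {

    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Property keys chosen for this sketch.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";

    /** Hypothetical, simplified message model. */
    static class Msg {
        final String topic;
        final int queueId;
        final String body;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, String body) {
            this.topic = topic;
            this.queueId = queueId;
            this.body = body;
        }
    }

    /** Phase 1: back up the real topic and queue, then redirect to the half topic. */
    static Msg toHalf(Msg original) {
        Msg half = new Msg(HALF_TOPIC, 0, original.body); // queue 0 is an assumption
        half.properties.putAll(original.properties);
        half.properties.put(REAL_TOPIC, original.topic);
        half.properties.put(REAL_QUEUE_ID, String.valueOf(original.queueId));
        return half;
    }

    /** Phase 2 Commit: rebuild a normal message from the stored half message. */
    static Msg restore(Msg half) {
        Msg restored = new Msg(
                half.properties.get(REAL_TOPIC),
                Integer.parseInt(half.properties.get(REAL_QUEUE_ID)),
                half.body);
        restored.properties.putAll(half.properties);
        restored.properties.remove(REAL_TOPIC);
        restored.properties.remove(REAL_QUEUE_ID);
        return restored;
    }

    public static void main(String[] args) {
        Msg half = toHalf(new Msg("order-topic", 3, "order-1"));
        Msg restored = restore(half);
        System.out.println(half.topic + " -> " + restored.topic + "/" + restored.queueId);
    }
}
```

Because consumers only subscribe to the real topic, the message stays invisible for as long as it lives under the half topic.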

How are phase 2 failures handled?

  • If phase 2 of a RocketMQ transaction message fails, for example because a network problem makes the Commit operation fail, a strategy is required so that the message is eventually committed. RocketMQ employs a compensation mechanism called “backcheck”. The Broker sends a status check for each undetermined message to a Producer (a Producer of the same Group). The Producer checks the status of the local transaction for that message and then performs Commit or Rollback. The Broker decides which transaction messages to check back by comparing Half messages with Op messages, and advances a CheckPoint (which records the transaction messages whose status has been determined).

RocketMQ does not check the transaction status of a message indefinitely. By default it checks back at most 15 times. If the transaction status is still unknown after 15 checks, RocketMQ by default discards the half message, which has the same effect as a rollback.
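
The bounded backcheck described above can be sketched as a simple loop. The limit of 15 comes from the text; everything else (class name, Outcome enum, the IntFunction callback) is a hypothetical simplification and not RocketMQ code.

```java
import java.util.function.IntFunction;

/** Sketch of a backcheck loop that gives up after a fixed number of checks. */
class CheckLimitSketch {

    static final int MAX_CHECKS = 15; // default number of checks mentioned in the text

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    enum Outcome { COMMITTED, ROLLED_BACK, GAVE_UP }

    /** Calls check(attempt) until the state is determined or the limit is reached. */
    static Outcome resolve(IntFunction<TxState> check) {
        for (int attempt = 1; attempt <= MAX_CHECKS; attempt++) {
            TxState state = check.apply(attempt);
            if (state == TxState.COMMIT) {
                return Outcome.COMMITTED;
            }
            if (state == TxState.ROLLBACK) {
                return Outcome.ROLLED_BACK;
            }
        }
        // Still unknown after MAX_CHECKS: the message is never delivered.
        return Outcome.GAVE_UP;
    }

    public static void main(String[] args) {
        // The local transaction becomes known on the third check.
        System.out.println(resolve(n -> n == 3 ? TxState.COMMIT : TxState.UNKNOWN));
        // The producer never answers with a definite state.
        System.out.println(resolve(n -> TxState.UNKNOWN));
    }
}
```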

RocketMQ transaction solution

Diagram of ordering process

  • The code uses the order service and the points service as an example.
  • First, the business scenario: after a user purchases goods, the corresponding order must be generated and the member's points must be increased.

Process walkthrough

  • 1. Send a half message before placing the order.
  • 2. After the half message is sent successfully, execute the local order transaction.
  • 3. After the local order is placed successfully, send a confirmation message.
  • 4. The consumer side (the points service) can now see the confirmed message, consumes it, and increases the points.

Message exceptions

  • Exception 1: Sending the half message fails. The subsequent steps are not executed. This is normal.
  • Exception 2: Sending the half message succeeds, but executing the local transaction fails. This is also normal: consumers cannot see the half message, so no business logic runs on the consumer side.
  • Exception 3: Sending the half message succeeds and the local transaction succeeds, but sending the confirmation message fails. This is the problem case. For example, the user places the order successfully, but the user's points are not increased, so the data is inconsistent.

RocketMQ backcheck

  • RocketMQ uses a status backcheck to resolve exception 3: RocketMQ periodically scans the half messages stored in the commitlog and asks the producer for the status of the corresponding local transaction.

If the local transaction was not executed successfully, the message is rolled back. If the local transaction was executed successfully, a commit is sent.

  • In the case of exception 3 above, the half message was sent successfully and the local order was created, but the confirmation message failed to be sent. Because RocketMQ checks back on the half message, it finds that the local order was created successfully and the message is committed, so the downstream points system can consume it. Likewise, in the case of exception 2, where the local order transaction was not executed successfully, a Rollback confirmation is triggered and the message is discarded.

SpringBoot integration with RocketMQ

Order service

Transaction log table

CREATE TABLE `transaction_log` (
  `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT 'transaction ID',
  `business` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT 'Business Identity',
  `foreign_key` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT 'Corresponding primary key in business table',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
  • transaction_log is mainly used for transaction backchecks. When the business data is committed, a row is also inserted into this table in the same local transaction. During a backcheck, the table is queried by transaction ID: if a record is returned, the local transaction has been committed; if no record is returned, the local transaction is either still in an unknown state or was rolled back.
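
The idea behind the transaction log can be shown without a database. The sketch below is a hypothetical in-memory stand-in: the Db class simulates a local transaction that writes the order row and the log row together or not at all, and check() mirrors the backcheck lookup by transaction ID. None of these names come from the project code.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of using a transaction log row as proof that a local transaction committed. */
class TransactionLogSketch {

    enum TxState { COMMIT, UNKNOWN }

    /** Hypothetical stand-in for a database holding both tables. */
    static class Db {
        private final Map<Long, String> orders = new HashMap<>();
        private final Map<String, Long> transactionLog = new HashMap<>(); // txId -> order id

        /** Writes the order and its log row together, or neither (simulated local transaction). */
        void createOrder(long orderId, String item, String txId, boolean failMidway) {
            orders.put(orderId, item);
            try {
                if (failMidway) {
                    throw new IllegalStateException("simulated failure before commit");
                }
                transactionLog.put(txId, orderId);
            } catch (RuntimeException e) {
                orders.remove(orderId); // roll back the order row as well
                throw e;
            }
        }

        /** Backcheck: a log row means the local transaction was committed. */
        TxState check(String txId) {
            return transactionLog.containsKey(txId) ? TxState.COMMIT : TxState.UNKNOWN;
        }

        boolean hasOrder(long orderId) {
            return orders.containsKey(orderId);
        }
    }

    public static void main(String[] args) {
        Db db = new Db();
        db.createOrder(1L, "book", "tx-1", false);
        try {
            db.createOrder(2L, "pen", "tx-2", true);
        } catch (IllegalStateException e) {
            // the local transaction failed and left no rows behind
        }
        System.out.println(db.check("tx-1") + " " + db.check("tx-2")); // prints COMMIT UNKNOWN
    }
}
```

Because the log row is written in the same local transaction as the order, its presence is sufficient evidence that the order exists.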

Transaction sending instance

The following creates the sender of the transaction message. The focus here is OrderTransactionListener, which is responsible for executing the local transaction and for the transaction status backcheck.

package com.java.xval.val.mq;

import com.java.xval.val.service.listenerTransaction.OrderTransactionListener;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Producer for order transaction messages. */
@Component
public class OrderTransactionProducer extends TransactionProducer {

    // Custom transaction listener: executes the local transaction and answers transaction status backchecks
    @Resource
    private OrderTransactionListener orderTransactionListener;

    // Custom thread pool for backchecks; naming the threads makes problems easier to trace
    private final ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200), r -> {
        Thread thread = new Thread(r);
        thread.setName("client-transaction-producer-check-thread");
        return thread;
    });

    // Initialize the transaction producer with the order listener when the Spring container starts.
    @PostConstruct
    public void buildInit() {
        init(orderTransactionListener, executorService);
    }
}

package com.java.xval.val.mq;

import com.java.xval.val.common.config.RocketMqDataConfig;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;

@Component
public class TransactionProducer {

    private TransactionMQProducer transactionMQProducer;

    @Resource
    private RocketMqDataConfig rocketMqDataConfig;

    /**
     * Initialize and start the transaction producer.
     *
     * @param transactionListener transaction listener
     * @param executorService     custom thread pool (defined per scenario)
     */
    public void init(TransactionListener transactionListener, ExecutorService executorService) {
        // The constructor argument is used as the producer group name
        transactionMQProducer = new TransactionMQProducer(rocketMqDataConfig.getOrderTopic());
        transactionMQProducer.setNamesrvAddr(rocketMqDataConfig.getNameServer());
        transactionMQProducer.setSendMsgTimeout(Integer.MAX_VALUE);
        transactionMQProducer.setExecutorService(executorService);
        transactionMQProducer.setTransactionListener(transactionListener);
        this.start();
    }

    /** start() must be called once before the producer is used, and only once. */
    private void start() {
        try {
            this.transactionMQProducer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * Send a transaction message.
     *
     * @param data  message body
     * @param topic topic of the message queue
     * @return the TransactionSendResult
     * @throws MQClientException if sending fails
     */
    public TransactionSendResult send(String data, String topic) throws MQClientException {
        Message message = new Message(topic, data.getBytes());
        return this.transactionMQProducer.sendMessageInTransaction(message, null);
    }
}

Custom transaction listeners

package com.java.xval.val.service.listenerTransaction;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.java.xval.val.model.Order;
import com.java.xval.val.service.OrderService;
import com.java.xval.val.service.TransactionLogService;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/** Transaction listener for order messages: executes the local transaction and answers backchecks. */
@Component
public class OrderTransactionListener implements TransactionListener {

    @Resource
    private OrderService orderService;

    @Resource
    private TransactionLogService transactionLogService;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {

        // There are three possible outcomes of local transaction execution:
        // 1. Success: commit
        // 2. Failure: rollback
        // 3. Unknown: the result never reaches the broker, e.g. because of network problems or the service going down
        // Execute the local transaction that creates the order; the order data and the transaction log are inserted here.
        logger.info("OrderTransactionListener starts executing local transaction message={}....", JSON.toJSONString(message));
        LocalTransactionState state;
        try {
            String body = new String(message.getBody());
            Order order = JSONObject.parseObject(body, Order.class);
            orderService.create(order, message.getTransactionId());
            state = LocalTransactionState.COMMIT_MESSAGE;
            logger.info("OrderTransactionListener Local transaction committed. {}", message.getTransactionId());
        } catch (Exception e) {
            logger.error("OrderTransactionListener failed to execute local transaction", e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    /**
     * Called by the broker as a backcheck when the transaction state is undetermined, for example when
     * executeLocalTransaction returned LocalTransactionState.UNKNOW or its result never reached the broker.
     *
     * @param messageExt the messageExt
     * @return LocalTransactionState transaction status
     * @see org.apache.rocketmq.client.producer.LocalTransactionState
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

        // A case to handle: the local transaction above succeeded, but the service went down before
        // LocalTransactionState.COMMIT_MESSAGE reached the broker. The broker never receives the second-phase
        // confirmation, the message stays a half message, and the broker calls this backcheck interface (for example after a restart).
        // Query the execution status of the local transaction here instead of executing it again; otherwise the local transaction would run twice.
        logger.info("OrderTransactionListener starts checking the local transaction status {}", messageExt.getTransactionId());
        LocalTransactionState state;
        String transactionId = messageExt.getTransactionId();

        if (StringUtils.isNotBlank(transactionLogService.get(transactionId))) {
            state = LocalTransactionState.COMMIT_MESSAGE;
        } else {
            state = LocalTransactionState.UNKNOW;
        }
        logger.info("OrderTransactionListener ends local transaction status query: {}", state);
        return state;
    }
}

  • After the half message is sent successfully through transactionMQProducer.sendMessageInTransaction, the executeLocalTransaction(Message message, Object o) method is called to execute the local transaction; the order data and the transaction log are inserted here.

LocalTransactionState enumeration values

  • COMMIT_MESSAGE: Commits the transaction message; consumers can see it.
  • ROLLBACK_MESSAGE: Rolls back the transaction message; consumers never see it.
  • UNKNOW: The transaction status is unknown. A transaction status backcheck is needed to determine whether the message is committed or rolled back.

The checkLocalTransaction(MessageExt messageExt) method is used to query the status of the transaction. In the example above, transaction_log is queried by transaction ID. If a record is found, the transaction message is committed. If no record is found, UNKNOW is returned.

Business order implementation class

package com.java.xval.val.service.impl;

import com.alibaba.fastjson.JSON;
import com.java.xval.val.mapper.OrderMapper;
import com.java.xval.val.mapper.TransactionLogMapper;
import com.java.xval.val.model.Order;
import com.java.xval.val.model.TransactionLog;
import com.java.xval.val.mq.MqConstant;
import com.java.xval.val.mq.OrderTransactionProducer;
import com.java.xval.val.service.OrderService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/** ** order business implementation class */
@Service
public class OrderServiceImpl implements OrderService {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderServiceImpl.class);

    @Resource
    private OrderMapper orderMapper;

    @Resource
    private TransactionLogMapper transactionLogMapper;

    @Resource
    private OrderTransactionProducer orderTransactionProducer;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void create(Order order, String transactionId) {

        LOGGER.info("OrderServiceImpl starts ordering ={},transactionId={}", JSON.toJSONString(order), transactionId);

        // 1. This application creates an order
        orderMapper.create(order);

        // 2. Write the transaction log
        TransactionLog log = new TransactionLog();
        log.setId(transactionId);
        log.setBusiness(MqConstant.Top.USER_ORDER_TOPIC);
        log.setForeignKey(String.valueOf(order.getId()));
        transactionLogMapper.insert(log);
        logger.info("OrderServiceImpl order creation completed ={}", order);
    }

    @Override
    public void createOrder(Order order) throws MQClientException {
        TransactionSendResult transactionSendResult = orderTransactionProducer.send(JSON.toJSONString(order), MqConstant.Top.USER_ORDER_TOPIC);
        transactionSendResult.getSendStatus();
    }
}

Points system

Starting the consumer in the points service that listens for order messages

package com.java.xval.val.mq;

import com.java.xval.val.common.config.RocketMqDataConfig;
import com.java.xval.val.service.listenerTransaction.PointTransactionListener;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/** * order consumer monitor */
@Component
public class PointProductConsumer {

    @Resource
    private RocketMqDataConfig rocketMqDataConfig;

    @Resource
    private PointTransactionListener orderListener;

    @PostConstruct
    public void init() throws MQClientException {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(MqConstant.ConsumeGroup.USER_ORDER_GROUP);
        defaultMQPushConsumer.setNamesrvAddr(rocketMqDataConfig.getNameServer());
        defaultMQPushConsumer.subscribe(MqConstant.Top.USER_ORDER_TOPIC, "*");
        defaultMQPushConsumer.registerMessageListener(orderListener);
        defaultMQPushConsumer.start();
    }
}

  • You need to specify the topic and the listener for consumption.

Points consumer listener

package com.java.xval.val.service.listenerTransaction;

import com.alibaba.fastjson.JSONObject;
import com.java.xval.val.model.Order;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class PointTransactionListener implements MessageListenerConcurrently {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        logger.info("The consumer thread is listening to the message.");
        try {
            for (MessageExt message : list) {
                logger.info("Start processing order data and prepare to increase credits....");
                Order order = JSONObject.parseObject(message.getBody(), Order.class);
                if (!processor(message)) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

                // TODO: insert the corresponding points record.
                logger.info("Start inserting points data, increase points....");
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            logger.error("Abnormal processing of consumer data", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    /**
     * Message processing. After the third failed retry, notify someone by email or SMS for manual intervention.
     *
     * @param message the message
     * @return true if the message should be treated as consumed, false if it should be retried
     */
    private boolean processor(MessageExt message) {
        String body = new String(message.getBody());
        try {
            logger.info("PointTransactionListener message processing.... {}", body);
            int k = 1 / 0; // deliberately throws ArithmeticException to simulate a processing failure
            return true;
        } catch (Exception e) {
            if (message.getReconsumeTimes() >= 3) {
                logger.error("The PointTransactionListener message has reached the maximum number of retries, and the business person will be notified to troubleshoot the problem. {}", message.getMsgId());
                // TODO: send an SMS or email notification.
                return true;
            }
            return false;
        }
    }
}

Idempotence

  • 1. Check whether the order has already been processed before executing.
  • 2. Add an extra table to record processed messages.
  • 3. Put a marker in the Redis cache and query the cache before writing to the database.
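
As a rough illustration of the first two options, the sketch below keeps the set of already processed order IDs in memory and skips duplicates. It is a simplification under assumptions: the class and method names are hypothetical, and a real service would keep the processed marker in a database table or in Redis rather than in a local set.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of an idempotent points consumer that ignores redelivered orders. */
class IdempotentConsumerSketch {

    // Stand-in for a "processed messages" table or cache.
    private final Set<String> processedOrderIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger points = new AtomicInteger();

    /** Returns true if points were added, false if this order was already handled. */
    boolean consume(String orderId, int pointsToAdd) {
        // add() returns false when the id is already present, i.e. a duplicate delivery
        if (!processedOrderIds.add(orderId)) {
            return false;
        }
        points.addAndGet(pointsToAdd);
        return true;
    }

    int points() {
        return points.get();
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.consume("order-1", 10);
        consumer.consume("order-1", 10); // redelivery of the same message is ignored
        System.out.println(consumer.points()); // prints 10
    }
}
```

In a database-backed version, the processed marker and the points update would normally be written in the same local transaction, so that a crash between the two cannot lose or double the points.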

Consumption exceptions

After consumer processing fails and RECONSUME_LATER is returned, the message is retried, up to 16 times by default.

  • You can set this number on the consumer side.
// Sets the maximum number of message retries
consumer.setMaxReconsumeTimes(3);
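
The effect of the retry limit can be sketched with a small simulation. The deliver method and the Result enum are hypothetical; the sketch only models the counting, under the assumption that a message that still fails after the last retry is set aside (RocketMQ moves such messages to a dead-letter queue).

```java
import java.util.function.IntPredicate;

/** Sketch of redelivery with a maximum number of retries (not RocketMQ code). */
class RetrySketch {

    enum Result { CONSUMED, DEAD_LETTER }

    /**
     * Delivers a message until the handler succeeds or the retries are used up.
     * The handler receives reconsumeTimes (0 for the first delivery) and returns true on success.
     */
    static Result deliver(int maxReconsumeTimes, IntPredicate handler) {
        for (int reconsumeTimes = 0; reconsumeTimes <= maxReconsumeTimes; reconsumeTimes++) {
            if (handler.test(reconsumeTimes)) {
                return Result.CONSUMED;
            }
            // a failed attempt corresponds to returning RECONSUME_LATER
        }
        return Result.DEAD_LETTER;
    }

    public static void main(String[] args) {
        // Succeeds on the second retry.
        System.out.println(deliver(3, times -> times == 2)); // prints CONSUMED
        // Never succeeds: one first delivery plus three retries, then set aside.
        System.out.println(deliver(3, times -> false));      // prints DEAD_LETTER
    }
}
```

The processor method shown earlier takes a different route: once getReconsumeTimes() reaches 3 it returns true, so the message is acknowledged and a person is notified instead of relying on further retries.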

Check out the RocketMQ console

Seventh, key points

Limitations of RocketMQ transaction handling

  • 1. RocketMQ only addresses eventual consistency of data. After the upstream service commits, the downstream service is expected to succeed eventually; it cannot roll back the upstream data.
  • 2. Consider creating an order and deducting inventory: the producer side creates the order and sends a message to the inventory service. If the inventory deduction fails, RocketMQ cannot roll back the order the way a TCC rollback would. Consider using Alibaba's Seata for this case.