What are distributed transactions

Speaking of transactions, you have probably already worked with MySQL transactions, but a MySQL transaction cannot solve transaction problems in a distributed scenario, as the diagram below shows. At that point we need distributed transactions to solve the transaction problem in distributed scenarios.

Looking at the figure above, if we had to implement a distributed transaction ourselves, how would we do it?

  1. Perform the distributed transaction through compensation
  2. Control it through a global transaction
  3. Build reliable events on top of a message queue
  4. …

There are many ways to achieve this; which one to use depends on the company's business scenario. Below are two strong consistency solutions.

Strong consistency solution

XA distributed transaction

The earliest distributed transaction model is the X/Open Distributed Transaction Processing (DTP) model proposed by the X/Open consortium, also known as the X/Open XA protocol, or XA protocol for short. The most typical XA implementation is the two-phase commit protocol (2PC).

Two-phase commit protocol

As the name implies, the commit process is divided into two phases.

2PC Execution process

Phase 1:

  1. The coordinator asks each participant whether the transaction can be executed normally and waits for each participant's response
  2. Each participant executes the local transaction (writing local undo/redo logs) but does not commit it
  3. Each participant reports the result of the inquiry back to the coordinator

Phase 2:

All participants respond Yes (commit the transaction)

  1. The coordinator issues a Commit request to all participants
  2. After receiving the Commit request, a participant formally commits the transaction and releases its resources upon completion
  3. Each participant sends an ACK message; once the coordinator has received them all, the transaction is complete

One or more participants respond No (interrupt the transaction)

  1. The coordinator issues a Rollback request to all participants
  2. After receiving the Rollback request, participants use the undo logs recorded in phase 1 to roll back the transaction and release the resources occupied during the transaction
  3. Each participant sends an ACK message to the coordinator after completing the rollback
  4. After receiving ACK messages from all participants, the coordinator completes the transaction interruption

Note that in both phases of 2PC the coordinator notifies the participants sequentially, one after another.
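To make the two phases concrete, here is a minimal sketch of a coordinator driving 2PC over a set of participants. The Participant interface and its method names are hypothetical, introduced only for illustration; a real resource manager would expose these operations through its XA interface.

import java.util.List;

// Hypothetical participant interface: each resource manager implements these three calls
interface Participant {
    boolean prepare();   // phase 1: execute the local transaction, write undo/redo logs, do not commit
    void commit();       // phase 2: formally commit and release resources
    void rollback();     // phase 2: roll back using the undo log and release resources
}

class TwoPhaseCommitCoordinator {
    /** Returns true if the global transaction committed, false if it was rolled back. */
    public boolean execute(List<Participant> participants) {
        // Phase 1: ask every participant, sequentially, whether it can commit
        boolean allYes = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allYes = false;
                break;
            }
        }
        // Phase 2: commit everywhere if all said Yes, otherwise roll back everywhere
        for (Participant p : participants) {
            if (allYes) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allYes;
    }
}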

From the description above, the pros and cons of 2PC are easy to see.

Advantages: Simple principle

Disadvantages:

  1. Synchronous blocking: throughout phases 1 and 2, all participants' transactions remain blocked.
  2. Single point of failure: the coordinator is a single point; if it fails, the whole process stalls and cannot proceed, and neither participants nor coordinator have a timeout mechanism.
  3. Data inconsistency: after the coordinator sends the Commit request, network jitter may cause some participants to receive it and others not, leaving the data inconsistent.

Because of these shortcomings of 2PC, the three-phase commit protocol was developed to compensate for some of the shortcomings of the two-phase commit protocol.

Three-phase commit protocol

The three-phase commit protocol (3PC) is an improved version of 2PC. It splits 2PC's "commit transaction request" step in two, turning the protocol into three phases: CanCommit, PreCommit and doCommit. It also introduces a timeout mechanism.

3PC Execution process

Phase 1 (CanCommit phase):

  1. The coordinator sends a CanCommit request to each participant in turn, asking whether the transaction commit can be performed, and then waits for the responses.
  2. After receiving the CanCommit request, a participant returns Yes and enters the prepared state if it believes it can execute the transaction successfully, otherwise it returns No.

Phase 2 (PreCommit phase):

All participants responded with Yes

  1. The coordinator sends a PreCommit request to the participants and enters the prepared phase.
  2. Participants receive the PreCommit request, execute the transaction, and write undo and redo information to the transaction log.
  3. If a participant executes the transaction successfully, it returns an ACK response and waits for the final command.

Some participants respond No, or the wait times out

  1. The coordinator sends an abort request to all participants
  2. After receiving the abort request from the coordinator (or after a timeout with no request received), each participant interrupts the transaction

Phase 3 (doCommit phase):

Successful execution

  1. The coordinator receives the ACK responses from the participants, moves from the pre-commit state to the commit state, and sends a doCommit request to all participants.
  2. After receiving the doCommit request, a participant formally commits the transaction and releases its transaction resources upon completion.
  3. Once its resources are released, the participant sends an ACK response to the coordinator.
  4. The coordinator completes the transaction after receiving ACK responses from all participants.

Interrupt the transaction

  1. The coordinator sends an abort request to all participants
  2. After receiving the abort request, a participant uses its undo log to roll back the transaction and releases all transaction resources after the rollback
  3. After completing the rollback, the participant sends an ACK message to the coordinator
  4. After receiving ACK messages from all participants, the coordinator completes the transaction interruption
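To illustrate the timeout mechanism that 3PC adds, here is a minimal sketch of the participant side of phase 3, using hypothetical names (ThreePhaseParticipant, Decision and so on): after a successful PreCommit the participant waits for the final instruction, and if the coordinator never answers within the timeout it commits by default.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ThreePhaseParticipant {
    enum Decision { DO_COMMIT, ABORT }

    private final BlockingQueue<Decision> decisions = new LinkedBlockingQueue<>();

    /** Called when a message from the coordinator arrives. */
    public void onCoordinatorMessage(Decision d) {
        decisions.offer(d);
    }

    /** Phase 3 on the participant: wait for the final decision, with a timeout. */
    public void awaitFinalDecision(long timeoutMs) throws InterruptedException {
        Decision d = decisions.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (d == Decision.ABORT) {
            rollbackWithUndoLog();
        } else {
            // Either an explicit doCommit arrived, or the coordinator timed out:
            // after a successful PreCommit, 3PC lets the participant commit by default
            commitLocalTransaction();
        }
    }

    private void commitLocalTransaction() { /* commit and release resources */ }
    private void rollbackWithUndoLog()    { /* roll back using the undo log */ }
}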

Three-phase commit alleviates the synchronous blocking of two-phase commit and adds the timeout mechanism it lacked. However, several issues remain unresolved with three-phase commit:

  1. A single point of failure
  2. Network failures between the coordinator and the participants can still lead to data inconsistency.

In any case, neither 2PC nor 3PC is suitable for Internet projects with high concurrency.

TCC distributed transaction solution

TCC (Try-Confirm-Cancel), also known as a two-phase compensating transaction, is widely used at Ant Financial.

  • Phase 1 – Try: checks and reserves resources
  • Phase 2 – Confirm: performs the actual business action
  • Phase 3 – Cancel: releases the reserved resources

All three operations need to be implemented in the business logic.

This is an ordering process

Suppose the second step, deducting inventory, fails, while the order status update and the third and fourth steps all succeed; that is clearly inconsistent, so we have to handle it in the business logic and adjust both the logic and the data tables. We need to add a frozen-inventory count to the goods table, a pre-added points count to the points table, and so on.

After the order is placed successfully, the inventory is changed to 98 and the deducted 2 units are recorded as frozen inventory; the other tables are handled the same way, and the local transaction is committed when this is done. That is what the Try phase does. If every Try succeeds, we proceed to the Confirm phase.

Confirm phase: this is where the formal operation happens, that is, clearing the frozen inventory and the pre-added points; the pre-added points are cleared and added to the membership points, which become 1200. If this phase fails, it must keep retrying.

However, if any transaction fails during the Try phase, we move to the Cancel phase instead.

Cancel phase: the TCC transaction framework detects the failure and tells all services to roll back, that is, to move the frozen inventory back into inventory, and so on. If a rollback in the Cancel phase does not succeed, it keeps retrying.
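Mapping the frozen-inventory example above onto code, each service has to expose all three operations to the TCC framework. A minimal sketch, with a hypothetical InventoryTccService interface; the quantities mirror the 100 → 98 / frozen 2 example:

// Hypothetical TCC participant for the inventory service
public interface InventoryTccService {

    /** Try: reserve the resource — e.g. stock 100 -> 98 and frozen stock 0 -> 2, in a local transaction. */
    boolean tryDeduct(long orderId, long skuId, int quantity);

    /** Confirm: the real business action — clear the frozen stock; retried until it succeeds. */
    boolean confirmDeduct(long orderId, long skuId, int quantity);

    /** Cancel: release the reservation — move the frozen stock back into stock; retried until it succeeds. */
    boolean cancelDeduct(long orderId, long skuId, int quantity);
}

Confirm and Cancel must both be idempotent, because the framework keeps retrying them until they succeed.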

Some optimizations of TCC are presented in a separate chapter.

Self-developed distributed transactions: local message table + eventually consistent messaging

The scenarios that truly require strong consistency, such as those handled by 2PC, are actually quite narrow. In most scenarios it is enough to guarantee that the data becomes consistent eventually, after a short delay. For example, in an ordering flow, after the order succeeds we need to empty the shopping cart, add points and experience, and so on; a delay of a few seconds is fine. We just need to make sure that once the order succeeds, the shopping cart really gets emptied and the coupons, bonuses and experience really get granted, not lost.

So the idea is: when the order is placed, the order transaction updates the order status, and within that same transaction we also record a message locally. The message is essentially a log entry, such as "empty the shopping cart" or "add points". Because it is recorded locally, in the same database (or as a file), there is no distributed problem; in other words, this turns the distributed transaction into a local one, which guarantees that the order write and the local message record are transactionally consistent. Once that is done, the response can be returned to the client immediately.

Now that the local message has been recorded inside the transaction, an asynchronous service reads it and calls the shopping cart service to empty the cart and the points service to add the points. Once those calls complete, it marks the local message as done. If any call in between fails, it can be resolved by retrying. In the end, the ordering system is guaranteed to be consistent with the other systems.
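A minimal sketch of the ordering side, assuming Spring's @Transactional for the local transaction; the DAO and queue names are hypothetical. The point is that the order row and the local message row are written in one local transaction, and only the message id is handed to the asynchronous delivery component:

import org.springframework.transaction.annotation.Transactional;

public class OrderService {

    // Hypothetical collaborators, named only for illustration
    interface OrderDao     { void insertOrder(String orderJson); }
    interface LocalMsgDao  { long insertMsg(String topic, String tag, String content); }
    interface DeliverQueue { void submit(long msgId); }

    private OrderDao orderDao;
    private LocalMsgDao localMsgDao;
    private DeliverQueue deliverQueue;

    @Transactional
    public void placeOrder(String orderJson) {
        // 1. The business write: create the order
        orderDao.insertOrder(orderJson);

        // 2. In the SAME local transaction, record the follow-up action (empty cart, add points)
        //    as a local message, so both writes commit or roll back together
        long msgId = localMsgDao.insertMsg("order-created", "clear-cart", orderJson);

        // 3. Respond to the client; delivery to MQ happens asynchronously from here on
        deliverQueue.submit(msgId);
    }
}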

As described above, we implement it step by step.

We first need to have an entity class for the local message:

import java.util.Date;

/** Transaction message entity; its fields map one-to-one to the local message table. */
public class MsgInfo {
    /** primary key */
    private Long id;

    /** transaction message body */
    private String content;

    /** topic, corresponds to the RocketMQ topic */
    private String topic;

    /** tag, corresponds to the RocketMQ tag */
    private String tag;

    /** state: 1 - waiting, 2 - sent */
    private int status;

    /** creation time */
    private Date createTime;

    /** delay in seconds: send to MQ at most this many seconds later */
    private int delay;

}

Once the entity class is created, we also need a lightweight class for the entries that go into the in-memory queue:

/** Queue element: what actually goes into the in-memory delivery queue. */
public class Msg {
    /** primary key, one-to-one with MsgInfo */
    private Long id;

    /** db-url key, mapped to the data source map */
    private String url;

    /** number of times this message has been processed */
    private int haveDealedTimes;

    /** creation time */
    private long createTime;

    /** next retry/expire time */
    private long nextExpireTime;

}

With the entity classes in place, let's walk through the flow again. Following the steps described earlier, the order needs to end up in MQ, but instead of sending it to MQ directly, we first put a Msg entry into an in-memory Queue. So we need a background worker thread that pops entries from the Queue and looks up the corresponding row in the message table to see whether the local transaction message exists in the database. If it does not exist and the maximum number of retries has not been reached (the local transaction may simply not have committed yet), the entry gets another chance and is retried later; once the retries are exhausted, the message ends there. If the row does exist, we build an MQ message and deliver it. If delivery succeeds, we are done.

If MQ delivery fails, the message is put into a retry time wheel queue, so we also need a thread that reads from the time wheel queue. When this thread takes an entry from the time wheel, it checks whether the entry has expired according to the delay property of MsgInfo; if it has expired, the entry ends there, otherwise it rejoins the delivery queue.
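The worker described above needs two in-memory structures: a blocking queue of pending deliveries and a time wheel ordered by the next retry time. A minimal sketch of how they might be declared, assuming Msg has the usual getters; the real implementation may well use a different structure (for example a DelayQueue):

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

class MsgQueues {
    // Queue that the delivery threads (MsgDeliverTask below) poll from
    final BlockingQueue<Msg> msgQueue = new LinkedBlockingQueue<>();

    // "Time wheel" for retries, ordered by the next retry time; the periodic
    // TimeWheelTask peeks at the head and moves expired entries back into msgQueue
    final BlockingQueue<Msg> timeWheel =
            new PriorityBlockingQueue<>(1024, Comparator.comparingLong(Msg::getNextExpireTime));
}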

What if the service crashes at this point and everything in the in-memory queues is lost? For that we need another thread: after a restart, it checks whether this instance holds the lock, and if so, it fetches from the DB the messages that have been in the waiting state over the last 10 minutes, based on MsgInfo's status, then builds the MQ messages and delivers them.

To make the lock state quick to check, we also need a thread that competes for the lock: if it grabs the lock it sets a lock flag, and if it does not, this instance is not the one responsible for picking up the missed messages. Finally, a cleanup thread is needed to periodically delete messages that have already been delivered.

Let’s look at the initialization code:

/**
 * Initialization: once init() has run with the given config, the processor is ready.
 * @param config the configuration object
 */
public void init(Config config) {
    if (state.get().equals(State.RUNNING)) {
        LOGGER.info("Msg Processor have inited return");
        return;
    }
    LOGGER.info("MsgProcessor init start");
    state.compareAndSet(State.CREATE, State.RUNNING);
    // 1. Set up the environment
    this.serverType = ConfigUtil.getServerType();
    if (config.getEtcdHosts() != null && config.getEtcdHosts().length >= 1) {
        envNeedLock = true;
        defaultEtcd = config.getEtcdHosts();
        LOGGER.info("serverType {} envNeedLock {} etcdhosts {}", serverType, envNeedLock, defaultEtcd);
    }
    // 2. Keep the config
    this.config = config;
    // 3. Threads that deliver transaction messages to MQ
    exeService = Executors.newFixedThreadPool(config.getThreadNum(), new ThreadFactory("MsgProcessorThread-"));
    for (int i = 0; i < config.getThreadNum(); i++) {
        exeService.submit(new MsgDeliverTask());
    }
    // 4. Scheduled threads
    scheService = Executors.newScheduledThreadPool(config.getSchedThreadNum(), new ThreadFactory("MsgScheduledThread-"));
    // Time wheel thread: retries failed transaction operations
    scheService.scheduleAtFixedRate(new TimeWheelTask(), TIME_WHEEL_PERIOD, TIME_WHEEL_PERIOD, TimeUnit.MILLISECONDS);
    // Thread that deletes transaction messages that have already been sent
    scheService.scheduleAtFixedRate(new CleanMsgTask(), config.deleteTimePeriod, config.deleteTimePeriod, TimeUnit.SECONDS);
    // Fallback thread: rescans the last 10 minutes so that no message is missed
    scheService.scheduleAtFixedRate(new ScanMsgTask(), config.schedScanTimePeriod, config.schedScanTimePeriod, TimeUnit.SECONDS);
    // Heartbeat thread: reports the backlog of the delivery queue and the time wheel queue
    scheService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("stats info msgQueue size {} timeWheelQueue size {}", msgQueue.size(), timeWheel.size());
        }
    }, 20, config.getStatsTimePeriod(), TimeUnit.SECONDS);
    // 5. Initialize the lock client
    initLock();
    LOGGER.info("MsgProcessor init end");
}

Transaction delivery thread:

// Transaction delivery thread: polls Msg entries from the in-memory queue and delivers them to MQ
class MsgDeliverTask implements Runnable {
    @Override
    public void run() {
        while (true) {
            if (!state.get().equals(State.RUNNING)) {
                break;
            }
            try {
                // 1. Poll a transaction operation message from the queue (blocks up to DEF_TIMEOUT_MS, e.g. 100 ms)
                Msg msg = null;
                try {
                    msg = msgQueue.poll(DEF_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                } catch (InterruptedException ex) {
                    // ignore and poll again
                }
                if (msg == null) {
                    continue;
                }
                LOGGER.debug("poll msg {}", msg);
                int dealedTime = msg.getHaveDealedTimes() + 1;
                msg.setHaveDealedTimes(dealedTime);
                // 2. Load the actual transaction message from the database
                MsgInfo msgInfo = msgStorage.getMsgById(msg);
                LOGGER.debug("getMsgInfo from DB {}", msgInfo);
                if (msgInfo == null) {
                    if (dealedTime < MAX_DEAL_TIME) {
                        // 3.1 Not found yet (the local transaction may not have committed):
                        //     put the entry into the time wheel and retry delivery later
                        long nextExpireTime = System.currentTimeMillis() + TIMEOUT_DATA[dealedTime];
                        msg.setNextExpireTime(nextExpireTime);
                        timeWheel.put(msg);
                        LOGGER.debug("put msg in timeWheelQueue {} ", msg);
                    }
                } else {
                    // 3.2 Found in the DB: build the MQ message and deliver it
                    Message mqMsg = buildMsg(msgInfo);
                    LOGGER.debug("will sendMsg {}", mqMsg);
                    SendResult result = producer.send(mqMsg);
                    LOGGER.info("msgId {} topic {} tag {} sendMsg result {}", msgInfo.getId(), mqMsg.getTopic(), mqMsg.getTags(), result);
                    if (null == result || result.getSendStatus() != SendStatus.SEND_OK) {
                        // Delivery failed: re-enter the time wheel
                        if (dealedTime < MAX_DEAL_TIME) {
                            long nextExpireTime = System.currentTimeMillis() + TIMEOUT_DATA[dealedTime];
                            msg.setNextExpireTime(nextExpireTime);
                            timeWheel.put(msg);
                            // This can be optimized: the transaction is already confirmed,
                            // so there is no need to read it from the DB again on retry
                            LOGGER.debug("put msg in timeWheelQueue {} ", msg);
                        }
                    } else if (result.getSendStatus() == SendStatus.SEND_OK) {
                        // Delivery succeeded: mark the message as sent in the database
                        int res = msgStorage.updateSendMsg(msg);
                        LOGGER.debug("msgId {} updateMsgStatus success res {}", msgInfo.getId(), res);
                    }
                }
            } catch (Throwable t) {
                LOGGER.error("MsgProcessor deal msg fail", t);
            }
        }
    }
}

Retry time wheel thread:

class TimeWheelTask implements Runnable {
    @Override
    public void run() {
        try {
            if (state.get().equals(State.RUNNING)) {
                long cruTime = System.currentTimeMillis();
                // Check whether there is a Msg at the head of the time wheel
                Msg msg = timeWheel.peek();
                // Only move entries whose retry time has already arrived
                while (msg != null && msg.getNextExpireTime() <= cruTime) {
                    msg = timeWheel.poll();
                    LOGGER.debug("timeWheel poll msg, return to msgQueue {}", msg);
                    // Put it back into the delivery queue
                    msgQueue.put(msg);
                    msg = timeWheel.peek();
                }
            }
        } catch (Exception ex) {
            LOGGER.error("poll timeWheel error", ex);
        }
    }
}

Local message deletion thread:

class CleanMsgTask implements Runnable {
    @Override
    public void run() {
        if (state.get().equals(State.RUNNING)) {
            LOGGER.debug("DeleteMsg start run");
            try {
                Iterator<DataSource> it = msgStorage.getDataSourcesMap().values().iterator();
                while (it.hasNext()) {
                    DataSource dataSrc = it.next();
                    if (holdLock) {
                        LOGGER.info("DeleteMsgRunnable run ");
                        int count = 0;
                        int num = config.deleteMsgOneTimeNum;
                        // Stop when fewer rows than the batch size were deleted, or the per-run cap is reached
                        while (num == config.deleteMsgOneTimeNum && count < MAX_DEAL_NUM_ONE_TIME) {
                            try {
                                num = msgStorage.deleteSendedMsg(dataSrc, config.deleteMsgOneTimeNum);
                                count += num;
                            } catch (SQLException e) {
                                LOGGER.error("deleteSendedMsg fail ", e);
                            }
                        }
                    }
                }
            } catch (Exception ex) {
                LOGGER.error("delete Run error ", ex);
            }
        }
    }
}

Fallback scan thread (catches messages missed after a restart):

class ScanMsgTask implements Runnable {
    @Override
    public void run() {
        if (state.get().equals(State.RUNNING)) {
            LOGGER.debug("SchedScanMsg start run");
            Iterator<DataSource> it = msgStorage.getDataSourcesMap().values().iterator();
            while (it.hasNext()) {
                DataSource dataSrc = it.next();
                boolean canExe = holdLock;
                if (canExe) {
                    LOGGER.info("SchedScanMsgRunnable run");
                    int num = LIMIT_NUM;
                    int count = 0;
                    while (num == LIMIT_NUM && count < MAX_DEAL_NUM_ONE_TIME) {
                        try {
                            // Fetch messages that are still in the waiting state from the database
                            List<MsgInfo> list = msgStorage.getWaitingMsg(dataSrc, LIMIT_NUM);
                            num = list.size();
                            if (num > 0) {
                                LOGGER.debug("scan db get msg size {} ", num);
                            }
                            count += num;
                            for (MsgInfo msgInfo : list) {
                                try {
                                    Message mqMsg = buildMsg(msgInfo);
                                    SendResult result = producer.send(mqMsg);
                                    LOGGER.info("msgId {} topic {} tag {} sendMsg result {}", msgInfo.getId(), mqMsg.getTopic(), mqMsg.getTags(), result);
                                    if (result != null && result.getSendStatus() == SendStatus.SEND_OK) {
                                        // Mark the message as sent in the database
                                        int res = msgStorage.updateMsgStatus(dataSrc, msgInfo.getId());
                                        LOGGER.debug("msgId {} updateMsgStatus success res {}", msgInfo.getId(), res);
                                    }
                                } catch (Exception e) {
                                    LOGGER.error("SchedScanMsg deal fail", e);
                                }
                            }
                        } catch (SQLException e) {
                            LOGGER.error("getWaitMsg fail", e);
                        }
                    }
                }
            }
        }
    }

}

Looking back at the architecture diagram above, and combining it with the third inventory-deduction approach under high concurrency that I covered earlier, there are still points that can be optimized: the delivery path can build and send the MQ message directly, and the time wheel queue can likewise hand entries straight back to the background thread so that the MQ message is sent directly.
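As a rough sketch of that optimization: once the local transaction is known to have committed, the delivery path can carry the MsgInfo along and send the MQ message directly instead of re-reading it from the database. The fragment below would live inside the same processor class as the code above (producer, msgStorage, timeWheel and TIMEOUT_DATA are its fields); the helper name deliverDirectly is hypothetical.

// Hypothetical optimized path: the MsgInfo is already known, so build and send the MQ
// message directly instead of going back to the database first
private void deliverDirectly(DataSource dataSrc, Msg msg, MsgInfo msgInfo) throws Exception {
    Message mqMsg = buildMsg(msgInfo);
    SendResult result = producer.send(mqMsg);
    if (result != null && result.getSendStatus() == SendStatus.SEND_OK) {
        // Delivery succeeded: mark the local message as sent
        msgStorage.updateMsgStatus(dataSrc, msgInfo.getId());
    } else {
        // Delivery failed: schedule a retry on the time wheel, again without re-reading the DB first
        msg.setNextExpireTime(System.currentTimeMillis() + TIMEOUT_DATA[msg.getHaveDealedTimes()]);
        timeWheel.put(msg);
    }
}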