This article compares two-phase transactions, best-effort delivery, and reliable-message eventual consistency, and gives partial implementations. The eventual consistency solution draws on Alibaba RocketMQ transactional messages (see blog.csdn.net/chunlong…).

Two-phase transactions

There are many schemes for consistency in distributed systems, such as two-phase commit (2PC) and three-phase commit (3PC), but they are costly and complex to implement. For example, two-phase commit introduces a coordinator that decides the outcome of the operation on all participants.

Take a meeting as an example: A, B, C, and D want to hold a meeting and need to agree on a time. Say A is the coordinator and B, C, and D are the participants.

Voting phase: (1) A emails B, C, and D to ask whether they are free for a meeting at 10:00 on Tuesday; (2) D replies that it is free; (3) B replies that it is free; (4) C does not reply for a long time. Meanwhile A, B, and D are all blocked on this activity, and the algorithm cannot proceed; (5) C finally replies that it is free (or not free).

Commit phase: (1) coordinator A sends the collected result to B, C, and D (when it does so, and what the result is, depends in this example on C's timing and decision); (2) B receives it; (3) C receives it; (4) D receives it.

Because 2PC locks the resources of every participant and of the coordinator while the protocol runs, it is expensive. In short, 2PC is inefficient, and distributed transactions are hard to do well.
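The voting and commit phases can be sketched in Java. This is a minimal single-process illustration, not a production coordinator. `Participant`, `SimpleParticipant`, and `TwoPhaseCoordinator` are hypothetical names, and a real coordinator would also log its decision durably. To keep the blocking seen with C from lasting forever, this sketch treats a vote that does not arrive within `voteTimeoutMillis` as "no".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical participant contract used only in this sketch. */
interface Participant {
    boolean prepare();  // phase 1: vote yes (true) or no (false)
    void commit();      // phase 2: make the prepared change permanent
    void rollback();    // phase 2: undo the prepared change
}

/** Demo participant that votes after a delay and records the decision it receives. */
final class SimpleParticipant implements Participant {
    private final boolean vote;
    private final long delayMillis;
    volatile String outcome = "none";

    SimpleParticipant(boolean vote, long delayMillis) {
        this.vote = vote;
        this.delayMillis = delayMillis;
    }

    public boolean prepare() {
        try {
            Thread.sleep(delayMillis); // simulates a slow reply, like C in the example
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return vote;
    }

    public void commit() { outcome = "commit"; }
    public void rollback() { outcome = "rollback"; }
}

final class TwoPhaseCoordinator {
    // Daemon threads, so a participant that never answers cannot keep the JVM alive.
    private final ExecutorService pool = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "2pc-vote");
        t.setDaemon(true);
        return t;
    });

    /** Returns true if every participant voted yes in time and was told to commit. */
    boolean run(List<? extends Participant> participants, long voteTimeoutMillis) {
        // Phase 1 (voting): ask all participants to prepare in parallel.
        List<Future<Boolean>> votes = new ArrayList<>();
        for (Participant p : participants) {
            Callable<Boolean> prepare = p::prepare;
            votes.add(pool.submit(prepare));
        }
        boolean allYes = true;
        for (Future<Boolean> vote : votes) {
            try {
                // The coordinator blocks here until the vote arrives or the timeout expires.
                allYes &= vote.get(voteTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException | ExecutionException e) {
                allYes = false; // a missing or failed vote counts as "no"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                allYes = false;
            }
        }
        // Phase 2 (commit): send the same decision to every participant.
        for (Participant p : participants) {
            if (allYes) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allYes;
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

For instance, suppose C's `prepare` takes longer than the vote timeout. Then `run` returns `false` and every participant is told to roll back. This bounds the wait, at the cost of aborting a meeting that C might have been able to attend.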

When real-time requirements are less strict, eventual consistency can be achieved with best-effort delivery, or with a message queue plus a message store.

Best-effort delivery

Best-effort delivery means doing everything possible to deliver a notification, without a complete guarantee that it succeeds. It involves three modules:

  1. Upstream applications, which send messages to the MQ queue.

  2. Downstream applications, such as SMS and email services, which accept notification requests and return the result.

  3. The best-effort notification service, which listens to the message queue, stores each message in a database, and calls the downstream application's notification interface according to the notification rules.

The process is as follows:

  1. The upstream application sends an MQ message containing the notification rules and the notification address to the MQ component.

  2. The best-effort notification service listens for messages in MQ, parses the notification rules, and puts each message into a delay queue to wait for the notification to be triggered.

  3. When the notification is triggered, the service calls the downstream interface. If the call succeeds, the message is marked as successfully notified. If it fails and the notification rules still allow another attempt (for example, once every 5 minutes, 10 times in total), the message is put back into the delay queue for the next trigger.
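The delay-queue loop in the steps above can be sketched with `java.util.concurrent.DelayQueue`. This is a minimal in-memory sketch. `NotifyRule`, `PendingNotice`, and `BestEffortNotifier` are hypothetical names. `NotifyRule` holds a fixed interval plus a maximum number of attempts, like "every 5 minutes, 10 times". The downstream call is modeled as a `Predicate<String>`. A real service would also persist each message in the database.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical notification rule: fixed interval, bounded number of attempts. */
final class NotifyRule {
    final long intervalMillis;
    final int maxAttempts;

    NotifyRule(long intervalMillis, int maxAttempts) {
        this.intervalMillis = intervalMillis;
        this.maxAttempts = maxAttempts;
    }
}

/** A pending notification that becomes available once its trigger time has passed. */
final class PendingNotice implements Delayed {
    final String payload;
    final NotifyRule rule;
    int attempts;
    long triggerAtMillis = System.currentTimeMillis(); // first attempt is due immediately
    String status = "PENDING";

    PendingNotice(String payload, NotifyRule rule) {
        this.payload = payload;
        this.rule = rule;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

final class BestEffortNotifier {
    private final DelayQueue<PendingNotice> queue = new DelayQueue<>();
    private final Predicate<String> downstream; // true means the downstream call succeeded

    BestEffortNotifier(Predicate<String> downstream) {
        this.downstream = downstream;
    }

    void submit(PendingNotice notice) {
        queue.put(notice);
    }

    /** Processes notices until none are left, blocking while the next one is not yet due. */
    void drain() throws InterruptedException {
        while (!queue.isEmpty()) {
            PendingNotice n = queue.take(); // blocks until the head's delay has expired
            n.attempts++;
            if (downstream.test(n.payload)) {
                n.status = "SUCCESS";
            } else if (n.attempts < n.rule.maxAttempts) {
                n.triggerAtMillis = System.currentTimeMillis() + n.rule.intervalMillis;
                queue.put(n); // back into the delay queue for the next trigger
            } else {
                n.status = "GAVE_UP"; // best effort exhausted; nothing more is guaranteed
            }
        }
    }
}
```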

The best-effort notification service tries to keep data consistent as far as possible without affecting the main business. The developer defines notification rules per business, and the service pursues consistency only within those rules; that is what “best effort” means here.

The implementation is relatively simple. Mainstream message queues provide an ACK mechanism, so when no ACK is received, the message can be resent periodically according to the rules.

Advantages: simple to implement.

Disadvantages: no compensation mechanism, so delivery is not guaranteed.

Key points:

  1. If sending the message fails, the business operation can be rolled back along with it.

  2. The receiver guarantees idempotency.

  3. Timed retransmission with a retry strategy, such as exponentially growing intervals. Reportedly, Alibaba implements this with a Redis zset (see zhuanlan.zhihu.com/p/…). A message is first added to the zset. A timer (for example, at one-second granularity) triggers DelayQ, which forks a consumer thread to process the messages in the zset whose ExecuteTime is not later than the current time. For each message, DelayQ parses its callbackURL, assembles the parameters, and pushes the business message to the Consumer. If the Consumer reports success, the message is removed from Codis with ZREM; if processing fails, the time of the next attempt is calculated and the message's ExecuteTime is updated.
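The zset-based retry loop can be sketched with a `TreeSet` ordered by ExecuteTime, standing in for the Redis sorted set. This is an in-memory illustration under stated assumptions. The class names, the base delay, and the doubling rule are hypothetical. The caller supplies `now`, so one call to `tick` represents one timer firing. A real deployment would use the Redis `ZADD`, `ZRANGEBYSCORE`, and `ZREM` commands instead of the set.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
import java.util.function.Predicate;

/** A retryable message; executeTime plays the role of the zset score. */
final class RetryEntry {
    final String id;
    final String payload;
    long executeTime;
    int failures;

    RetryEntry(String id, String payload, long executeTime) {
        this.id = id;
        this.payload = payload;
        this.executeTime = executeTime;
    }
}

final class SortedSetRetryScheduler {
    // Ordered by executeTime, then id, like zset members ordered by score.
    private final TreeSet<RetryEntry> zset = new TreeSet<>(
            Comparator.comparingLong((RetryEntry e) -> e.executeTime).thenComparing(e -> e.id));
    private final long baseDelayMillis;
    private final Predicate<String> consumer; // true means the Consumer processed the message

    SortedSetRetryScheduler(long baseDelayMillis, Predicate<String> consumer) {
        this.baseDelayMillis = baseDelayMillis;
        this.consumer = consumer;
    }

    void add(RetryEntry e) { zset.add(e); }     // analogous to ZADD

    int size() { return zset.size(); }

    /** One timer tick: process every entry whose executeTime is not later than now. */
    void tick(long now) {
        List<RetryEntry> due = new ArrayList<>();
        for (RetryEntry e : zset) {              // analogous to ZRANGEBYSCORE key -inf now
            if (e.executeTime > now) break;
            due.add(e);
        }
        for (RetryEntry e : due) {
            zset.remove(e);                      // analogous to ZREM
            if (!consumer.test(e.payload)) {
                e.failures++;
                // Exponential backoff: base, 2x base, 4x base, ... (shift capped to avoid overflow)
                e.executeTime = now + (baseDelayMillis << Math.min(e.failures - 1, 20));
                zset.add(e);                     // re-add with the updated score
            }
        }
    }
}
```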

Reliable-message eventual consistency

This scheme involves three modules:

  1. Upstream applications, which execute business logic and send MQ messages.

  2. The reliable message service and the MQ component, which coordinate message delivery between upstream and downstream and keep their data consistent.

  3. Downstream applications, which listen for messages from MQ and execute their own business.

Phase 1: The upstream application executes the business and sends MQ messages

The upstream application binds local business execution and message sending into the same local transaction, so that either the local operation succeeds and the MQ message is sent, or both fail and are rolled back.

The interaction between the upstream application and the reliable message service is as follows:

  1. The upstream application sends a message in the to-be-confirmed state to the reliable message service.

  2. The reliable message service stores the to-be-confirmed message and returns.

  3. The upstream application executes its local business.

  4. The upstream application notifies the reliable message service that the business has been executed and the message can be sent.

  5. The reliable message service changes the message status to sending and delivers the message to the MQ middleware.
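The five steps can be sketched as a small in-memory model. `ReliableMessageService`, `UpstreamApp`, and their methods are hypothetical. The status names mirror the `MessageStatusEnum` values used in the implementation code later in the article. The `Runnable` stands in for the local business. A real service would keep messages in a database and deliver them to MQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Status names borrowed from MessageStatusEnum in the article's implementation code. */
enum MsgStatus { WAITING_CONFIRM, SENDING }

/** Hypothetical in-memory stand-in for the reliable message service (the real one uses a DB and MQ). */
final class ReliableMessageService {
    final Map<String, MsgStatus> status = new ConcurrentHashMap<>();
    final Map<String, String> bodies = new ConcurrentHashMap<>();
    final List<String> mq = new ArrayList<>(); // stands in for the MQ component

    /** Steps 1-2: store the message as WAITING_CONFIRM and return its id. */
    String saveWaitingConfirm(String body) {
        String id = UUID.randomUUID().toString();
        bodies.put(id, body);
        status.put(id, MsgStatus.WAITING_CONFIRM);
        return id;
    }

    /** Step 5: switch the message to SENDING and deliver it to MQ. */
    void confirmAndSend(String id) {
        status.put(id, MsgStatus.SENDING);
        mq.add(bodies.get(id));
    }
}

final class UpstreamApp {
    private final ReliableMessageService messages;

    UpstreamApp(ReliableMessageService messages) {
        this.messages = messages;
    }

    /** Pre-stores the message, runs the local business, then confirms the message. */
    String payOrder(String orderNo, Runnable localBusiness) {
        String id = messages.saveWaitingConfirm("order " + orderNo + " paid"); // steps 1-2
        localBusiness.run();         // step 3; if this throws, the message is never confirmed
        messages.confirmAndSend(id); // step 4, which triggers step 5
        return id;
    }
}
```

If the local business throws, the message remains in WAITING_CONFIRM and never reaches MQ. That leftover message is exactly what the status confirmation in Exception Handling 1 cleans up.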

Each of these steps can fail. We therefore need to check whether the upstream business and the message stay consistent when an exception occurs at any of the five steps.

While the upstream application has executed but the downstream application has not yet executed, or has failed, the transaction is in the Soft State of BASE theory.

Phase 2: The downstream application listens for MQ messages and executes its business

The downstream application listens for MQ messages, executes its business, and reports the consumption result to the reliable message service.

The status of the reliable message must stay consistent with the downstream application. If the message is not completed, the downstream business must not have been executed. If the message is completed, the downstream business must have been executed.

The interaction between the downstream application and the reliable message service is as follows:

  1. The downstream application listens to the MQ component and receives the message.

  2. The downstream application executes its local business based on the message body.

  3. The downstream application automatically sends an ACK to the MQ component to confirm that the message has been consumed.

  4. The downstream application notifies the reliable message service that the message was consumed successfully, and the service changes the message status to completed.
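Steps 1 to 4 can be sketched as a consumer that runs its business at most once per message id, then ACKs and reports. `DownstreamConsumer` and its three callbacks are hypothetical stand-ins for the real business, the MQ client, and the call to the reliable message service. The in-memory set of processed ids is an assumption used to show idempotency. A production version would typically record the id in the same local transaction as the business update.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical downstream consumer; the callbacks stand in for business, MQ, and service calls. */
final class DownstreamConsumer {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet(); // idempotency record
    private final Consumer<String> business;       // step 2: local business on the message body
    private final Consumer<String> ack;            // step 3: ACK to the MQ component
    private final Consumer<String> reportConsumed; // step 4: notify the reliable message service

    DownstreamConsumer(Consumer<String> business, Consumer<String> ack, Consumer<String> reportConsumed) {
        this.business = business;
        this.ack = ack;
        this.reportConsumed = reportConsumed;
    }

    /** Step 1 has delivered (messageId, body); steps 2-4 happen here. */
    void onMessage(String messageId, String body) {
        // Run the business only for an id not seen before, so a redelivered message is harmless.
        // Here the check and the record are not atomic; that is a simplification of this sketch.
        if (!processedIds.contains(messageId)) {
            business.accept(body);
            processedIds.add(messageId); // recorded only after the business succeeded
        }
        ack.accept(messageId);
        reportConsumed.accept(messageId);
    }
}
```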

Each of these steps can also fail. We therefore need to check whether the downstream business and the message status stay consistent when an exception occurs at any of the four steps.

Analyzing the possible failures in both phases shows what is needed to guarantee eventual consistency between upstream and downstream data. The reliable message service needs two functions: message status confirmation and message resending. Together they provide the Eventually Consistent property of BASE theory.

Exception Handling 1: Confirm the message status

The reliable message service periodically checks message status. A message that is still waiting for confirmation after the timeout means that step 4 or step 5 of the interaction between the upstream application and the reliable message service failed.

The reliable message service then asks the upstream application whether the business was executed. The upstream application exposes a query interface for tracing business execution. If the business succeeded, the message status is changed to sending and the message is delivered; otherwise the message is deleted, which keeps the data consistent. The process is as follows:

  1. The reliable message service finds messages that are waiting for confirmation and have timed out.

  2. It queries the upstream application for the execution status of the corresponding business.

  3. If the business was not executed, the message is deleted, keeping the business and the reliable message service consistent. If it was executed, the message status is changed to sending and the message is delivered to the MQ component.
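One pass of the confirmation job can be sketched as follows. `PendingMessage`, `BusinessQuery`, and `StatusConfirmJob` are hypothetical names. `BusinessQuery` stands in for the query interface the upstream application exposes. The current time and the timeout are passed in so the logic is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** A stored message; bizKey identifies the upstream business (for example an order number). */
final class PendingMessage {
    final String id;
    final String bizKey;
    final long createdAt;
    String status = "WAITING_CONFIRM";

    PendingMessage(String id, String bizKey, long createdAt) {
        this.id = id;
        this.bizKey = bizKey;
        this.createdAt = createdAt;
    }
}

/** Hypothetical query interface exposed by the upstream application. */
interface BusinessQuery {
    boolean executed(String bizKey);
}

final class StatusConfirmJob {
    final Map<String, PendingMessage> store = new HashMap<>();
    final List<String> sentToMq = new ArrayList<>(); // stands in for delivery to MQ

    void runOnce(BusinessQuery upstream, long now, long timeoutMillis) {
        // Step 1: collect messages still waiting for confirmation after the timeout.
        List<PendingMessage> timedOut = new ArrayList<>();
        for (PendingMessage m : store.values()) {
            if ("WAITING_CONFIRM".equals(m.status) && now - m.createdAt > timeoutMillis) {
                timedOut.add(m);
            }
        }
        for (PendingMessage m : timedOut) {
            // Step 2: ask the upstream application whether the business was executed.
            if (upstream.executed(m.bizKey)) {
                m.status = "SENDING"; // Step 3: executed, so confirm and deliver
                sentToMq.add(m.id);
            } else {
                store.remove(m.id);   // Step 3: not executed, so delete the message
            }
        }
    }
}
```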

Exception Handling 2: Resend the message

If a message is in the sending state, the upstream application has already executed. The reliable message service must therefore make sure the downstream application executes as well.

Suppose the reliable message service finds a message that has stayed in the sending state past the timeout. This means one of the steps between the reliable message service and the downstream application failed. Whichever step it was, the service reposts the message to the MQ component so that the downstream application receives it again.

When the downstream application receives the message again, it re-executes the business idempotently. It then notifies the reliable message service that the message was consumed, which finally makes the upstream and downstream data consistent. The process is as follows:

  1. The reliable message service periodically queries messages that are in the sending state and have timed out.

  2. The reliable message service reposts these messages to the MQ component.

  3. The downstream application receives each message and re-executes the business idempotently.

  4. The downstream application notifies the reliable message service that the message has been consumed successfully.

Together, message status confirmation and message resending ensure eventual consistency of data among the upstream application, the reliable message service, and the downstream application.

Implementing reliable-message eventual consistency

The implementation is divided into the following modules:

1. Message service module

1.1 Pre-store interface

Before the message is sent, it is stored in the database with the status “waiting for confirmation” (WAITING_CONFIRM), together with the queue it should be delivered to.

public int saveMessageWaitingConfirm(RpTransactionMessage message) {
    message.setEditTime(new Date());
    message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());
    message.setAreadlyDead(PublicEnum.NO.name());
    message.setMessageSendTimes(0);
    return rpTransactionMessageDao.insert(message);
}

1.2 Interface for confirming and sending messages

Once the pre-stored message has been saved successfully, the business initiator executes its business logic. When the business completes, it calls the message center's confirm-and-send interface. This interface sets the message status to SENDING and sends the message to its message queue.

public void confirmAndSendMessage(String messageId) {
    // Update the message status in the database
    final RpTransactionMessage message = getMessageByMessageId(messageId);
    if (message == null) {
        throw new IllegalArgumentException("No message found for messageId " + messageId);
    }
    message.setStatus(MessageStatusEnum.SENDING.name());
    message.setEditTime(new Date());
    rpTransactionMessageDao.update(message);

    // Send the message to its consumer queue; passing the destination per call avoids
    // changing the default destination of a shared JmsTemplate from several threads
    notifyJmsTemplate.send(message.getConsumerQueue(), new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(message.getMessageBody());
        }
    });
}

1.3 Save and Send

If the first phase involves no business operation, the message can be saved with the status SENDING and sent straight to the message queue.

1.4 Direct send (pass-through)

The message is sent straight to the queue without being stored, so no reliability is guaranteed.

1.5 Message resend interface

Resend a message identified by its messageId. Other modules also call this interface when they detect that a message was not delivered properly.

public void reSendMessage(final RpTransactionMessage message) {
    // Increment the resend counter and record the time of this attempt
    message.addSendTimes();
    message.setEditTime(new Date());
    rpTransactionMessageDao.update(message);

    // Send the message to its consumer queue
    notifyJmsTemplate.send(message.getConsumerQueue(), new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(message.getMessageBody());
        }
    });
}

1.6 Mark a message as dead

1.7 Delete a message

Once the message has been confirmed as consumed, or the business processing has failed, the message no longer needs to be kept and is deleted directly.

1.8 Resend all dead messages
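Sections 1.6 to 1.8 give no code. A minimal in-memory sketch of what they describe might look like the following. `StoredMessage`, `DeadLetterOps`, `reSendAllDeadMessages`, and its queue-name filter are assumptions. `setMessageToAreadlyDead` and `deleteMessageByMessageId` reuse the method names, including their original spelling, from the exception handling module's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Minimal stored-message record for this sketch. */
final class StoredMessage {
    final String messageId;
    final String consumerQueue;
    boolean dead;
    int sendTimes;

    StoredMessage(String messageId, String consumerQueue) {
        this.messageId = messageId;
        this.consumerQueue = consumerQueue;
    }
}

/** Hypothetical in-memory version of operations 1.6 to 1.8. */
final class DeadLetterOps {
    final Map<String, StoredMessage> db = new LinkedHashMap<>(); // stands in for the message table
    final List<String> sent = new ArrayList<>();                // stands in for delivery to MQ

    // 1.6: flag the message as dead so the periodic resend job no longer picks it up.
    void setMessageToAreadlyDead(String messageId) {
        StoredMessage m = db.get(messageId);
        if (m != null) {
            m.dead = true;
        }
    }

    // 1.7: the message is no longer needed, so remove it.
    void deleteMessageByMessageId(String messageId) {
        db.remove(messageId);
    }

    // 1.8: resend every dead message on one queue (for example after the consumer is fixed).
    int reSendAllDeadMessages(String consumerQueue) {
        int count = 0;
        for (StoredMessage m : db.values()) {
            if (m.dead && m.consumerQueue.equals(consumerQueue)) {
                m.sendTimes++; // counted like any other resend
                sent.add(m.messageId);
                count++;
            }
        }
        return count;
    }
}
```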

2. Message exception handling module

2.1 Handle messages that timed out while waiting for confirmation

A message may still be in the WAITING_CONFIRM state although the business operation has completed. In that case the message was not sent successfully and must now be sent.

The check runs in a thread from a thread pool, once a minute:

threadPool.execute(new Runnable() {
    @Override
    public void run() {
        while (true) {
            settScheduled.handleWaitingConfirmTimeOutMessages();
            try {
                log.info("[waiting_confirm] sleep 60 seconds");
                Thread.sleep(60000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the loop on shutdown
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
});
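The JDK's `ScheduledExecutorService` offers an alternative to the hand-written `while (true)` and `Thread.sleep` loop. In this sketch `TimeoutScanner` is a hypothetical name. The task passed to `start` stands in for `handleWaitingConfirmTimeOutMessages`, and the one-minute period becomes a parameter. The wrapper catches runtime exceptions because, under the `scheduleWithFixedDelay` contract, an exception thrown by the task suppresses all later runs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class TimeoutScanner {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Runs the task now, then again periodMillis after each run finishes. */
    ScheduledFuture<?> start(Runnable task, long periodMillis) {
        Runnable safeTask = () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel every later run, so report it and carry on.
                System.err.println("[waiting_confirm] scan failed: " + e);
            }
        };
        return scheduler.scheduleWithFixedDelay(safeTask, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

In production the period would be `60_000` to match the original loop. Fixed delay (rather than fixed rate) keeps a slow scan from overlapping with the next one.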

Periodically query the messages that are still waiting for confirmation and have timed out:

// Only messages created before dateStr count as timed out
paramMap.put("createTimeBefore", dateStr);
// Only messages still waiting for confirmation
paramMap.put("status", MessageStatusEnum.WAITING_CONFIRM.name());
// Fetch the matching messages page by page
Map<String, RpTransactionMessage> messageMap = getMessageMap(numPerPage, maxHandlePageCount, paramMap);

For each such message, check the related business record, then either confirm and send the message or delete it:

// Query the business record the message belongs to
RpTradePaymentRecord record = rpTradePaymentQueryService.getRecordByBankOrderNo(bankOrderNo);
if (TradeStatusEnum.SUCCESS.name().equals(record.getStatus())) {
    // The payment succeeded: confirm the message (its status becomes SENDING) and send it
    rpTransactionMessageService.confirmAndSendMessage(message.getMessageId());
} else if (TradeStatusEnum.WAITING_PAYMENT.name().equals(record.getStatus())) {
    // The payment never completed: the message is not needed, so delete it
    log.debug("[waiting_confirm] deleting message ID [" + message.getMessageId() + "]");
    rpTransactionMessageService.deleteMessageByMessageId(message.getMessageId());
}

2.2 Handle messages stuck in the SENDING state

If a message stays in the SENDING state and is never deleted, the receiver has not processed it. To handle this, query the messages that are in the SENDING state, have timed out, and are not yet marked dead:

// Only messages created before dateStr
paramMap.put("createTimeBefore", dateStr);
// Only messages in the SENDING state
paramMap.put("status", MessageStatusEnum.SENDING.name());
// Only messages that have not been marked as dead
paramMap.put("areadlyDead", PublicEnum.NO.name());
Map<String, RpTransactionMessage> messageMap = getMessageMap(numPerPage, maxHandlePageCount, paramMap);

Then resend each one. A message is skipped if it has exceeded the maximum number of sends, or if the required interval since its last send has not yet elapsed:

// If the maximum number of sends has been exceeded, mark the message as dead and skip it
if (maxTimes < message.getMessageSendTimes()) {
    rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId());
    continue;
}
// Check whether the interval required before the next send has elapsed
int reSendTimes = message.getMessageSendTimes();
int times = notifyParam.get(reSendTimes == 0 ? 1 : reSendTimes); // wait time in minutes
long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
long needTime = currentTimeInMillis - times * 60 * 1000L;
long hasTime = message.getEditTime().getTime();
if (hasTime > needTime) {
    log.debug("currentTime[" + sdf.format(new Date()) + "], [SENDING] message last sent at ["
            + sdf.format(message.getEditTime()) + "]; it can be resent only after [" + times + "] minutes");
    continue;
}
// Resend the message
rpTransactionMessageService.reSendMessage(message);
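The interval check in the fragment above is easier to follow, and to test, as a pure function. This sketch mirrors that logic under one assumption: `notifyParam` maps a send count to a wait in minutes. That matches how `times` is used in the fragment, but the map's actual contents are not shown in the article. `ResendPolicy` and `Decision` are hypothetical names.

```java
import java.util.Map;

final class ResendPolicy {
    enum Decision { MARK_DEAD, WAIT, RESEND }

    /**
     * Mirrors the resend fragment. Past maxTimes the message is dead. Otherwise it is resent
     * only once the interval for its current send count has elapsed since editTime.
     */
    static Decision decide(int sendTimes, long editTimeMillis, long nowMillis,
                           Map<Integer, Integer> notifyParam, int maxTimes) {
        if (maxTimes < sendTimes) {
            return Decision.MARK_DEAD;
        }
        int minutes = notifyParam.get(sendTimes == 0 ? 1 : sendTimes); // wait before the next send
        long earliestResend = editTimeMillis + minutes * 60_000L;
        // Same condition as "hasTime > needTime" in the fragment, rearranged.
        return nowMillis < earliestResend ? Decision.WAIT : Decision.RESEND;
    }
}
```

`decide` looks up `notifyParam` for every send count up to `maxTimes`, so the map needs an entry for each of them. Otherwise unboxing the missing value throws a `NullPointerException`, just as in the original fragment.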

3. Message receiving module

The receiver consumes and processes the message, then deletes the message from the database.