Author: Throwable

Blog (cnblogs): https://www.cnblogs.com/throwable/p/12266806.html

Premise

Distributed transactions are a thorny problem in microservice practice. In the microservice solutions the author has implemented, strong consistency is either compromised on or avoided altogether. Drawing on the local message table scheme proposed by eBay many years ago, a lightweight wrapper based on RabbitMQ and MySQL (JDBC) was built to provide a low-intrusion transactional message module. This article analyzes the design and implementation of the whole scheme in detail. The environment dependencies are as follows:

  • JDK 1.8+
  • spring-boot-starter-web: 2.x.x
  • spring-boot-starter-jdbc: 2.x.x
  • spring-boot-starter-amqp: 2.x.x
  • HikariCP: 3.x.x (bundled with spring-boot-starter-jdbc)
  • mysql-connector-java: 5.1.48
  • Redisson: 3.12.1

Design approach

In principle, transactional messages are only suitable for weak consistency (or eventual consistency) scenarios. Common weak consistency scenarios include:

  • The user service completes the registration action and pushes a marketing related message to the SMS service.
  • In the credit system, the order service saves the order and pushes a record of the order to be approved to the approval service.
  • ...

Transaction messages should generally not be used for strong consistency scenarios.

In general, strong consistency requires strict synchronization, meaning that all operations must succeed or fail together, which introduces additional synchronization costs. If a transactional message module is designed properly, with compensation, querying, monitoring, and similar features in place, overall throughput is higher than with strict synchronization because the system interaction is asynchronous. In the business systems the author is responsible for, a basic rule has been set for the use of transactional messages: provided the message content is correct, consumers must handle any exceptions on their own side.

To put it simply: the upstream service ensures that its own business logic is correct, and once it has successfully pushed the correct message to RabbitMQ, its duty is done.

To keep the code less intrusive, transactional messages rely on Spring's programmatic or declarative transactions. Programmatic transactions typically rely on TransactionTemplate, while declarative transactions rely on the AOP module and the @Transactional annotation.

A dedicated transactional message module is then needed, together with a transactional message record table (in effect the local message table) that holds a record of every message to be sent. The main functions of the module are:

  • Save message logs.
  • Push messages to the RabbitMQ server.
  • Query message records, compensation push, and so on.

The logical unit of transaction execution

Within the logical unit of transaction execution, the transactional message record to be pushed must be saved; that is, the local (business) logic and the saving of the message record are bound to the same transaction.

Sending the message to the RabbitMQ server should be deferred until after the transaction commits, so that the message is pushed only once the transaction has committed successfully.

To make saving the transactional message and sending it to RabbitMQ appear to the caller as a single action, Spring's transaction synchronizer TransactionSynchronization is needed. The callback points of its main methods are analyzed here, mainly with reference to AbstractPlatformTransactionManager#commit() and AbstractPlatformTransactionManager#processCommit():

The figure above only illustrates the scenario where the transaction commits normally (without exceptions). It shows clearly that the afterCommit() and afterCompletion(int status) methods of TransactionSynchronization are called back after the actual commit point, AbstractPlatformTransactionManager#doCommit(), so either of these two methods can be used to push the message to the RabbitMQ server. The pseudocode is as follows:

@Transactional
public Dto businessMethod() {
    // business transaction code block ...

    // save the transactional message record [saveTransactionMessageRecord()]

    // register a transaction synchronizer that pushes the message to RabbitMQ
    // in its afterCommit() method
    // [register TransactionSynchronization, send message in method afterCommit()]

    // business transaction code block ...
}

In the pseudocode above, the steps of saving the transaction message and registering the transaction synchronizer can be inserted anywhere in the transaction method, that is, regardless of the order of execution.
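The ordering described above can be modelled without Spring. The following is a toy sketch, not Spring's implementation: MiniTransaction, registerAfterCommit, and the in-memory "database" and "broker" lists are hypothetical names invented for illustration. It shows that a callback registered during the transaction runs only after the commit point and is skipped on rollback, wherever in the method it was registered.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: callbacks registered inside a transaction run only after a successful commit. */
final class AfterCommitSketch {

    /** Hypothetical stand-in for a Spring-managed transaction. */
    static final class MiniTransaction {
        private final List<Runnable> afterCommitCallbacks = new ArrayList<>();
        private final List<String> pendingWrites = new ArrayList<>();
        private final List<String> database;

        MiniTransaction(List<String> database) {
            this.database = database;
        }

        void write(String row) {
            pendingWrites.add(row);
        }

        void registerAfterCommit(Runnable callback) {
            afterCommitCallbacks.add(callback);
        }

        void commit() {
            database.addAll(pendingWrites);               // the actual commit point
            afterCommitCallbacks.forEach(Runnable::run);  // callbacks fire only after it
        }

        void rollback() {
            pendingWrites.clear();                        // nothing persisted
            afterCommitCallbacks.clear();                 // callbacks never run
        }
    }

    /** Runs one business method and returns what was "sent to the broker". */
    static List<String> run(boolean fail, List<String> database) {
        List<String> broker = new ArrayList<>();
        MiniTransaction tx = new MiniTransaction(database);
        try {
            tx.write("order-1");
            tx.write("message-record-1");                 // saved in the same transaction
            tx.registerAfterCommit(() -> broker.add("message-1"));
            if (fail) {
                throw new IllegalStateException("business failure");
            }
            tx.commit();
        } catch (IllegalStateException e) {
            tx.rollback();
        }
        return broker;
    }

    public static void main(String[] args) {
        List<String> db = new ArrayList<>();
        System.out.println("sent=" + run(false, db) + ", db=" + db);
        List<String> db2 = new ArrayList<>();
        System.out.println("sent=" + run(true, db2) + ", db=" + db2);
    }
}
```

In the successful run both rows are persisted and one message is sent; in the failing run neither the rows nor the message appear, which is the property the transactional message module relies on.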

Compensation for transaction messages

Although the author recommends that downstream services handle exceptions in their own consumption, sometimes the upstream service has no choice but to push the corresponding message again; this is a special scenario. Another scenario to consider: after the transaction commits, the TransactionSynchronization#afterCommit() callback fails to run. This has a low probability but does occur in production. A typical cause is that the service instance is restarted after the transaction commit completes but before TransactionSynchronization#afterCommit() has had a chance to push the message.

As shown below:

To handle compensation pushes uniformly, a finite set of states is used to record whether a message has been pushed successfully:

  • In the transactional method, when the message record is saved, its push status is marked as processing.
  • In the implementation of TransactionSynchronization's afterCommit() method, the message is pushed to RabbitMQ and the status of the message record is then changed to push succeeded.

A special case is a failure of the RabbitMQ server itself that causes the push to fail. Such messages need retries (compensation pushes). Experience shows that retrying repeatedly within a short period is pointless, because a failed service generally does not recover instantly, so an exponential backoff algorithm should be considered, along with a limit on the maximum number of retries.

The backoff factor, initial interval, and maximum number of retries must be set according to the actual situation; otherwise messages may be delayed too long or retried too frequently.
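To get a feel for how these three parameters interact, the delay before each retry can be computed as initBackoff * backoffFactor^round. The sketch below is illustrative only: the class and method names are hypothetical, and the sample values (10 seconds, factor 2, 5 retries) mirror the defaults used in the table design later in this article.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative exponential backoff schedule; names are hypothetical. */
final class BackoffSketch {

    /** Delay in seconds before retry number round (0-based): initBackoff * backoffFactor^round. */
    static long delaySeconds(long initBackoff, long backoffFactor, int round) {
        return (long) (initBackoff * Math.pow(backoffFactor, round));
    }

    /** Delays for every round from 0 up to, but not including, maxRetryTimes. */
    static List<Long> schedule(long initBackoff, long backoffFactor, int maxRetryTimes) {
        List<Long> delays = new ArrayList<>();
        for (int round = 0; round < maxRetryTimes; round++) {
            delays.add(delaySeconds(initBackoff, backoffFactor, round));
        }
        return delays;
    }

    public static void main(String[] args) {
        // prints [10, 20, 40, 80, 160]
        System.out.println(schedule(10, 2, 5));
    }
}
```

With these values the fifth attempt waits 160 seconds; raising the factor to 3 would push it to 810 seconds, which shows why the parameters need tuning per scenario.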

Implementation

Introducing core dependencies:

<properties>
    <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
    <redisson.version>3.12.1</redisson.version>
    <mysql.connector.version>5.1.48</mysql.connector.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.connector.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>${redisson.version}</version>
    </dependency>
</dependencies>

spring-boot-starter-jdbc, mysql-connector-java, and spring-boot-starter-aop are the dependencies related to MySQL transactions, and spring-boot-starter-amqp wraps the RabbitMQ client. Redisson is used mainly for its distributed lock, which guards the scheduled compensation task so that multiple service nodes do not run the compensation push concurrently.

Table design

The transaction message module mainly involves two tables. Taking MySQL as an example, the DDL of the table is as follows:

CREATE TABLE `t_transactional_message`
(
    id                  BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    create_time         DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    edit_time           DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    creator             VARCHAR(20)     NOT NULL DEFAULT 'admin',
    editor              VARCHAR(20)     NOT NULL DEFAULT 'admin',
    deleted             TINYINT         NOT NULL DEFAULT 0,
    current_retry_times TINYINT         NOT NULL DEFAULT 0 COMMENT 'Current retry times',
    max_retry_times     TINYINT         NOT NULL DEFAULT 5 COMMENT 'Maximum retry times',
    queue_name          VARCHAR(255)    NOT NULL COMMENT 'Queue name',
    exchange_name       VARCHAR(255)    NOT NULL COMMENT 'Exchange name',
    exchange_type       VARCHAR(8)      NOT NULL COMMENT 'Exchange type',
    routing_key         VARCHAR(255) COMMENT 'Routing key',
    business_module     VARCHAR(32)     NOT NULL COMMENT 'Business module',
    business_key        VARCHAR(255)    NOT NULL COMMENT 'Business key',
    next_schedule_time  DATETIME        NOT NULL COMMENT 'Next schedule time',
    message_status      TINYINT         NOT NULL DEFAULT 0 COMMENT 'Message status',
    init_backoff        BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT 'Initial backoff value, in seconds',
    backoff_factor      TINYINT         NOT NULL DEFAULT 2 COMMENT 'Backoff factor',
    INDEX idx_queue_name (queue_name),
    INDEX idx_create_time (create_time),
    INDEX idx_next_schedule_time (next_schedule_time),
    INDEX idx_business_key (business_key)
) COMMENT 'Transactional message table';

CREATE TABLE `t_transactional_message_content`
(
    id         BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id BIGINT UNSIGNED NOT NULL COMMENT 'Transactional message record ID',
    content    TEXT COMMENT 'Message content'
) COMMENT 'Transactional message content table';

Because this module may later be extended with an admin console, the message state fields and the potentially large message content are stored in two separate tables, so that bulk queries over message records do not drive up I/O utilization on the MySQL server (a scheme arrived at as reasonable after discussion with the DBA team at a company). Two business fields, business_module and business_key, are reserved to identify the business module and the business key (usually a unique identifier, such as an order number).

In general, if the service declares the queue and exchange binding in advance through configuration, sending a RabbitMQ message only requires the exchangeName and routingKey (headers exchanges are special and rarely used, so they are not considered here). To let services skip the declaration step, the module declares the binding based on the queue the first time a message is sent and caches the result (declaring a queue-exchange binding in RabbitMQ does not throw an exception as long as the parameters are the same each time).
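The declare-and-cache idea can be sketched with a ConcurrentHashMap alone. In this minimal sketch, declareOnBroker is a hypothetical stand-in for the real queue, exchange, and binding declarations, and the counter exists only to make the effect observable; it is not part of the module.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of declaring broker topology at most once per queue name. */
final class DeclareOnceSketch {

    private final ConcurrentMap<String, Boolean> declared = new ConcurrentHashMap<>();
    private final AtomicInteger brokerCalls = new AtomicInteger();

    /** Hypothetical stand-in for declaring the queue, exchange and binding on the broker. */
    private void declareOnBroker(String queue, String exchange, String routingKey) {
        brokerCalls.incrementAndGet();
    }

    /** Declares the topology the first time a queue name is seen; later calls hit the cache. */
    void ensureDeclared(String queue, String exchange, String routingKey) {
        declared.computeIfAbsent(queue, k -> {
            declareOnBroker(queue, exchange, routingKey);
            return true;
        });
    }

    int brokerCalls() {
        return brokerCalls.get();
    }

    public static void main(String[] args) {
        DeclareOnceSketch sketch = new DeclareOnceSketch();
        sketch.ensureDeclared("tm.test.queue", "tm.test.exchange", "tm.test.key");
        sketch.ensureDeclared("tm.test.queue", "tm.test.exchange", "tm.test.key");
        sketch.ensureDeclared("tm.other.queue", "tm.test.exchange", "tm.other.key");
        System.out.println(sketch.brokerCalls()); // prints 2
    }
}
```

Because the cache is keyed by queue name only, a second destination that reuses a queue name with a different exchange or routing key would not be declared; that trade-off is worth keeping in mind when adopting this pattern.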

Code design

The description below leaves out the API design of the transactional message admin console, which can be added later.

Define the model entity class TransactionalMessage and TransactionalMessageContent:

@Data
public class TransactionalMessage {

    private Long id;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private String creator;
    private String editor;
    private Integer deleted;
    private Integer currentRetryTimes;
    private Integer maxRetryTimes;
    private String queueName;
    private String exchangeName;
    private String exchangeType;
    private String routingKey;
    private String businessModule;
    private String businessKey;
    private LocalDateTime nextScheduleTime;
    private Integer messageStatus;
    private Long initBackoff;
    private Integer backoffFactor;
}

@Data
public class TransactionalMessageContent {

    private Long id;
    private Long messageId;
    private String content;
}

Then define the DAO interfaces (implementation details are not covered here; storage uses MySQL, and switching to another database only requires a different implementation):

public interface TransactionalMessageDao {

    void insertSelective(TransactionalMessage record);

    void updateStatusSelective(TransactionalMessage record);

    List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime,
                                                               LocalDateTime maxScheduleTime,
                                                               int limit);
}

public interface TransactionalMessageContentDao {

    void insert(TransactionalMessageContent record);

    List<TransactionalMessageContent> queryByMessageIds(String messageIds);
}
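The DAO implementations are not shown in this article, so the exact selection rule behind queryPendingCompensationRecords is not known. As one plausible reading, sketched here over an in-memory list rather than SQL, a record qualifies when it has not succeeded, still has retries left, and its next schedule time falls inside the window. The Msg class, the filter conditions, and the ordering are all assumptions made for illustration.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** In-memory sketch of one possible "pending compensation" selection rule (assumed, not the author's SQL). */
final class PendingQuerySketch {

    /** Trimmed-down stand-in for TransactionalMessage with only the fields the filter needs. */
    static final class Msg {
        final long id;
        final int status;                 // 1 = SUCCESS, 0 = PENDING, -1 = FAIL
        final int currentRetryTimes;
        final int maxRetryTimes;
        final LocalDateTime nextScheduleTime;

        Msg(long id, int status, int currentRetryTimes, int maxRetryTimes, LocalDateTime nextScheduleTime) {
            this.id = id;
            this.status = status;
            this.currentRetryTimes = currentRetryTimes;
            this.maxRetryTimes = maxRetryTimes;
            this.nextScheduleTime = nextScheduleTime;
        }
    }

    static List<Long> queryPendingIds(List<Msg> table, LocalDateTime min, LocalDateTime max, int limit) {
        return table.stream()
                .filter(m -> m.status != 1)                               // not yet pushed successfully
                .filter(m -> m.currentRetryTimes < m.maxRetryTimes)       // retries not exhausted
                .filter(m -> !m.nextScheduleTime.isBefore(min)
                        && !m.nextScheduleTime.isAfter(max))              // inside [min, max]
                .sorted(Comparator.comparing((Msg m) -> m.nextScheduleTime))
                .limit(limit)
                .map(m -> m.id)
                .collect(Collectors.toList());
    }

    /** Builds a small sample table and runs the query with a one-hour window ending 10 s before "now". */
    static List<Long> demo() {
        LocalDateTime now = LocalDateTime.of(2020, 1, 1, 12, 0, 0);
        LocalDateTime max = now.minusSeconds(10);
        LocalDateTime min = max.minusHours(1);
        List<Msg> table = new ArrayList<>();
        table.add(new Msg(1, 0, 0, 5, now.minusMinutes(5)));   // pending, in window
        table.add(new Msg(2, 1, 1, 5, now.minusMinutes(4)));   // already succeeded
        table.add(new Msg(3, -1, 5, 5, now.minusMinutes(3)));  // retries exhausted
        table.add(new Msg(4, -1, 1, 5, now.minusMinutes(30))); // failed once, in window
        table.add(new Msg(5, 0, 0, 5, now.plusSeconds(5)));    // scheduled after the window
        return queryPendingIds(table, min, max, 100);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [4, 1]
    }
}
```

In a MySQL-backed implementation the same conditions would typically become a WHERE clause on message_status, current_retry_times, and next_schedule_time with a LIMIT, which is what the idx_next_schedule_time index in the table design would serve.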

Then define the transaction message service interface TransactionalMessageService:

// Service interface exposed to callers
public interface TransactionalMessageService {

    void sendTransactionalMessage(Destination destination, TxMessage message);
}

@Getter
@RequiredArgsConstructor
public enum ExchangeType {

    FANOUT("fanout"),

    DIRECT("direct"),

    TOPIC("topic"),

    DEFAULT(""),

    ;

    private final String type;
}

// Destination of a message
public interface Destination {

    ExchangeType exchangeType();

    String queueName();

    String exchangeName();

    String routingKey();
}

@Builder
public class DefaultDestination implements Destination {

    private ExchangeType exchangeType;
    private String queueName;
    private String exchangeName;
    private String routingKey;

    @Override
    public ExchangeType exchangeType() {
        return exchangeType;
    }

    @Override
    public String queueName() {
        return queueName;
    }

    @Override
    public String exchangeName() {
        return exchangeName;
    }

    @Override
    public String routingKey() {
        return routingKey;
    }
}

// Transactional message
public interface TxMessage {

    String businessModule();

    String businessKey();

    String content();
}

@Builder
public class DefaultTxMessage implements TxMessage {

    private String businessModule;
    private String businessKey;
    private String content;

    @Override
    public String businessModule() {
        return businessModule;
    }

    @Override
    public String businessKey() {
        return businessKey;
    }

    @Override
    public String content() {
        return content;
    }
}

// Message status; @Getter is needed because getStatus() is called by the management service
@Getter
@RequiredArgsConstructor
public enum TxMessageStatus {

    /**
     * Success
     */
    SUCCESS(1),

    /**
     * Pending
     */
    PENDING(0),

    /**
     * Failure
     */
    FAIL(-1),

    ;

    private final Integer status;
}

The implementation class of TransactionalMessageService is the core of the transactional message feature. The code is as follows:

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitTransactionalMessageService implements TransactionalMessageService {

    private final AmqpAdmin amqpAdmin;
    private final TransactionalMessageManagementService managementService;

    private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();

    @Override
    public void sendTransactionalMessage(Destination destination, TxMessage message) {
        String queueName = destination.queueName();
        String exchangeName = destination.exchangeName();
        String routingKey = destination.routingKey();
        ExchangeType exchangeType = destination.exchangeType();
        // Declare the queue, exchange and binding once per queue name and cache the result
        QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {
            Queue queue = new Queue(queueName);
            amqpAdmin.declareQueue(queue);
            Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType());
            amqpAdmin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
            amqpAdmin.declareBinding(binding);
            return true;
        });
        TransactionalMessage record = new TransactionalMessage();
        record.setQueueName(queueName);
        record.setExchangeName(exchangeName);
        record.setExchangeType(exchangeType.getType());
        record.setRoutingKey(routingKey);
        record.setBusinessModule(message.businessModule());
        record.setBusinessKey(message.businessKey());
        String content = message.content();
        // Save the transactional message record
        managementService.saveTransactionalMessageRecord(record, content);
        // Register the transaction synchronizer
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                managementService.sendMessageSync(record, content);
            }
        });
    }
}

The persistence of message record status and content is managed centrally in TransactionalMessageManagementService:

@Slf4j
@RequiredArgsConstructor
@Service
public class TransactionalMessageManagementService {

    private final TransactionalMessageDao messageDao;
    private final TransactionalMessageContentDao contentDao;
    private final RabbitTemplate rabbitTemplate;

    private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0);
    private static final long DEFAULT_INIT_BACKOFF = 10L;
    private static final int DEFAULT_BACKOFF_FACTOR = 2;
    private static final int DEFAULT_MAX_RETRY_TIMES = 5;
    private static final int LIMIT = 100;

    public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {
        record.setMessageStatus(TxMessageStatus.PENDING.getStatus());
        record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF,
                DEFAULT_BACKOFF_FACTOR, 0));
        record.setCurrentRetryTimes(0);
        record.setInitBackoff(DEFAULT_INIT_BACKOFF);
        record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);
        record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);
        messageDao.insertSelective(record);
        TransactionalMessageContent messageContent = new TransactionalMessageContent();
        messageContent.setContent(content);
        messageContent.setMessageId(record.getId());
        contentDao.insert(messageContent);
    }

    public void sendMessageSync(TransactionalMessage record, String content) {
        try {
            rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);
            if (log.isDebugEnabled()) {
                log.debug("Message sent successfully, target queue:{}, message content:{}", record.getQueueName(), content);
            }
            // Mark as success
            markSuccess(record);
        } catch (Exception e) {
            // Mark as failed
            markFail(record, e);
        }
    }

    private void markSuccess(TransactionalMessage record) {
        // Set the next schedule time to the far-future END value so the record is not picked up again
        record.setNextScheduleTime(END);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    private void markFail(TransactionalMessage record, Exception e) {
        log.error("Failed to send message, target queue:{}", record.getQueueName(), e);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        // Calculate the next execution time
        LocalDateTime nextScheduleTime = calculateNextScheduleTime(
                record.getNextScheduleTime(),
                record.getInitBackoff(),
                record.getBackoffFactor(),
                record.getCurrentRetryTimes()
        );
        record.setNextScheduleTime(nextScheduleTime);
        record.setMessageStatus(TxMessageStatus.FAIL.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    /**
     * Calculates the next execution time
     *
     * @param base          base time
     * @param initBackoff   initial backoff value
     * @param backoffFactor backoff factor
     * @param round         round number
     * @return LocalDateTime
     */
    private LocalDateTime calculateNextScheduleTime(LocalDateTime base,
                                                    long initBackoff,
                                                    long backoffFactor,
                                                    long round) {
        double delta = initBackoff * Math.pow(backoffFactor, round);
        return base.plusSeconds((long) delta);
    }

    /**
     * Push compensation - the parameters here should be customized for the actual scenario
     */
    public void processPendingCompensationRecords() {
        // Upper bound of the time window: the current time minus the initial backoff value
        LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);
        // Lower bound of the time window: the upper bound minus 1 hour
        LocalDateTime min = max.plusHours(-1);
        Map<Long, TransactionalMessage> collect = messageDao.queryPendingCompensationRecords(min, max, LIMIT)
                .stream()
                .collect(Collectors.toMap(TransactionalMessage::getId, x -> x));
        if (!collect.isEmpty()) {
            StringJoiner joiner = new StringJoiner(",", "(", ")");
            collect.keySet().forEach(x -> joiner.add(x.toString()));
            contentDao.queryByMessageIds(joiner.toString())
                    .forEach(item -> {
                        TransactionalMessage message = collect.get(item.getMessageId());
                        sendMessageSync(message, item.getContent());
                    });
        }
    }
}

One possible optimization here: the status updates of message records can be turned into batch updates, which are more efficient when the limit is large. Finally, the configuration class for the scheduled task:

@Slf4j
@RequiredArgsConstructor
@Configuration
@EnableScheduling
public class ScheduleJobAutoConfiguration {

    private final TransactionalMessageManagementService managementService;

    /**
     * Redisson.create() with no arguments connects to a local Redis with default settings;
     * in practice this should come from configuration
     */
    private final RedissonClient redisson = Redisson.create();

    @Scheduled(fixedDelay = 10000)
    public void transactionalMessageCompensationTask() throws Exception {
        RLock lock = redisson.getLock("transactionalMessageCompensationTask");
        // Wait up to 5 seconds for the lock; the lease is released automatically after 300 seconds
        boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS);
        if (tryLock) {
            try {
                long start = System.currentTimeMillis();
                log.info("Starting transactional message push compensation scheduled task...");
                managementService.processPendingCompensationRecords();
                long end = System.currentTimeMillis();
                long delta = end - start;
                // Hold the lock for at least 5 seconds before releasing it
                if (delta < 5000) {
                    Thread.sleep(5000 - delta);
                }
                log.info("Transactional message push compensation scheduled task completed, elapsed:{} ms...", end - start);
            } finally {
                lock.unlock();
            }
        }
    }
}

After the basic code is written, the structure of the whole project is as follows:

Finally add two test classes:

@RequiredArgsConstructor
@Component
public class MockBusinessRunner implements CommandLineRunner {

    private final MockBusinessService mockBusinessService;

    @Override
    public void run(String... args) throws Exception {
        mockBusinessService.saveOrder();
    }
}

@Slf4j
@RequiredArgsConstructor
@Service
public class MockBusinessService {

    private final JdbcTemplate jdbcTemplate;
    private final TransactionalMessageService transactionalMessageService;
    private final ObjectMapper objectMapper;

    @Transactional(rollbackFor = Exception.class)
    public void saveOrder() throws Exception {
        String orderId = UUID.randomUUID().toString();
        BigDecimal amount = BigDecimal.valueOf(100L);
        Map<String, Object> message = new HashMap<>();
        message.put("orderId", orderId);
        message.put("amount", amount);
        jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", p -> {
            p.setString(1, orderId);
            p.setBigDecimal(2, amount);
        });
        String content = objectMapper.writeValueAsString(message);
        transactionalMessageService.sendTransactionalMessage(
                DefaultDestination.builder()
                        .exchangeName("tm.test.exchange")
                        .queueName("tm.test.queue")
                        .routingKey("tm.test.key")
                        .exchangeType(ExchangeType.DIRECT)
                        .build(),
                DefaultTxMessage.builder()
                        .businessKey(orderId)
                        .businessModule("SAVE_ORDER")
                        .content(content)
                        .build()
        );
        log.info("Saved order:{} successfully...", orderId);
    }
}

A test result is as follows:

The simulated order data is saved successfully and the RabbitMQ message is sent to the RabbitMQ server normally after the transaction has successfully committed, as shown in the RabbitMQ console data.

Summary

The transactional message module is designed only to make asynchronous message push more complete. In practice, a well-designed asynchronous messaging system also provides synchronous query interfaces, because asynchronous messages by nature have no callback or response. Generally speaking, a system's throughput is positively correlated with the proportion of asynchronous processing in it (see Amdahl's Law on this point), so asynchronous interaction should be used wherever possible in system architecture design to improve throughput and reduce unnecessary waiting caused by synchronous blocking. The module can be extended with an admin console, and can even be combined with Micrometer, Prometheus, and Grafana for real-time monitoring.

Demo project repository for this article: rabbit-transactional-message

The demo can only start when MySQL, Redis, and RabbitMQ are installed locally. A local database named local must be created.