
Preface

Recently, “lying flat” has become a popular phrase.

Tycoons and Internet celebrities alike are eagerly discussing everything behind it.

So let me open this article with my own thoughts on the phrase.

Let’s start with the concept. According to Wikipedia, young people frustrated with China’s oppressive work culture are choosing to “lie flat” rather than struggle to meet social expectations.

Competition exists in every era and in every workplace. With competition as fierce as it is today, what attitude should we take toward life?

I think lying flat is an option, but one full of gloom.

Let me ask: have you achieved your original dream? Perhaps it is to bring happiness to your family, perhaps to buy the things you want, perhaps to see the sights life has to offer. In my opinion, as long as you have not achieved it, you need to keep working, for no other reason than that original dream. A life lasts a hundred years at most; wouldn’t it be a pity to leave nothing behind? We still need to fight for our own future.

Perhaps many people think: with so much involution, how can there be a future? Yes, everyone thinks this way, but can you change reality? You can’t! Lying flat only leaves you trailing behind everyone else in that involuted reality. So lying flat is not really an option for you either.

But does that mean throwing yourself headlong into the rat race? No. I think you should be content with what you have, yet still set yourself small goals. That is the premise of a wonderful life.

Well, back to today’s technical article!

We still want to make progress and keep learning; knowledge is useful no matter when.

Distributed transactions are indispensable in a distributed system architecture. There are several ways to implement them, but today we’re going to talk about using RocketMQ, the message queue middleware.

Distributed transaction definition

A distributed transaction is one in which the transaction participants, the servers supporting the transaction, the resource servers, and the transaction manager are located on different nodes of a distributed system. To put it simply, one large operation consists of several small operations that run on different servers and belong to different applications. A distributed transaction must ensure that these small operations either all succeed or all fail. Essentially, distributed transactions exist to ensure data consistency across different databases.

Distributed transaction theory

CAP

The CAP theorem is also known as Brewer’s theorem. For architects designing distributed systems (not just distributed transactions), CAP is the entry-level theory. The following is adapted from Wikipedia to help you understand C, A, and P.

① Consistency: every read by a client either returns the latest data or fails. In other words, consistency is a promise the distributed system makes to its clients: either I return an error, or I return fully consistent, up-to-date data. The emphasis is on data correctness.

② Availability: every client request gets a response with data, never an error. In other words, availability is another promise the distributed system makes to its clients: I will always return data and never an error, but I do not guarantee that the data is up to date. The emphasis is on being error free.

③ Partition tolerance: distributed systems communicate over networks, and networks are unreliable. When any number of messages are lost or arrive late, the system keeps providing service and does not go down. In other words, partition tolerance is a promise that the system will keep running no matter what data synchronization problems it has internally. The emphasis is on not going down.

BASE

BASE is an acronym for Basically Available, Soft state, and Eventually consistent. It is an extension of the AP choice in CAP.

Basically available: when a distributed system fails, it may give up some functions in order to keep its core functions available.

Soft state: the system is allowed to be in an intermediate state that does not affect its availability. This intermediate state is the inconsistency described in CAP.

Eventual consistency: data on all nodes becomes consistent after a period of time.

CAP theory does not account for network delay. BASE addresses this with soft state and eventual consistency, which guarantee consistency once the delay has passed. BASE is the opposite of ACID: instead of ACID’s strong consistency model, it sacrifices strong consistency for availability, allowing data to be inconsistent for a while as long as it eventually reaches a consistent state.
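
As a rough illustration of soft state and eventual consistency, here is a minimal in-memory sketch in Java. The class EventualConsistencySketch, its primary and replica maps, and the replicate() step are hypothetical names invented for this example; they are not part of RocketMQ or Seata, and a real system would replicate over the network rather than through a local queue.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy model of BASE: writes hit the primary at once and reach the replica later. */
final class EventualConsistencySketch {
    private final Map<String, Integer> primary = new HashMap<>();
    private final Map<String, Integer> replica = new HashMap<>();
    // Pending replication log: writes not yet applied to the replica.
    private final Queue<Map.Entry<String, Integer>> pending = new ArrayDeque<>();

    /** Accept the write on the primary and queue it for later replication. */
    void write(String key, int value) {
        primary.put(key, value);
        pending.add(Map.entry(key, value));
    }

    /** Always answers (basically available), but may return stale data (soft state). */
    Integer readFromReplica(String key) {
        return replica.get(key);
    }

    /** Apply all queued writes; afterwards both copies agree (eventual consistency). */
    void replicate() {
        while (!pending.isEmpty()) {
            Map.Entry<String, Integer> entry = pending.poll();
            replica.put(entry.getKey(), entry.getValue());
        }
    }

    /** True when the replica holds exactly the same data as the primary. */
    boolean isConsistent() {
        return primary.equals(replica);
    }

    public static void main(String[] args) {
        EventualConsistencySketch store = new EventualConsistencySketch();
        store.write("balance:A", 900);
        // The replica has not caught up yet, so this read is stale.
        System.out.println("before sync: " + store.readFromReplica("balance:A"));
        store.replicate();
        System.out.println("after sync: " + store.readFromReplica("balance:A"));
    }
}
```

The read between write() and replicate() is the "inconsistent for a while" window that BASE accepts in exchange for availability.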

Distributed transaction solutions

At present, there are many solutions for distributed transactions. Specialized open source middleware also exists, such as Seata.

Both RocketMQ transactions and Seata solve distributed transaction problems. The difference is that Seata is built on CAP theory, while the RocketMQ scheme follows BASE theory, that is, eventual consistency.

So, let’s walk through RocketMQ’s implementation of distributed transactions.

RocketMQ transaction flow

The execution process is as follows:

1. Send the prepared message. The Producer (the MQ sender) sends the transaction message to the MQ Server, which marks the message as Prepared. At this point the MQ subscriber cannot consume the message.

In this example, the Producer sends a message that encapsulates the business data to the MQ Server.

2. The MQ Server acknowledges the send. Once the MQ Server has received the message from the Producer, it responds that the message was sent successfully.

3. The Producer executes the local transaction. The Producer runs its business logic under the control of a local database transaction.

In this example, the Producer adds a user.

4. Message delivery. If the Producer’s local transaction succeeds, the Producer automatically sends a COMMIT message to the MQ Server. After receiving the COMMIT, the MQ Server marks the business message as consumable, and the MQ subscriber (the points service) consumes it normally.

If the Producer’s local transaction fails, the Producer automatically sends a ROLLBACK message to the MQ Server. After receiving the ROLLBACK, the MQ Server deletes the business message, and the downstream service cannot consume it.

5. Consumption. The MQ subscriber (the downstream service) consumes the message and responds to MQ with an ACK on success; otherwise the message will be delivered again. The ACK is sent automatically by default, that is, whenever the program runs without error.

6. Transaction check-back. If the Producer fails or times out while executing the local transaction, the MQ Server repeatedly asks other producers in the same Producer group for the execution status of the transaction. This process is called transaction check-back. The MQ Server decides whether to deliver the message based on the check-back result.
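
The six steps above can be sketched as a small in-memory state machine. This is only an illustration under stated assumptions: TxMessageFlowSketch, Broker, run() and the maxChecks limit are hypothetical names made up for this example, no RocketMQ classes are involved, and the real broker's check-back scheduling is more elaborate.

```java
import java.util.function.Supplier;

/** In-memory sketch of the half-message protocol; no RocketMQ classes are used. */
final class TxMessageFlowSketch {
    enum LocalTxState { COMMIT, ROLLBACK, UNKNOWN }
    enum MessageState { PREPARED, CONSUMABLE, DELETED }

    /** Hypothetical stand-in for the broker's view of one transaction message. */
    static final class Broker {
        // Step 1: the message is stored as Prepared and is invisible to consumers.
        MessageState state = MessageState.PREPARED;

        /** Steps 4 and 6: apply the producer's verdict to a Prepared message. */
        void resolve(LocalTxState verdict) {
            if (state != MessageState.PREPARED) {
                return; // already resolved
            }
            if (verdict == LocalTxState.COMMIT) {
                state = MessageState.CONSUMABLE;
            } else if (verdict == LocalTxState.ROLLBACK) {
                state = MessageState.DELETED;
            }
            // UNKNOWN: stay Prepared and wait for a later check-back.
        }
    }

    /**
     * Runs the flow once and returns the final message state.
     *
     * @param localTx   step 3: the producer's local transaction
     * @param checkBack step 6: status lookup used while the message is still Prepared
     * @param maxChecks upper bound on check-back attempts (an assumption of this sketch)
     */
    static MessageState run(Supplier<LocalTxState> localTx,
                            Supplier<LocalTxState> checkBack,
                            int maxChecks) {
        Broker broker = new Broker(); // steps 1-2: half message accepted and acknowledged
        LocalTxState verdict;
        try {
            verdict = localTx.get();
        } catch (RuntimeException e) {
            verdict = LocalTxState.ROLLBACK; // a failed local transaction rolls back
        }
        broker.resolve(verdict);
        for (int i = 0; i < maxChecks && broker.state == MessageState.PREPARED; i++) {
            broker.resolve(checkBack.get());
        }
        return broker.state;
    }

    public static void main(String[] args) {
        // Local transaction succeeds: the message becomes consumable.
        System.out.println(run(() -> LocalTxState.COMMIT, () -> LocalTxState.UNKNOWN, 3));
        // Local transaction throws: the message is deleted.
        System.out.println(run(() -> { throw new IllegalStateException("db down"); },
                () -> LocalTxState.UNKNOWN, 3));
        // Verdict lost at first, then recovered through check-back.
        System.out.println(run(() -> LocalTxState.UNKNOWN, () -> LocalTxState.COMMIT, 3));
    }
}
```

The point of the model is that consumers only ever see a message in the CONSUMABLE state, so the downstream service never acts on a local transaction that did not commit.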

RocketMQ installation and deployment

To use it, you need to install the corresponding message queue service first.

Download
  • Download RocketMQ 4.8.0
Deployment

Upload the installation package to the /usr/local/src directory

Decompress the package and move it to the installation directory

cd /usr/local/src
unzip rocketmq-all-4.8.0-bin-release.zip
mv rocketmq-all-4.8.0-bin-release ../rocketmq-4.8.0

Start the NameServer

cd ../rocketmq-4.8.0
nohup sh bin/mqnamesrv &

Check the startup

tail -f ~/logs/rocketmqlogs/namesrv.log

Modify the Broker run configuration

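# In the Broker startup script (bin/runbroker.sh in the standard distribution),
# comment out the default heap settings and use smaller ones:
#JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"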

Start the Broker

nohup sh bin/mqbroker -n localhost:9876 &

Check the startup

tail -f ~/logs/rocketmqlogs/broker.log 

Firewall port

firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --reload

If any error occurs, manually create the mapping file directory

cd /root/store
mkdir commitlog consumequeue

Test message

Send a message

export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

Receive the message

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

Shut down

Stop the NameServer

sh bin/mqshutdown namesrv

Stop the Broker

sh bin/mqshutdown broker

RocketMQ console installation

Download address

Download the source code and complete the packaging

mvn clean package -Dmaven.test.skip=true

Upload the file to the /usr/local/src directory

Startup script content

nohup java -jar -Dspring.config.location=/app/home/rocketmq-console/application.properties /app/service/rocketmq-console/rocketmq-console-ng-2.0.0.jar > /app/home/rocketmq-console/logs/mq_console.log 2>&1 &

Open the firewall port

firewall-cmd --zone=public --add-port=8080/tcp --permanent
firewall-cmd --reload

Test access

Example Service Scenario

Two service applications act as the message producer and the message consumer of the message queue:

Bank1: the bank debit service

Bank2: the bank credit service

Scenario: user A transfers 100 to user B, so A’s balance becomes A - 100 and B’s balance becomes B + 100.

Bank1:

1. Provides an external API

2. Initiates the debit request

3. Sends the message to MQ

4. MQ receives the message and returns an acknowledgement

5. Bank1 performs the local debit transaction and commits it

MQ:

On receiving Bank1’s commit confirmation, MQ releases the message so that it can be consumed

Bank2:

1. Listens to MQ

2. Consumes the message

3. Performs the local credit operation

Microservice applications integrate MQ

  • Add the dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
  • Configuration file
rocketmq:
  name-server: xxxx:9876
  producer:
    group: base_group_syncMsg
    send-message-timeout: 5000
    retry-times-when-send-failed: 2
    max-message-size: 4194304

Bank1 application implementation

Provide request API

@GetMapping(value = "/rocketmq")
    public String transfer(@RequestParam("accountNo")String accountNo, @RequestParam("amount") Double amount){
        // Create the transaction ID as the message content to MQ
        String tx_no = UUID.randomUUID().toString();
        // Encapsulate event entities
        AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);
        // Send a message
        accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
        return "Processing successful - Account: {"+accountNo+"} deduction: {"+amount+"}";
    }
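
The AccountChangeEvent class used above is not shown in this article. A minimal sketch of what it might look like, inferred only from the calls that appear in the code here (the three-argument constructor, getAccountNo, setAccountNo, getAmount and getTxNo), is given below. The field names and the no-argument constructor are assumptions; the real class may differ.

```java
import java.io.Serializable;

/**
 * Hypothetical event payload matching the calls used in this article.
 * In a real project it would normally be declared public in its own file.
 */
class AccountChangeEvent implements Serializable {
    private static final long serialVersionUID = 1L;

    private String accountNo; // account to debit or credit
    private Double amount;    // amount of the transfer
    private String txNo;      // transaction number, used later for idempotency

    /** No-argument constructor, assumed to be needed for JSON deserialization. */
    AccountChangeEvent() {
    }

    AccountChangeEvent(String accountNo, Double amount, String txNo) {
        this.accountNo = accountNo;
        this.amount = amount;
        this.txNo = txNo;
    }

    public String getAccountNo() { return accountNo; }
    public void setAccountNo(String accountNo) { this.accountNo = accountNo; }

    public Double getAmount() { return amount; }
    public void setAmount(Double amount) { this.amount = amount; }

    public String getTxNo() { return txNo; }
    public void setTxNo(String txNo) { this.txNo = txNo; }

    @Override
    public String toString() {
        return "AccountChangeEvent{accountNo=" + accountNo
                + ", amount=" + amount + ", txNo=" + txNo + "}";
    }
}
```

Carrying txNo inside the event is what lets both the check-back and the consumer recognize a transfer they have already handled.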

Debit request

Send a message to MQ

    /**
     * Send the transfer message to MQ
     * @param accountChangeEvent the event entity
     */
    @Override
    public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
        // Convert accountChangeEvent to JSON
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("accountChange", accountChangeEvent);
        String jsonString = jsonObject.toJSONString();
        // Build the message
        Message<String> message = MessageBuilder.withPayload(jsonString).build();
        // Send a transaction message
        /*
         * String txProducerGroup: producer group
         * String destination: topic
         * Message<?> message: message content
         * Object arg: extra argument
         */
        rocketMQTemplate.sendMessageInTransaction("producer_group_bank1", "bank", message, null);
    }

Listen for MQ messages

/**
 * @author Little hidden Lele
 * @date 2021/06/3
 * @description Consumer listener
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}")
public class ConsumerListener implements RocketMQListener<String> {

    /** * Inject business implementation */
    @Autowired
    AccountInfoService accountInfoService;

    /** * Receive message */
    @Override
    public void onMessage(String message) {
        log.info("Received consumption message :{}",message);
        // Parse the message
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        // Convert to an AccountChangeEvent object
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        // Set the account
        accountChangeEvent.setAccountNo("2");
        // Perform the business operation: increase the amount
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}

Implement local business logic

/**
 * @author Little hidden Lele
 * @date 2021/06/3
 * @description Account service implementation
 */
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    AccountInfoDao accountInfoDao;

    // Update account -- increase amount
    @Override
    @Transactional
    public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
        log.info("Bank2 update local account, account: {}, amount: {}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        // Check the local transaction record to prevent repeated consumption
        if (accountInfoDao.isExistTx(accountChangeEvent.getTxNo()) > 0) {
            return;
        }
        // Insert data -- increases the amount
        accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        // Add a transaction record for idempotency
        accountInfoDao.addTx(accountChangeEvent.getTxNo());
        // Reserved error path for the demo
        if (accountChangeEvent.getAmount() == 250) {
            throw new RuntimeException("Message processing exception");
        }
    }
}
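
Because MQ redelivers a message when no ACK arrives, the credit must be idempotent. The idea behind the isExistTx and addTx calls above can be sketched in memory as follows. IdempotentCreditSketch and its methods are hypothetical names for this example, and the in-memory set merely stands in for the transaction-record table; in the service above, the balance update and the record insert share one database transaction.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** In-memory sketch of idempotent consumption keyed by transaction number. */
final class IdempotentCreditSketch {
    private final Map<String, Double> balances = new HashMap<>();
    // Stands in for the transaction-record table queried by isExistTx.
    private final Set<String> processedTx = new HashSet<>();

    /**
     * Applies the credit at most once per transaction number.
     *
     * @return true if the credit was applied, false if txNo was seen before
     */
    synchronized boolean credit(String accountNo, double amount, String txNo) {
        if (!processedTx.add(txNo)) {
            return false; // duplicate delivery: skip the balance update
        }
        balances.merge(accountNo, amount, Double::sum);
        return true;
    }

    /** Current balance of the account, or 0.0 if it has never been credited. */
    synchronized double balanceOf(String accountNo) {
        return balances.getOrDefault(accountNo, 0.0);
    }

    public static void main(String[] args) {
        IdempotentCreditSketch bank2 = new IdempotentCreditSketch();
        System.out.println(bank2.credit("2", 100.0, "tx-1")); // first delivery
        System.out.println(bank2.credit("2", 100.0, "tx-1")); // redelivery of the same message
        System.out.println(bank2.balanceOf("2"));
    }
}
```

However many times the same message is delivered, the balance changes only once.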

Bank1 transaction callback listener

/**
 * @author Little hidden Lele
 * @date 2021/06/3
 * @description Producer transaction callback listener
 */
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_bank1")
public class ProducerCallbackListener implements RocketMQLocalTransactionListener {

    @Autowired
    AccountInfoService accountInfoService;

    @Autowired
    AccountInfoDao accountInfoDao;

    /**
     * Callback invoked after the transaction message has been successfully sent to MQ.
     * @param message the message
     * @param o the extra argument passed when sending
     * @return the local transaction state
     */
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            // Parse the message
            String messageString = new String((byte[]) message.getPayload());
            JSONObject jsonObject = JSONObject.parseObject(messageString);
            // Convert to the AccountChangeEvent entity
            String accountChangeString = jsonObject.getString("accountChange");
            // Convert accountChange (json) to AccountChangeEvent
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
            // Perform a local transaction and deduct the amount
            accountInfoService.doUpdateAccountBalance(accountChangeEvent);
            // Returning RocketMQLocalTransactionState.COMMIT automatically sends a commit to MQ, which makes the message consumable
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            // Returning ROLLBACK tells MQ to discard the message, so it is never consumed
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * Transaction check-back: checks whether the debit has already been applied.
     * @param message the message
     * @return the local transaction state
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // Parse message to AccountChangeEvent
        String messageString = new String((byte[]) message.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String accountChangeString = jsonObject.getString("accountChange");
        // Convert accountChange (json) to AccountChangeEvent
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        // Transaction ID
        String txNo = accountChangeEvent.getTxNo();
        int existTx = accountInfoDao.isExistTx(txNo);
        if (existTx > 0) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

Bank2 application implementation

Bank2 listens to MQ

/**
 * @author Little hidden Lele
 * @date 2021/06/3
 * @description Consumer listener
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "bank", consumerGroup = "rocketmq.consumer.group")
public class ConsumerListener implements RocketMQListener<String> {

    /** * Inject business implementation */
    @Autowired
    AccountInfoService accountInfoService;

    /** * Receive message */
    @Override
    public void onMessage(String message) {
        log.info("Received consumption message :{}",message);
        // Parse the message
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        // Convert to an AccountChangeEvent object
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        // Set the account
        accountChangeEvent.setAccountNo("2");
        // Perform the business operation: increase the amount
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}

Message consumption

log.info("Received consumption message :{}",message);
        // Parse the message
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        // Convert to an AccountChangeEvent object
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);

Perform the local credit transaction

accountInfoService.addAccountInfoBalance(accountChangeEvent);

Conclusion

I’m finally done. Writing the demo was a lot of work.

There are many distributed transaction solutions, and whether we need distributed transactions at all is something we as engineers have to consider. If you do, I’m sure you won’t want to miss this article as a quick start on implementing distributed transactions with the RocketMQ message queue. If you think it’s good, I’m ready to turn it into a column, hahaha.

Lying flat is not a good choice when you are chasing a dream. Technology always moves forward, so keep working hard, everyone!!