Preface

As a core component of high-concurrency systems, a message queue helps decouple business systems, which improves development efficiency and system stability. The main advantages are as follows:

  • Peak shaving and valley filling (mainly prevents message loss and system crashes when instantaneous write pressure exceeds the service capacity of the application)
  • System decoupling (removes direct dependencies between systems of different importance and capacity, so that the failure of one does not bring down the others)
  • Improved performance (for a one-to-many call, a single message sent to the messaging system notifies all the relevant systems)
  • Load testing with accumulated messages (some production links are hard to load-test directly; they can be tested by accumulating a certain volume of messages and then releasing them)

I. MQ background & selection

At present, the mainstream MQs are RocketMQ, Kafka, and RabbitMQ. RocketMQ has the following advantages over RabbitMQ and Kafka:

  • Supports transactional messages (keeps message sending and DB operations eventually consistent between two parties; not supported by RabbitMQ and Kafka)
  • Supports eventual consistency of data between multiple systems (multi-party transactions)
  • Supports 18 levels of delayed delivery (not supported by RabbitMQ and Kafka)
  • Supports redelivery of failed messages at specified times and intervals (not supported by Kafka; RabbitMQ requires manual acknowledgement)
  • Supports tag filtering on the consumer side, which reduces unnecessary network traffic (not supported by RabbitMQ and Kafka)
  • Supports repeated consumption (not supported by RabbitMQ and Kafka)

RocketMQ, Kafka, and RabbitMQ are compared in the following table:

RocketMQ Cluster Overview

1. RocketMQ cluster deployment structure

1) Name Server

The Name Server is a nearly stateless node that can be clustered without any synchronization of information between nodes.

2) Broker

A Master can have multiple Slaves, but a Slave belongs to only one Master. A Master and its Slaves are paired by giving them the same BrokerName; BrokerId 0 denotes the Master and a non-zero BrokerId denotes a Slave. Multiple Masters can also be deployed.

Each Broker establishes long-lived connections to all nodes in the Name Server cluster and registers its Topic information with all Name Servers periodically (every 30 seconds). The Name Server scans all live Broker connections every 10 seconds. If it has not received a heartbeat from a Broker within 2 minutes, it closes the connection to that Broker.
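The liveness check described above can be modeled with a small in-memory table. This is only an illustrative sketch, not RocketMQ’s Name Server code: the class and method names (LivenessTable, heartbeat, scan) are hypothetical, and timestamps are passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a heartbeat table: brokers report in periodically,
// and a periodic scan evicts any broker that has been silent for too long.
class LivenessTable {
    private final long expiryMillis;
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    LivenessTable(long expiryMillis) {
        this.expiryMillis = expiryMillis;
    }

    // Called whenever a heartbeat (registration) arrives from a broker.
    void heartbeat(String brokerAddr, long nowMillis) {
        lastSeen.put(brokerAddr, nowMillis);
    }

    // Called by the periodic scan; removes and returns the expired brokers.
    List<String> scan(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > expiryMillis) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }

    boolean isAlive(String brokerAddr) {
        return lastSeen.containsKey(brokerAddr);
    }
}
```

With an expiry of 120,000 ms, a broker that sends a heartbeat every 30 seconds is never evicted, while one that goes silent disappears from the table on the first scan after the 2-minute mark.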

3) Producer

The Producer establishes a long-lived connection with one randomly selected node in the Name Server cluster, periodically obtains Topic routing information from it, establishes a long-lived connection with the Master that serves the Topic, and periodically sends heartbeats to that Master. The Producer is stateless and can be deployed as a cluster.

Every 30 seconds (set by pollNameServerInterval in ClientConfig), the Producer fetches the latest information about all queues of the Topic from the Name Server. This means that if a Broker becomes unavailable, the Producer may take up to 30 seconds to notice, and all messages sent to that Broker during this period will fail.

The Producer sends heartbeats to all associated Brokers every 30 seconds (set by heartbeatBrokerInterval in ClientConfig). The Broker scans all live connections every 10 seconds. If the Broker has not received a heartbeat within 2 minutes, it closes the connection to the Producer.

4) Consumer

The Consumer establishes a long-lived connection with one randomly selected node in the Name Server cluster, periodically obtains Topic routing information from it, establishes long-lived connections with the Master and Slave that serve the Topic, and periodically sends heartbeats to both. A Consumer can subscribe to messages from either the Master or the Slave; the subscription rules are determined by the Broker configuration.

The Consumer fetches the latest queue status of the Topic from the Name Server every 30 seconds, so it may take up to 30 seconds for the Consumer to learn that a Broker is unavailable.

The Consumer sends heartbeats to all associated Brokers every 30 seconds (set by heartbeatBrokerInterval in ClientConfig). The Broker scans all live connections every 10 seconds and closes any connection that has not sent a heartbeat within 2 minutes. It then notifies all Consumers in the Consumer Group, which reallocate the queues among themselves and continue consuming.

When the Consumer learns that the Master is down, it switches to the Slave. The Slave is not guaranteed to hold 100% of the Master’s messages, so a small number of messages may be missing. Once the Master recovers, however, the unsynchronized messages are eventually consumed.

How RocketMQ supports distributed transaction messages

Scenario

System A (which performs a DB operation) and system B (which also performs a DB operation) need to stay consistent as a distributed transaction. With MQ introduced as a middle layer, A and MQ keep their transaction consistent (in abnormal cases, through MQ’s check-back interface), and B and MQ keep theirs consistent (through retries), so the transaction as a whole becomes eventually consistent.

Principle: large transaction = small transactions + asynchrony

1. MQ and DB consistency principle (two-party transaction)

The flow chart

The diagram above shows a solution provided by RocketMQ to ensure consistency of MQ messages and DB transactions.

MQ message and DB operation consistency scheme:

1) Send a message to the MQ server; the send returns SEND_OK. At this point the message is not visible to consumers.

2) Perform the DB operation. If it succeeds, the DB transaction is committed; if it fails, the DB transaction is rolled back.

3) If the DB operation succeeded, reply to the MQ server with COMMIT_MESSAGE; if it failed, reply with ROLLBACK_MESSAGE. Note that this reply itself may fail.

4) MQ provides an internal service, the “Transaction Status Service”, that checks the status of transactional messages. If a message has been neither committed nor rolled back, the service calls back the business system through the TransactionCheckListener registered when the Producer started. In the checkLocalTransactionState method, the business system checks the state of the DB transaction and replies COMMIT_MESSAGE if it succeeded, or ROLLBACK_MESSAGE otherwise.

Description:

A DB is used as the example here, but it could be any business operation or data source.

SEND_OK, COMMIT_MESSAGE, and ROLLBACK_MESSAGE are all states provided by the client JAR; inside the MQ server each is represented as a number.

The TransactionCheckListener is called back only when a commit or rollback message is lost (shown in gray in the figure above). Such loss happens only when the network fails or the RocketMQ cluster goes down. If the cluster goes down while asynchronous flushing is in use, about 1 second of data may be lost, and guaranteeing transactions on top of asynchronous flushing is meaningless. Therefore, if a core business uses RocketMQ to solve the distributed transaction problem, synchronous flushing is recommended.
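The four steps can be traced with a small in-memory model. This is a sketch for illustration only, not RocketMQ code: HalfMessageBroker, TxState, and all method names are hypothetical, and the check-back is modeled as a plain function standing in for checkLocalTransactionState.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory model of the half-message flow; not RocketMQ code.
class HalfMessageBroker {
    enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOWN }

    private final Map<String, String> halfMessages = new HashMap<>(); // hidden from consumers
    private final Map<String, String> visible = new HashMap<>();      // deliverable to consumers

    // Step 1: store the message, but keep it invisible to consumers.
    void sendHalf(String txId, String body) {
        halfMessages.put(txId, body);
    }

    // Step 3: the producer reports the outcome of its local DB transaction.
    void endTransaction(String txId, TxState state) {
        if (state == TxState.UNKNOWN) {
            return; // outcome not known yet: leave the half message pending
        }
        String body = halfMessages.remove(txId);
        if (body != null && state == TxState.COMMIT_MESSAGE) {
            visible.put(txId, body);
        }
    }

    // Step 4: for every half message still pending, ask the business system.
    void checkPending(Function<String, TxState> checkLocalTransactionState) {
        for (String txId : new ArrayList<>(halfMessages.keySet())) {
            endTransaction(txId, checkLocalTransactionState.apply(txId));
        }
    }

    boolean isVisible(String txId) {
        return visible.containsKey(txId);
    }

    boolean isPending(String txId) {
        return halfMessages.containsKey(txId);
    }
}
```

If the reply in step 3 is lost, the half message simply stays pending until checkPending runs, at which point the business system’s answer decides whether it becomes visible or is discarded.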

2. Data consistency between multiple systems (multi-party transactions)

The two-party transaction consistency described above (solved with RocketMQ’s transactional messages) is no longer sufficient when distributed consistency among more than two parties is required. In this case, the idea of the TCC pattern (try-confirm-cancel) needs to be introduced.

A solution to maintain eventual consistency between the trading system and other systems

Take the trading system above as an example:

1) The trading system creates an order (inserts a record into the DB) and sends an order creation message. Consistency between the two is guaranteed by RocketMQ transactional messages.

2) The synchronous core RPC services required to complete the order are then executed (non-core systems handle things themselves by listening to MQ messages, and their results do not affect the transaction status). When execution succeeds, the order status is changed and an MQ message is sent at the same time.

3) The trading system consumes the order creation message it sent itself and creates a delayed rollback task through the scheduling system. (Alternatively, it can use RocketMQ’s retry feature and set the time of the second delivery to the delay the scheduled task would have had. When messages are not backlogged, the first delivery arrives after about 1 ms; if the RPC has not finished and the order status has not yet been set to complete, the time of the second consumption can be specified.) The delayed task queries the order status to check whether the order is complete. If it is, no rollback task is created; otherwise, the rollback task is created. Note: with a single consumer group, the message is received once and the rollback tasks for multiple RPCs are created together; alternatively, with multiple consumer groups, the message is consumed multiple times and each consumer creates the rollback task for one RPC. If a rollback task fails, it is retried through MQ redelivery.

The above is a solution for maintaining eventual consistency between the trading system and other systems.

3. Case study

1) Transaction schematic diagram in single machine environment

The following is an example of transfer from A to B.

Step  Action
1     Lock user A’s account
2     Lock user B’s account
3     Check whether account A has 1 dollar
4     Deduct 1 dollar from A’s account
5     Add 1 dollar to B’s account
6     Unlock user B’s account
7     Unlock user A’s account

The above process can even be simplified at the code level to execute two SQL statements in one transaction.
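The seven steps of the single-machine transfer can be sketched with two in-memory accounts, each guarded by its own lock. The Account and LocalTransfer classes are hypothetical and stand in for rows in one database; a real system would rely on the database transaction rather than on Java locks.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-memory account; stands in for a row in a single database.
class Account {
    final ReentrantLock lock = new ReentrantLock();
    long balance;

    Account(long balance) {
        this.balance = balance;
    }
}

class LocalTransfer {
    // Moves `amount` from a to b; returns false if a has insufficient funds.
    static boolean transfer(Account a, Account b, long amount) {
        a.lock.lock();                    // 1. lock A
        try {
            b.lock.lock();                // 2. lock B
            try {
                if (a.balance < amount) { // 3. check A's balance
                    return false;
                }
                a.balance -= amount;      // 4. deduct from A
                b.balance += amount;      // 5. add to B
                return true;
            } finally {
                b.lock.unlock();          // 6. unlock B
            }
        } finally {
            a.lock.unlock();              // 7. unlock A
        }
    }
}
```

Note that taking two locks in caller-supplied order can deadlock if two transfers run concurrently in opposite directions; this sketch only mirrors the step order in the table.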

2) Transactions in distributed environment

Unlike single-machine transactions, accounts A and B may not be in the same DB, so a single database transaction cannot cover both. Instead, the transfer is split into two operations.

A) Account A

Step  Action
1     Lock user A’s account
2     Check whether account A has 1 dollar
3     Deduct 1 dollar from A’s account
4     Unlock user A’s account

B) MQ message: when A’s account data changes, an MQ message is sent, and the MQ server pushes the message to the transfer system, which adds money to B’s account.

C) Account B

Step  Action
1     Lock user B’s account
2     Add 1 dollar to B’s account
3     Unlock user B’s account

Sequential messages

1. Sequential message defects

Sequential messages have the following drawbacks: sending them cannot use the cluster’s failover feature; the parallelism of consumption depends on the number of queues; and individual queues can become hot spots.

2. Principle

When the Producer sends messages, it sends them to the same queue, and the consumer registers its message listener as a MessageListenerOrderly, which guarantees that only one thread on the consumer side consumes the messages of that queue.

Note: the messages must go to the same queue, not merely the same topic. By default, a topic contains four queues.

3. Extension

Partially ordered messages can be achieved by implementing the queue selector method used when sending messages.

For example, when a database is synchronized through MQ, you only need to ensure that the data of each table is synchronized in order. When parsing the binlog, pass the table name as the argument to the queue selector. This ensures that the data of each table lands in the same queue and is consumed in sequence.
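The queue selection idea can be sketched as a deterministic mapping from table name to queue. TableQueueSelector is a hypothetical helper written for this illustration; in the RocketMQ Java client the equivalent logic would live in an implementation of the MessageQueueSelector interface, with the table name passed as the selector argument.

```java
import java.util.List;

// Hypothetical helper that mirrors the idea of a queue selector callback.
class TableQueueSelector {
    // Maps a sharding key (here: a table name) to one queue, deterministically.
    // Math.floorMod keeps the index non-negative even for negative hash codes.
    static <Q> Q select(List<Q> queues, String tableName) {
        int index = Math.floorMod(tableName.hashCode(), queues.size());
        return queues.get(index);
    }
}
```

Because the mapping depends only on the table name and the number of queues, all changes to one table go to one queue, while different tables can still be spread across queues. If the number of queues changes, the mapping changes too, so ordering across such a change is not covered by this sketch.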

5. Best practices

1. Producer

1) Topic

An application should use a single Topic where possible and identify message subtypes with tags, which the application can set freely. Only when tags are set on the sent messages can a consumer have the broker filter messages by tag when it subscribes.

2) Keys

The unique business-level identifier of each message should be set in the keys field, which helps locate message loss problems later. The server creates an index (a hash index) for each message, and the application can query the content of a message, and who consumed it, by topic and key. Because it is a hash index, make the key as unique as possible to avoid potential hash collisions.

// Order ID
String orderId = "20034568923546";
message.setKeys(orderId);

3) Logging

Whether a send succeeds or fails, log the message, making sure to print the SendResult and the key fields.

4) Send

If send does not throw an exception, the message was sent successfully. However, success has several possible states, defined in SendResult:

SEND_OK: the message was sent successfully.

FLUSH_DISK_TIMEOUT: the message was sent successfully, but the server timed out while flushing it to disk. The message has entered the server queue and is lost only if the server goes down.

FLUSH_SLAVE_TIMEOUT: the message was sent successfully, but the server timed out while synchronizing it to the Slave. The message has entered the server queue and is lost only if the server goes down.

SLAVE_NOT_AVAILABLE: the message was sent successfully, but the Slave is unavailable. The message has entered the server queue and is lost only if the server goes down.
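One way to act on these states is to treat anything other than SEND_OK as “accepted but at risk” and log it, or resend it for critical messages. The sketch below assumes that policy; SendState and SendResultPolicy are hypothetical stand-ins defined locally so the example compiles without the client library, and they only mirror the four state names listed above.

```java
// Local stand-in for the four send states listed above (hypothetical enum,
// defined here so that the sketch compiles without the client library).
enum SendState { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

class SendResultPolicy {
    // True only when the send completed without any durability warning.
    static boolean isFullyDurable(SendState state) {
        return state == SendState.SEND_OK;
    }

    // Short description suitable for a log line next to the message key.
    static String describe(SendState state) {
        switch (state) {
            case SEND_OK:
                return "sent successfully";
            case FLUSH_DISK_TIMEOUT:
                return "accepted, but disk flush timed out";
            case FLUSH_SLAVE_TIMEOUT:
                return "accepted, but synchronization to the slave timed out";
            case SLAVE_NOT_AVAILABLE:
                return "accepted, but no slave was available";
            default:
                throw new IllegalArgumentException("unexpected state: " + state);
        }
    }
}
```

Whether a non-SEND_OK result should trigger a resend is a business decision: resending can produce duplicates, which is acceptable only if consumers are idempotent.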

2. Consumer

1) Idempotence

RocketMQ provides at-least-once delivery, so a consumer may receive the same message more than once. It is therefore important that consumption is idempotent.
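A minimal way to make a consumer idempotent is to remember which business keys have already been processed and skip repeats. The sketch below keeps the keys in memory purely for illustration; IdempotentHandler is a hypothetical class, and a production system would more likely store the keys durably, for example behind a unique constraint in the DB.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper that runs the business logic at most once per key.
class IdempotentHandler {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<String> businessLogic;

    IdempotentHandler(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    // Returns true if the business logic ran, false if the key was a duplicate.
    boolean handle(String key, String body) {
        if (!processedKeys.add(key)) {
            return false; // already processed: acknowledge and skip
        }
        try {
            businessLogic.accept(body);
            return true;
        } catch (RuntimeException e) {
            processedKeys.remove(key); // let a redelivery retry the work
            throw e;
        }
    }
}
```

The key passed to handle would typically be the same business identifier that the producer set in the keys field, such as the order ID.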

2) Logging

Log each consumption so that faults can be located later.

3) Batch consumption

Use batch consumption where possible; it can greatly improve consumption throughput.
