Author: Mi Brother

As a core component of high-concurrency systems, a message queue helps decouple business systems, improving development efficiency and system stability.

Official website and Documentation

GitHub: github.com/apache/rock…

Docs: github.com/apache/rock…

Books recommended

Inside RocketMQ technology

Overview

A message queue decouples business systems and thereby improves development efficiency and system stability. Its main advantages are as follows:

  • Peak shaving and valley filling: Absorbs traffic bursts, avoiding the message loss and system crashes that occur when instantaneous write pressure exceeds the application's service capacity

  • System decoupling: Removes direct dependencies between systems of different importance and capacity, so that the failure of one does not bring down the others

  • Improved performance: When one call fans out to many downstream systems, the caller sends a single message to the message queue, which notifies the relevant systems

  • Backlog-based load testing: Some production links are hard to load-test directly; you can accumulate a backlog of messages and then release it to generate the load

RocketMQ is a distributed, queue-model messaging middleware with the following features:

  • Supports strict message ordering

  • Both Topic and Queue modes are supported

  • Can accumulate hundreds of millions of messages

  • Distribution-friendly design

  • Both Push and Pull consumption modes are supported

  • Battle-tested by the massive message volumes of multiple Tmall Double 11 events

The mainstream message queues today are RocketMQ, Kafka, and RabbitMQ. RocketMQ's main advantages over the other two are:

  • Transactional messages (message sending and DB operations remain eventually consistent; not supported by RabbitMQ and Kafka)

  • Eventual consistency of data across multiple systems built on RocketMQ (multi-party transactions, which presuppose two-party transactions)

  • 18 delay levels for delayed messages (not supported by RabbitMQ and Kafka)

  • Redelivery of failed messages a specified number of times at specified intervals (Kafka does not support this; RabbitMQ requires manual confirmation)

  • Tag filtering on the Consumer side to reduce unnecessary network traffic (not supported by RabbitMQ and Kafka)

  • Repeated consumption (not supported by RabbitMQ, supported by Kafka)

Message queue comparison reference table

Technical architecture

Roles:

  • Producer: Produces messages

  • Consumer: Consumes messages

  • Broker: Stores, delivers, and queries messages

  • NameServer: The route registry. Its functions include Broker management and routing information management

Relationships between Topic, Broker, and Queue

Core data structure

Heartbeat Timeout Mechanism

Questions and conclusions

1. How are messages sent, stored, and consumed?

  • Message sending process:

  • 1. When a Broker starts, it registers itself with the NameServer.

  • 2. When a client calls the Producer to send a message, the Producer first retrieves the routing information for the topic from the NameServer (request code GET_ROUTEINFO_BY_TOPIC). The routing information returned by the NameServer includes the list of queues and the list of Brokers for the topic.

  • 3. The Producer selects one queue according to its selection strategy; the message will be stored in that queue.

  • 4. Each message is assigned a unique ID, which is added to the message properties under the key UNIQ_KEY.

  • 5. The message receives some special processing. For example, if the message body exceeds the compression threshold (4 KB by default in DefaultMQProducer, while the default maximum message size is 4 MB), the message is compressed.

  • 6. The Producer sends an RPC request to the Broker, which saves the message.
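The sending steps can be sketched as a small in-memory simulation. This is not RocketMQ code: the `NameServer` and `Producer` classes, the round-robin queue selection, the `uuid`-based ID, and the 4 KB `COMPRESS_THRESHOLD` are all illustrative assumptions, and appending to a list stands in for the RPC to the Broker.

```python
import itertools
import uuid
import zlib

COMPRESS_THRESHOLD = 4 * 1024  # assumed compression threshold in bytes


class NameServer:
    """Toy route registry: topic -> list of (broker_name, queue_id)."""

    def __init__(self):
        self.routes = {}

    def register_broker(self, broker_name, topic, queue_count):
        # A Broker registers its queues for a topic when it starts.
        queues = [(broker_name, q) for q in range(queue_count)]
        self.routes.setdefault(topic, []).extend(queues)

    def get_route_info_by_topic(self, topic):
        return list(self.routes[topic])


class Producer:
    def __init__(self, name_server):
        self.name_server = name_server
        self.counter = itertools.count()

    def send(self, topic, body, brokers):
        # 1. Look up the routing information for the topic.
        queues = self.name_server.get_route_info_by_topic(topic)
        # 2. Select a queue (round-robin is an assumed strategy).
        broker_name, queue_id = queues[next(self.counter) % len(queues)]
        # 3. Attach a unique ID under the UNIQ_KEY property.
        properties = {"UNIQ_KEY": uuid.uuid4().hex}
        # 4. Compress large bodies.
        compressed = len(body) > COMPRESS_THRESHOLD
        payload = zlib.compress(body) if compressed else body
        message = {
            "topic": topic,
            "queue_id": queue_id,
            "body": payload,
            "compressed": compressed,
            "properties": properties,
        }
        # 5. "Send" to the Broker; a list append stands in for the RPC.
        brokers[broker_name].append(message)
        return broker_name, queue_id, properties["UNIQ_KEY"]
```

With two Brokers that each register two queues for a topic, successive calls to `send` cycle through the four queues in order, and any body larger than the threshold is stored compressed.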

  • Message storage process:

  • After receiving a message, the Broker appends the original message to a MappedFile of the CommitLog and flushes it to disk asynchronously

  • The ReputMessageService thread asynchronously dispatches messages from the CommitLog MappedFile to the ConsumeQueue and IndexFile

  • The ConsumeQueue and IndexFile contain only index information that points back to the original messages in the CommitLog
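The relationship between the CommitLog and its index can be illustrated with a minimal sketch. It assumes a single in-memory byte buffer instead of memory-mapped files, indexes only by (topic, queue), and replaces the background reput thread with an explicit `reput()` call; the `Broker` class and its method names are hypothetical.

```python
class Broker:
    """Toy store: one append-only log plus per-queue index entries."""

    def __init__(self):
        self.commit_log = bytearray()   # all messages of all topics, in arrival order
        self.consume_queues = {}        # (topic, queue_id) -> [(offset, size), ...]
        self.pending_dispatch = []      # written to the log but not yet indexed

    def put_message(self, topic, queue_id, body):
        # Append the raw message to the log and remember where it landed.
        offset = len(self.commit_log)
        self.commit_log += body
        self.pending_dispatch.append((topic, queue_id, offset, len(body)))
        return offset

    def reput(self):
        # Build index entries from the log; in this sketch it is called
        # explicitly instead of running on a background thread.
        for topic, queue_id, offset, size in self.pending_dispatch:
            index = self.consume_queues.setdefault((topic, queue_id), [])
            index.append((offset, size))
        self.pending_dispatch.clear()

    def get_message(self, topic, queue_id, index):
        # The index only stores a pointer; the body is read from the log.
        offset, size = self.consume_queues[(topic, queue_id)][index]
        return bytes(self.commit_log[offset:offset + size])
```

Because the index stores only an offset and a size, messages from different topics can share one sequentially written log while each queue still sees its own messages in order.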

  • Message consumption process:

  • Message queue load balancing: Consumers in the same consumer group share the message queues of the topics they subscribe to. Allocation algorithms: prefer average allocation or round-robin average allocation. Allocation principle: at any given time a message queue is allocated to only one consumer, while one consumer may be allocated multiple message queues.

  • Message pull

  • Consuming ordered messages requires a message queue lock:

  • 1. Send a request to the Broker to lock the message queue.

  • 2. If the lock succeeds, create the pull task for the message queue.

  • 3. If the lock fails, wait for other consumers to release the lock on the message queue.

  • Consumption modes:

  • 1. Cluster mode: Queues are distributed by round-robin or by an allocation algorithm; each message is consumed by only one consumer in the group and can be retried on failure.

  • 2. Broadcast mode: Each message is consumed by all consumers subscribed to the message topic. There is no retry.

  • Message acknowledgement is based on an ACK mechanism

  • Consumption progress management, recording the consumption progress of messages

  • Broadcast mode: Stored locally in the consumer.

  • Cluster mode: Stored on the Broker.

  • Detailed reference: cloud.tencent.com/developer/a…
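The average allocation rule from the queue load balancing step could look like the following sketch. It is a simplified illustration, not RocketMQ's implementation: the function name is hypothetical, and queues and consumer IDs are sorted so that every consumer computes the same view independently.

```python
def allocate_averagely(queues, consumers, current):
    """Return the contiguous block of queues assigned to `current`."""
    queues = sorted(queues)
    consumers = sorted(consumers)
    index = consumers.index(current)
    mod = len(queues) % len(consumers)

    if len(queues) <= len(consumers):
        size = 1
    elif mod > 0 and index < mod:
        # The first `mod` consumers take one extra queue each.
        size = len(queues) // len(consumers) + 1
    else:
        size = len(queues) // len(consumers)

    if mod > 0 and index < mod:
        start = index * size
    else:
        start = index * size + mod

    # A negative count yields an empty range, i.e. no queue for this consumer.
    count = min(size, len(queues) - start)
    return [queues[start + i] for i in range(count)]
```

With 8 queues and 3 consumers, the consumers receive 3, 3, and 2 queues respectively. When there are more consumers than queues, the surplus consumers receive nothing, which follows from the rule that a queue belongs to only one consumer at a time.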

2. How to guard against data loss with asynchronous disk flushing?

RocketMQ messages are stored on disk, both to ensure recovery after a power outage and to hold more messages than fit in memory. To improve performance, RocketMQ keeps disk writes as sequential as possible. When a Producer writes messages to RocketMQ, there are two ways to flush them to disk:

  • 1) Asynchronous flush: When write success is returned, the message may only have been written to the in-memory page cache, so the write returns quickly and throughput is high. Disk writes are triggered once the messages accumulated in memory reach a certain volume. Advantages: high performance. Disadvantages: if the Master machine crashes or its disk is damaged, a small number of messages may be lost, leaving the message state in MQ inconsistent with that of the producer/consumer

  • 2) Synchronous flush: The message has been written to disk by the time write success is returned to the application. Specifically, after the message is written to the in-memory page cache, the flush thread is notified immediately and the writing thread waits for the flush to complete. When the flush finishes, the flush thread wakes the waiting thread, which returns the success status to the application. Advantages: keeps the message state in MQ consistent with that of the producer/consumer. Disadvantages: lower performance than asynchronous flush

  • Whether flushing is synchronous or asynchronous is set with the flushDiskType parameter in the Broker configuration file, which takes the value SYNC_FLUSH or ASYNC_FLUSH.

  • Synchronous or asynchronous replication

  • If a broker group has Master and Slave, messages need to be copied from the Master to the Slave, either synchronously or asynchronously.

  • 1) Synchronous replication: Write success is reported to the client only after both the Master and the Slave have written the message. Advantages: if the Master fails, the Slave holds a complete copy of the data, which makes recovery easy; consumers can still consume from the Slave, and no messages are lost. Disadvantages: increases write latency and reduces system throughput. Performance is about 10% lower than with asynchronous replication, and the response time for sending a single message is slightly higher

  • 2) Asynchronous replication: Write success is reported to the client as soon as the Master has written the message. Advantages: low latency and high throughput. After the Master goes down, consumers can still consume from the Slave; this is transparent to applications and requires no manual intervention. Performance is almost the same as in multi-Master mode. Disadvantages: messages that have not yet been replicated when the Master fails may be lost

  • Synchronous and asynchronous replication is set through the brokerRole parameter in the Broker configuration file, which can be set to one of the ASYNC_MASTER, SYNC_MASTER, or SLAVE values.

  • Conclusion:

  • Zero message loss is a double-edged sword, and whether to pursue it depends on the specific business scenario. To balance performance against message loss in practice, it is recommended to configure both Master and Slave with ASYNC_FLUSH for disk flushing, and to use SYNC_MASTER replication between Master and Slave. In this way, data is still safe even if one machine fails.
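As a configuration sketch of that recommendation, the two parameters named above could be set as follows in the Broker configuration files. Only flushDiskType and brokerRole come from the text; a real configuration needs further settings (cluster name, broker name, NameServer address, and so on) that are omitted here.

```properties
# Master: replicate synchronously to the Slave, flush to disk asynchronously
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

# Slave (in its own configuration file): flush to disk asynchronously
# brokerRole=SLAVE
# flushDiskType=ASYNC_FLUSH
```

The reasoning behind this combination is that a message acknowledged to the Producer already exists on two machines, so losing the unflushed page cache of one of them does not lose the message.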

3. How is zero-copy implemented?

Zero-copy principle: The message consumption path uses zero-copy. Zero-copy can be implemented in the following two ways:

  • 1. Use mmap + write (the approach RocketMQ uses: because it transfers small blocks of data, mmap + write works better than sendfile)

  • Advantages: Efficient for small-block transfers, even when called frequently;

  • Disadvantages: Cannot make good use of DMA, consumes more CPU than sendfile, and memory safety control is complex; JVM crashes must be guarded against.

  • 2. Use sendfile

  • Advantages: Can use DMA, consumes less CPU, is efficient for large file transfers, and introduces no new memory safety problems;

  • Disadvantages: Inefficient for small blocks. Transfers can only use BIO (blocking I/O), not NIO.
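The two approaches can be compared with a small sketch built on the Python standard library. It only illustrates the system-call pattern and says nothing about RocketMQ's Java implementation; the function names are hypothetical, and `socket.sendfile` falls back to ordinary sends on platforms where `os.sendfile` is unavailable.

```python
import mmap
import os
import socket
import tempfile


def send_with_mmap(path, sock):
    """mmap + write: map the file into memory, then write the mapping to the socket."""
    with open(path, "rb") as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
            size = len(mapped)
            sock.sendall(mapped)  # no intermediate read() into a user buffer
            return size


def send_with_sendfile(path, sock):
    """sendfile: ask the kernel to move file data to the socket directly."""
    with open(path, "rb") as f:
        return sock.sendfile(f)  # returns the number of bytes sent


def demo():
    # Write a small file and push it through a local socket pair both ways.
    fd, path = tempfile.mkstemp()
    os.write(fd, b"hello rocketmq")
    os.close(fd)
    left, right = socket.socketpair()
    try:
        sent_mmap = send_with_mmap(path, left)
        first = right.recv(64)
        sent_sendfile = send_with_sendfile(path, left)
        second = right.recv(64)
        return sent_mmap, first, sent_sendfile, second
    finally:
        left.close()
        right.close()
        os.remove(path)
```

In the mmap variant the application still issues the write itself, which is why it suits many small transfers; in the sendfile variant the copy is delegated entirely to the kernel.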

4. From which offset does a new consumer group start consuming?

When a new consumer group subscribes to an existing Topic, from which message does the consumer group start consuming for that Topic?

Looking at the DefaultMQPushConsumer API, the setConsumeFromWhere method stands out: as its name suggests, it sets where the consumer starts consuming.

  • CONSUME_FROM_MAX_OFFSET: Starts consuming from the maximum offset of the consume queue (in current client versions this option is named CONSUME_FROM_LAST_OFFSET, and CONSUME_FROM_MAX_OFFSET is deprecated).

  • CONSUME_FROM_FIRST_OFFSET: Starts consuming from the minimum offset of the consume queue.

  • CONSUME_FROM_TIMESTAMP: Starts consuming from the specified timestamp, which defaults to 30 minutes before the consumer starts. Set it with DefaultMQPushConsumer#setConsumeTimestamp.

  • For a new consumer group, no consumption progress has been stored yet, in either cluster mode or broadcast mode, so the stored offset can be read as -1. In that case the DefaultMQPushConsumer#consumeFromWhere attribute determines where consumption starts. To find the corresponding processing entry point, recall that before a consumer pulls messages from the Broker it must first perform consume queue load balancing, which is implemented in RebalanceImpl.

  • Conclusion: If a new consumer group subscribes to a topic that has existed for a long time in a production environment, setting CONSUME_FROM_MAX_OFFSET behaves as expected.

Details refer to: www.jianshu.com/p/d8b73e3c6…
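The decision described above can be summarized in a simplified sketch. It assumes that a stored offset of -1 means "no progress recorded", uses the option names as plain strings, and models the timestamp lookup with a sorted list of store times; the function and its parameters are hypothetical, and Broker-side special cases covered in the referenced article are left out.

```python
from bisect import bisect_left


def resolve_start_offset(stored_offset, consume_from_where,
                         min_offset, max_offset,
                         store_timestamps=None, consume_timestamp=None):
    """Pick the offset at which a consumer starts pulling from one queue."""
    # An existing consumer group always resumes from its recorded progress.
    if stored_offset >= 0:
        return stored_offset

    # New consumer group: fall back to the consumeFromWhere setting.
    if consume_from_where == "CONSUME_FROM_MAX_OFFSET":
        return max_offset
    if consume_from_where == "CONSUME_FROM_FIRST_OFFSET":
        return min_offset
    if consume_from_where == "CONSUME_FROM_TIMESTAMP":
        # store_timestamps[i] is the store time of the message at min_offset + i.
        return min_offset + bisect_left(store_timestamps, consume_timestamp)
    raise ValueError(f"unknown option: {consume_from_where}")
```

The key point is the first branch: consumeFromWhere only matters while no progress has been stored, so changing it for an existing consumer group has no effect.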

5. How are ordered messages implemented?

RocketMQ provides queue-based (partitioned) ordered consumption. Ordering in RocketMQ mainly refers to the ordered consumption of messages. Each consumer group has its own thread pool for concurrently consuming pulled messages, that is, consumption on the consumer side is multithreaded. The concurrency of ordered consumption equals the number of queues assigned to the consumer.

  • RocketMQ achieves ordering mainly through three locks.

  • 1. When a consumer starts, it first performs queue load balancing, following the principle that one consumer can be allocated multiple queues, but one queue is consumed by only one consumer.

  • 2. For each allocated queue, the consumer requests a lock from the Broker. Only if the lock is acquired does the consumer pull messages from that queue.

  • 3. After messages are pulled, they are consumed in the consumer's thread pool. During consumption the consume queue is locked, so multiple messages from the same consume queue are processed serially.

  • 4. During consumption the ProcessQueue is also locked, which ensures that in-flight messages finish being consumed before other consumers can take over the queue after a rebalance.

  • The first two locks are easy to understand. What is the last one for?

  • For example, queue q3 is currently assigned to consumer C2, which has pulled 32 messages and is processing them in its thread pool. The consumer group is then scaled out, and q3 is reassigned from C2 to C3. C2 has already processed some of those messages, but their offsets have not yet been committed. If C3 started consuming q3 immediately, some messages would be consumed again. Therefore, while C2 has not finished consuming the messages from q3, the rebalance cannot discard the queue and C2 does not release its lock on the Broker, so other consumers cannot consume from the queue. This avoids repeated consumption as far as possible and preserves the ordering semantics.
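The effect of the consumer-side queue lock can be shown with a minimal sketch: a shared thread pool processes several queues in parallel, while a per-queue lock keeps the messages of any single queue strictly serial. The class and function names are hypothetical, and the Broker-side lock and offset commits are not modeled.

```python
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor


class ProcessQueue:
    """Messages pulled from one queue, plus the lock that serializes them."""

    def __init__(self, queue_id, messages):
        self.queue_id = queue_id
        self.pending = deque(messages)
        self.consume_lock = threading.Lock()


def consume_orderly(process_queue, handler):
    # Only one worker at a time may drain a given queue, so its messages
    # are handled one by one in arrival order.
    with process_queue.consume_lock:
        while process_queue.pending:
            handler(process_queue.queue_id, process_queue.pending.popleft())


def run_orderly(process_queues, handler, workers=4):
    # Submit more tasks than queues on purpose: workers compete for the
    # same queue, and the lock is what keeps the order intact.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [
            pool.submit(consume_orderly, pq, handler)
            for pq in process_queues
            for _ in range(2)
        ]
        for future in futures:
            future.result()
```

Different queues are still consumed in parallel, which matches the statement that the concurrency of ordered consumption equals the number of queues assigned to the consumer.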

6. What happens when new consumers join a consumer group or some consumers go down?

  • By default, the RebalanceService thread rebalances the message queues every 20 seconds, reallocating the queues with the load-balancing algorithm based on the current number of consumers in the consumer group and the number of queues in the topic.
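What one rebalance round means for a single consumer can be sketched as follows. The sketch assumes a round-robin allocation and simply compares the assignment before and after the membership change; the function names are hypothetical and the 20-second timer is not modeled.

```python
def allocate_round_robin(queues, consumers, current):
    """Deal the sorted queues to the sorted consumers like cards."""
    ordered = sorted(consumers)
    index = ordered.index(current)
    return [q for i, q in enumerate(sorted(queues)) if i % len(ordered) == index]


def rebalance(queues, old_consumers, new_consumers, current):
    """Return (gained, dropped) queues for `current` after a membership change."""
    before = set()
    if current in old_consumers:
        before = set(allocate_round_robin(queues, old_consumers, current))
    after = set()
    if current in new_consumers:
        after = set(allocate_round_robin(queues, new_consumers, current))
    # Gained queues need new pull tasks; dropped queues must stop being pulled.
    return sorted(after - before), sorted(before - after)
```

For example, with four queues and a group growing from two consumers to three, an existing consumer may both drop a queue and gain a different one, while the new consumer only gains queues. A consumer that goes down is handled the same way: it disappears from the consumer list and its queues are dealt to the remaining members in the next round.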

Follow-up plan

  • 🔲 Offset storage principle

  • 🔲 Data structure

  • 🔲 Transactional messages and the message check-back mechanism

  • 🔲 Delayed messages

  • 🔲 Impact of adding or removing Brokers, Producers, and Consumers

  • 🔲 Push and pull logic