basis

1. Why use message queues?

Message queues have three main uses. Let’s take an e-commerce system for example:

  • Decoupling: Before introducing message queue, after placing order, order service needs to call inventory service to reduce inventory, call marketing service and marketing data…… Once the message queue is introduced, the order completion message can be thrown into the queue and the downstream service can invoke it itself, thus completing the decoupling of the order service from other services.

  • Asynchronous: After the order is paid, we subtract inventory, add credits, send messages, etc., so that the link is long, and the response time is long as the link is long. By introducing message queues, you can do everything but update order status asynchronously, which reduces response time.

  • Peak clipping: the message queue is used for peak clipping, for example, the second kill system. Normally, the traffic is very low, but we need to do the second kill activity, when the traffic is crazy, our server, Redis, MySQL have different capacity, directly all the traffic according to the order must be a problem, serious points may be directly suspended.

    We can throw requests in the queue and only release as much traffic as our service can handle, which can withstand a short period of heavy traffic.

    Decoupling, asynchronism and peak clipping are the three main functions of message queue.

2. Why RocketMQ?

The comparison of several major message queues in the market is as follows:

To sum up:

The choice of middleware can be based on these dimensions: reliability, performance, functionality, operationability, scalability, community activity. Currently, several commonly used middleware, ActiveMQ, as an “antique”, are not widely used in the market. Others are as follows:

  • The RabbitMQ:

  • Advantages: Lightweight, fast, easy to deploy and use, and flexible route configuration

  • Disadvantages: Performance and throughput is not ideal, not easy to secondary development

  • RocketMQ:

    • Advantages: Good performance, high throughput, stable and reliable, with an active Chinese community

    • Disadvantages: Compatibility is not very good

  • Kafka:

    • Advantages: Strong performance and throughput, good compatibility
    • Disadvantages: High latency due to “save a wave of reprocessing”

Our system is a user-oriented C-terminal system with a certain amount of concurrency and high requirements on performance, so we choose RocketMQ with low latency, high throughput and good availability.

3. What are the pros and cons of RocketMQ?

RocketMQ advantages:

  • Single-machine throughput: 100,000 levels
  • Availability: Very high, distributed architecture
  • Message reliability: Zero message loss can be achieved after optimized parameter configuration
  • Function support: MQ function is more perfect, or distributed, good scalability
  • 1 billion level message heap is supported without performance degradation due to heap
  • The source code is Java, convenient combined with the company’s own business secondary development
  • Born for the financial Internet field, for the high reliability requirements of the scene, especially in the e-commerce order deduction, and business peak cutting, in a large number of transactions influx, the back end may be unable to timely process the situation
  • RoketMQ may be more reliable in terms of stability. These business scenarios have been tested many times in Alibaba Double 11. If your business has the above concurrent scenarios, it is recommended to choose RocketMQ

RocketMQ faults:

  • The supported client language is not much, currently Java and c++, c++ is not mature
  • Interfaces such as JMS are not implemented in the MQ core, and some systems require significant code changes to migrate

4. What message models do message queues have?

Message queues have two models: a queue model and a publish/subscribe model.

  • Queuing models

    This is the original message queue model, corresponding to the message queue “send – save – receive” model. Producers send messages to a queue. A queue can store messages from multiple producers, and a queue can have multiple consumers, but consumers are in a competitive relationship, that is, each message can only be consumed by one consumer.

  • Publish/subscribe model

    If one piece of message data needs to be distributed to multiple consumers, and each consumer wants to receive the full message. Clearly, the queue model does not meet this requirement. The solution is the publish/subscribe model.

    In the publish-subscribe model, the sender of the message is called Publisher, the receiver of the message is called Subscriber, and the container where the message is stored on the server is called Topic. Publishers send messages to topics, and subscribers need to “subscribe to topics” before receiving messages. “Subscribe” here is both an action and a logical copy of the topic at consumption, with each subscription allowing the subscriber to receive all messages for the topic.

    The similarities and differences between this and the “queue pattern” are the same: producers are publishers, queues are topics, and consumers are subscribers. The only difference is whether a single piece of message data can be consumed multiple times.

5. What about RocketMQ’s message model?

The message model used by RocketMQ is the standard publish-subscribe model, and in RocketMQ’s glossary, producers, consumers, and topics are exactly the same concepts as in the publish-subscribe model.

RocketMQ’s own message is made up of the following parts:

  • Message

“Message” is the Message to be transmitted.

A message must have a Topic, which you can think of as the address to which your letter will be mailed.

A message can also have an optional Tag and key-value pair at the value, which can be used to set a business Key and look up the message on the Broker to find problems during development.

  • Topic

A Topic can be thought of as a categorization of messages. It is the first level of type of messages. For example, an e-commerce system can be divided into transaction messages, logistics messages, etc., and a message must have a Topic.

Topics have very loose relationships with producers and consumers. A Topic can have zero, one, or more producers sending messages to it, and a producer can send messages to different topics at the same time.

A Topic can also be subscribed by zero, one, or more consumers.

  • Tag

A Tag, which can be thought of as a subtopic, is the second level type of message to provide additional flexibility for the user. Using tags, messages for different purposes within the same business module can be identified with different tags for the same Topic. For example, the transaction message can be divided into: transaction creation message, transaction completion message, and so on. A message can have no Tag.

Tags help keep your code clean and consistent, and they also help with the query system RocketMQ provides.

  • Group

In RocketMQ, the concept of a subscriber is represented by a Consumer Group. Each Consumer group has a complete message in the consumption topic, and consumption progress among different Consumer groups is not affected by each other. That is to say, a message once consumed by Consumer Group1 will also be consumed by Consumer Group2.

A consumer group contains multiple consumers, and the consumers in the same group are competing consumers. Each consumer is responsible for part of the messages in the consumer group. By default, if a message is consumed by consumer Consumer1, no other consumers in the same group will receive the message.

  • Message Queue

Message queues. A Topic can have multiple Message queues. A Topic includes multiple Message queues.

RocketMQ also has several other queues — ConsumerQueue, for example.

  • Offset

During Topic consumption, since messages need to be consumed multiple times by different groups, they are not immediately deleted. This requires RocketMQ to maintain a Consumer Offset on each queue for each Consumer group, where all previous messages have been consumed. None of the subsequent messages are consumed, and for each successful message consumed, the consumption position is increased by one.

In other words, Queue is an array of infinite length, Offset is the subscript.

These are the key concepts in RocketMQ’s message model. Draw a picture to sum it up:

6. Do you understand the consumption patterns of messages?

There are two patterns of news consumption: Clustering (Clustering) and Broadcasting (broadcast).

The default mode is cluster consumption. In this mode, a consumer group jointly consumes multiple queues of a topic. A queue will be consumed by only one consumer.

The broadcast consumption message will be sent to each consumer in the consumer group for consumption.

7. Do you know the basic architecture of RoctetMQ?

The basic architecture of RocketMQ:

RocketMQ consists of four components: NameServer, Broker, Producer, Producer, and Consumer. Each component is clustered to ensure high availability.

8. Can you introduce these four parts?

Think of the postal system in which we live

The normal operation of the postal system depends on the following four roles: one is the sender, the other is the receiver, the third is the post office responsible for temporary storage and transmission, and the fourth is responsible for coordinating the management of various local post offices. In RocketMQ, the four roles are Producer, Consumer, Broker, and NameServer.

NameServer

NameServer is a stateless server with a role similar to, but lighter than, Zookeeper used by Kafka. Features:

  • Each NameServer node is independent of each other and does not interact with each other.
  • Nameserver is designed to be almost stateless and identifies itself as a pseudo-cluster by deploying multiple nodes. The Producer gets routing information about topics from Nameserver before sending messages, that is, which Broker to send to. Consumers also periodically obtain routing information for topics from NameServer. Brokers register with NameServer at startup, periodically make heartbeat connections, and periodically synchronize maintained topics to NameServer.

There are two main functions:

  • 1. Maintain long connections to Broker nodes.
  • 2. Maintain routing information of Topic.

Broker

Message storage and forwarding role, responsible for storing and forwarding messages.

  • The Broker maintains internal Consumer queues that store indexes for messages, with commitlogs being the real place to store messages.

  • A single Broker maintains a long connection and heartbeat with all Nameservers and synchronizes Topic information to Nameserver periodically. The underlying communication between Nameserver and Nameserver is implemented via Netty.

Producer

Message producer, the business side is responsible for sending messages, by the user’s own implementation and distributed deployment.

  • Producer is deployed by users in a distributed manner, and messages are sent by Producer to Broker clusters using multiple load-balancing modes, with low latency and fast failure.
  • RocketMQThere are three ways to send messages: synchronous, asynchronous, and one-way
    • Synchronous sending: Synchronous sending means that the sender sends data packets only after receiving the response from the receiver. It is used for important notification messages, such as important notification emails and marketing SMS messages.
    • Asynchronous sending: After sending data, the sender does not wait for the receiver to send back the response and then sends the next data packet. This method is generally used in service scenarios where the link takes a long time and the response time is sensitive, for example, the transcoding service is enabled after a user uploads a video.
    • Unidirectional sending: Unidirectional sending only sends messages without waiting for a response from the server and without triggering a callback function. It is applicable to scenarios that require very short time but do not require high reliability, such as log collection.

Consumer

Message consumers are responsible for consuming messages, and typically the backend system is responsible for asynchronous consumption.

  • Consumer is also deployed by users, supports PUSH and PULL consumption modes, supports cluster consumption and broadcast consumption, and provides real-time message subscription mechanisms.
  • Pull: Pull consumers actively Pull information from the message server. As long as they Pull messages in batches, the user application will start the consumption process. Therefore, Pull is called active consumption.
  • Push: The Push Consumer encapsulates the pull, consumption schedule, and other internal maintenance work of the message, leaving the callback interface performed when the message arrives to the user application. So Push is called the passive consumption type, but actually from the implementation of Pull messages from the message server, different from Pull is that Push first registers the consumption listener, when triggered at the listener to start consuming messages.

The advanced

9. How to ensure the availability/reliability/non-loss of messages?

At what stages are messages likely to be lost? There are three phases in which loss can occur: production, storage, and consumption.

So consider these three stages:

production

In the production stage, the request confirmation mechanism is mainly used to ensure the reliable delivery of messages.

  • 1. When sending synchronously, pay attention to handling response results and exceptions. If the response OK is returned, the message was successfully sent to the Broker. If the response fails, or some other exception occurs, the message should be retried.
  • 2. When sending an asynchronous message, check it in the callback method. If the message fails or is abnormal, retry it.
  • 3. If a timeout occurs, you can also check whether the Broker has been successfully stored by querying the log API.

storage

In the storage phase, you can configure reliability-first Broker parameters to avoid message loss due to downtime. In short, the reliability-first scenario should use synchronization.

  • If the Broker goes down, unconsumed messages can be restored and consumed again as long as they are persisted to commitlogs.
  • Both synchronous flushing and asynchronous flushing ensure that the message is stored in the Pagecache. However, synchronous flushing is more reliable. Producer sends a message, waits for the data to persist to disk, and then returns a response to Producer.

  • The Broker supports synchronous replication between Master and Slave, and asynchronous replication between Master and Slave. Messages from producers are sent to the Master, but consumption can be consumed from either Master or Slave. Synchronous replication ensures that even if the Master is down, messages will be backed up in the Slave, ensuring that messages will not be lost.

consumption

From a Consumer perspective, how can messages be successfully consumed?

  • The key for a Consumer to ensure successful consumption of a message is the timing of the confirmation, which should not be sent immediately after the message is received, but after all of the Consumer business logic has been executed. Because the message queue maintains the location of the consumption, the logic fails, there is no acknowledgement, and then the queue pulls the message, which is the same as before.

10. How to deal with message duplication?

For distributed message queues, it is difficult to simultaneously ensure certain delivery and non-repeat delivery, known as “once and only”. RocketMQ chose to ensure a certain delivery, ensuring that messages are not lost, but potentially causing message duplication.

There are two main ways to deal with message duplication: business idempotent and message deduplication.

Business idempotent: The first is to ensure that the consumption logic is idempotent, meaning that multiple calls have the same effect as one call. This way, no matter how many times the message is consumed, there is no impact on the business.

Message deduplication: The second is the business side, which no longer consumes duplicate messages. In this approach, you need to ensure that each message has a unique number, usually business related, such as an order number, a record of consumption needs to be stored, and you need to ensure atomicity in the step of message confirmation.

Specific approach is to set up a consumption record table, get this message to do database insert operation. Give the message a primary key or a unique constraint, and if repeated consumption occurs, it will cause a primary key conflict and the message will not be processed.

11. How to deal with message backlog?

There is a backlog of information, at this time we have to find a way to quickly consume the backlog of information, we have to consider improving the consumption capacity, there are generally two ways:

  • Consumer expansion: If the number of Message Queue of the current Topic is greater than the number of consumers, we can expand the number of consumers and increase the number of consumers to improve the consumption capacity and play the backlog of Message consumption as soon as possible.
  • Message migration Queue expansion: If the number of Message queues in the current Topic is less than or equal to the number of consumers, in which case it is useless to expand the number of consumers, we should consider expanding the Message Queue. You can create a temporary Topic, set up a number of Message queues for the temporary Topic, and then use some consumers to dump the consumption data to the temporary Topic, because there is no business processing, just forward the Message, still very fast. Then, the expanded consumers will be used to consume the data in the new Topic. After consumption, the data will be restored to the original state.

12. How are sequential messages implemented?

A sequential message is one in which the message is consumed in the same order as it is produced. In some business logic, the order must be guaranteed, such as order generation, payment, and shipment. The message must be processed in order.

The sequential message is divided into global sequential message and partial sequential message. The global sequential message means that all messages under a certain Topic must be in order.

Partial sequential messages only need to ensure that each group of messages is consumed sequentially, such as order messages, as long as the same order ID messages can be consumed sequentially.

Partial order message

Partial sequential messages are relatively easy to implement. The production end needs to send messages with the same ID to the same Message Queue. During consumption, sequential processing of messages read from the same Message Queue is required – the consumer cannot process sequential messages concurrently to achieve partial order.

The sender uses the MessageQueueSelector class to control which MessageQueue to send messages to.

The consumer solves the problem of single Message Queue messages being processed concurrently by using MessageListenerOrderly.

Global order message

RocketMQ does not guarantee order by default. For example, when creating a Topic, there are eight write queues and eight read queues by default. In this case, a message can be written to either queue. During data reading, there may be multiple consumers, and each Consumer may start multiple threads for parallel processing, so it is uncertain which Consumer the message is consumed by, and whether the order in which it is consumed is the same as the order in which it is written.

To ensure global sequential messages, set the number of read and write queues for a Topic to one, and then set the number of concurrent queues for Producer consumers to one. In short, in order to keep the overall Topic global messages in order, all concurrent processing is eliminated and the parts are single-threaded, completely sacrificing the high-concurrency, high-throughput characteristics of RocketMQ.

13. How to implement message filtering?

There are two options:

  • One method is to filter the Broker according to the Consumer’s deduplication logic. The advantage of this method is to avoid useless messages being transmitted to the Consumer, but the disadvantage is to increase the burden of the Broker, which is relatively complex to implement.
  • The other method is to filter on the Consumer side, such as removing the weight of the message according to the set tag. This advantage is simple to implement, but the disadvantage is that a large number of useless messages can only be discarded on the Consumer side.

Cosumer-side filtering is generally used, and Broker filtering can be used if you want to improve throughput.

Messages can be filtered in three ways:

  • According to Tag filtering: this is the most common one and is efficient and simple to use

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
    consumer.subscribe("TOPIC"."TAGA || TAGB || TAGC");
    Copy the code
  • SQL expression filtering: SQL expression filtering is more flexible

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    // Only subscribed messages have this attribute a, a >=0 and a <= 3
    consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
       @Override
       public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
           returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); consumer.start();Copy the code
  • Filter Server mode: The most flexible and complex mode, allowing users to customize functions for filtering

14. Do you understand the delayed message?

The automatic cancellation of e-commerce orders due to timeout is a typical example of using delayed messages. After a user submits an order, he can send a delayed message and check the status of the order one hour later. If the payment is still not made, he can cancel the order and release the inventory.

RocketMQ supports delayed messages. You only need to set the message delay level when producing the message:

      // Instantiate a producer to generate a delayed message
      DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
      // Start the producer
      producer.start();
      int totalMessagesToSend = 100;
      for (int i = 0; i < totalMessagesToSend; i++) {
          Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
          // Set the delay level to 3, the message will be sent after 10 seconds (now only supports a fixed number of times, see delayTimeLevel)
          message.setDelayTimeLevel(3);
          // Send a message
          producer.send(message);
      }
Copy the code

However, RocketMQ currently supports a limited level of latency:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
Copy the code

How does RocketMQ implement delayed messages?

Simple, eight words: temporary storage + scheduled task.

The Broker receives delayed messages and sends them to the Message Queue for the corresponding time period of the Topic (SCHEDULE_TOPIC_XXXX). The Broker then polls these queues with a scheduled task. When these queues expire, the Message is delivered to the target Topic Queue, and the consumers can consume the messages.

15. How is distributed message transaction implemented? Half a message?

A message: A message sent by the Producer to the Broker is marked as “undeliverable” and cannot be consumed by a Consumer for a while. The message must be confirmed by the Producer after performing a local transaction. Only Consumer can consume this message.

Depending on semi-message, distributed message transaction can be realized, where the key lies in the second confirmation and message back lookup:

  • 1. The Producer sends half messages to the broker
  • 2. The Producer end receives the response and sends the message successfully. At this time, the message is a semi-message marked as “undeliverable” and the Consumer cannot consume it.
  • 3. The Producer end performs local transactions.
  • When the local transaction is complete, Producer sends a Commit/Rollback to the Broker. If the transaction is Commit, the Broker marks the half-message as normal and the Consumer can consume it. If the transaction is Rollback, Producer sends a Rollback to the Broker. The Broker discarded the message.
  • 5. Abnormal situation: The Broker can not wait for the second confirmation. After a certain period of time, all the half-messages will be queried, and then the execution of the half-messages will be queried on the Producer end.
  • 6. The Producer end queries the status of local transactions
  • Commit COMMIT /rollback to the broker based on the transaction status. (5, 6, and 7 are message backchecks)
  • After the message is consumed by the consumer segment, the local transaction is executed.

16. Do you know the dead-letter queue?

Dead-letter queues are used to process messages that cannot be consumed normally, known as dead-letter messages.

When a message fails to be consumed initially, message queue RocketMQ automatically retries the message. If consumption still fails after the maximum number of retries is reached, it indicates that the consumer cannot properly consume the message under normal circumstances. In this case, message queue RocketMQ does not immediately discard the message, but sends it to the special queue corresponding to the consumer, which is called the dead letter queue.

Characteristics of dead-letter messages:

  • Will not be normal consumption by consumers.
  • The validity period is the same as that of a normal message, which is 3 days. After 3 days, the message is automatically deleted. Therefore, a dead letter message needs to be processed within 3 days after it is generated.

Dead letter queue features:

  • A dead letter queue corresponds to a Group ID, not to a single consumer instance.
  • If a Group ID does not generate a dead-letter message, message queue RocketMQ does not create a dead-letter queue for it.
  • A dead-letter queue contains all dead-letter messages generated for the corresponding Group ID, regardless of which Topic the message belongs to.

The RocketMQ console provides the ability to query, export, and resend dead-letter messages.

17. How can RocketMQ be highly available?

Because NameServer is stateless and does not communicate with each other, it is highly available as long as cluster deployment is required.

RocketMQ’s high availability is mainly reflected in the high availability of the read and write of the Broker, which is achieved through clustering and master-slave.

A Master Broker supports read and write, while a Slave Broker supports only read. The Master synchronizes messages to the Slave.

That is, Producer can only write messages to Master brokers, while Cosumer can read messages from Master and Slave brokers.

In the Consumer configuration file, there is no need to set whether to read from the Master or Slave. When the Master is unavailable or busy, the Consumer’s read requests are automatically switched to the Slave. With the automatic Consumer switch mechanism, when a Master machine fails, the Consumer can still read messages from the Slave, which makes reading highly available.

How to achieve high availability of write on the sender side? When a Topic is created, Message queues for the Topic are created on multiple Broker groups (with the same Broker name and different brokerId machines) so that when the Master of the Broker group becomes unavailable, RocketMQ currently does not support automatic conversion of the Slave to Master. If the machine resources are insufficient, you need to convert the Slave to Master. Manually stop the slave-colored Broker, change the configuration file, and start the Broker with the new configuration file.

The principle of

18. Describe the overall workflow of RocketMQ?

Simply put, RocketMQ is a distributed message queue, meaning message queue + distributed system.

As a message queue, it is a model of sending, depositing and receiving, corresponding to Producer, Broker and Cosumer. As a distributed system, it has servers, clients, registries, brokers, Producers/Consumers, and NameServer

RocketMQ consists of a NameServer registry cluster, a Producer Producer cluster, a Consumer Consumer cluster, and several brokers:

  1. The Broker registers all nameservers at startup, maintains long connections, and sends heartbeat every 30 seconds
  2. When sending a message, the Producer obtains the address of the Broker server from NameServer and selects a server to send the message based on the load balancing algorithm
  3. Conusmer also gets the Broker address from NameServer when consuming the message, and then actively pulls the message to consume

19. Why doesn’t RocketMQ use Zookeeper as its registry?

Kafka we all know that Zookeeper is used as a registry — and of course Zookeeper is gradually being used. RocketMQ doesn’t use Zookeeper, but there are a few reasons why:

  1. Considering availability, according to CAP theory, only two points can be satisfied at the same time, while Zookeeper satisfies CP, that is to say, Zookeeper cannot guarantee service availability. Zookeeper elections take too long, during which the whole cluster is unusable. This is certainly unacceptable for a registry, which as a service discovery should be designed for usability.
  2. Based on performance considerations, The implementation of NameServer itself is very lightweight, and can be horizontally expanded by adding machines to increase the stress resistance of the cluster. However, the write of Zookeeper is not extensible. Zookeeper can only solve this problem by dividing the domain into multiple Zookeeper clusters. First of all, it is too complicated to operate, and second, it still violates the design of A in CAP, which leads to the disconnection between services.
  3. ZooKeeper ZAB maintains a transaction log on each ZooKeeper node for each write request. At the same time, it periodically mirrors the memory data to disks to ensure data consistency and persistence. For a simple service discovery scenario, this is not really necessary; the implementation is too heavy. And the data stored itself should be highly customized.
  4. Message delivery should be weakly dependent on the registry, and RocketMQ is designed based on this idea. Producers get Broker addresses from NameServer and cache them locally when they first send a message. If NameServer is not available in the entire cluster, It doesn’t have much effect on producers and consumers in the short term.

20. How does the Broker store data?

RocketMQ mainly stores CommitLog files, ConsumeQueue files, and Indexfile files.

Overall message store design:

  • CommitLog: the message body and metadata storage body, which stores the message body content written by the Producer. The message content is not fixed length. The default size of a file is 1 GB, the file name length is 20 bits, and the remaining offset is the start offset. For example, 00000000000000000000 indicates the first file. The start offset is 0, and the file size is 1 GB =1073741824. When the first file is full, the second file is 00000000001073741824, and the start offset is 1073741824, and so on. Messages are mainly written sequentially to the log file, and when the file is full, to the next file.

    CommitLog files are stored in ${Rocket_Home}/store/ CommitLog. From the picture, we can clearly see the offset of file names. By default, each file is 1G.

  • ConsumeQueue: message consumption queue, introduced primarily to improve message consumption performance. Since RocketMQ is a topic-based subscription model, message consumption is subject specific, and it is inefficient to retrieve messages by topic through commitlog files.

    The Consumer can then look for messages to consume based on the ConsumeQueue. ConsumeQueue serves as the index of the consuming messages, storing the CommitLog’s starting physical offset, message size, and HashCode of the message Tag.

    ConsumeQueue files can be regarded as CommitLog files based on topics. Therefore, the organization mode of the ConsumeQueue folder is as follows: Topic /queue/file Three-layer organization structure, the specific storage path is: $HOME/store/consumequeue / {topic} / {queueId} / {fileName}. Similarly, the ConsumeQueue file adopts a fixed length design, with each entry having a total of 20 bytes, including 8-byte CommitLog physical offset, 4-byte message length, and 8-byte tag hashcode. A single file consists of 30W entries, and each entry can be randomly accessed like an array. Each ConsumeQueue file is about 5.72M in size;

  • IndexFile: IndexFile provides a way to query messages by key or time interval. The Index file is stored in: HOME\store\indexHOME \ Store \indexHOME\ Store \index{fileName} fileName is named after the timestamp when it was created. The size of a single IndexFile is fixed at about 400M. An IndexFile can hold 2000W indexes. The underlying storage of IndexFile is designed to implement HashMap structure in the file system, so the underlying implementation of RocketMQ IndexFile is hash index.

To summarize: RocketMQ uses a hybrid storage structure, with all queues in a single instance of the Broker sharing a single log file (CommitLog).

RocketMQ’s mixed storage structure (message entities for multiple topics are stored in a CommitLog) uses separate data and index parts for the Producer and Consumer respectively. The Producer sends messages to the Broker. The Broker then persists messages to CommitLog, either synchronously or asynchronously.

Messages sent by the Producer are not lost as long as the messages are flushed and persisted to a CommitLog file. Because of this, consumers certainly have a chance to consume this message. If a pull request fails to pull a message, it can wait for the next pull. The server also supports long polling mode. If a pull request fails to pull a message, the Broker allows 30 seconds to wait.

Here, RocketMQ uses a broker-side backend service thread, ReputMessageService, to continuously distribute requests and asynchronously build ConsumeQueue and IndexFile data.

21. How does RocketMQ read and write files?

RocketMQ reads and writes files cleverly by taking advantage of some of the operating system’s most efficient file reads and writes – PageCache, sequential reads and writes, zero copy.

  • PageCache, sequential reads

In RocketMQ, the ConsumeQueue logical consumption Queue stores less data and is read sequentially. Under the prefetch of the Page cache mechanism, ConsumeQueue file read performance is almost close to read memory, even in the case of message heap. However, for log data files stored by CommitLog messages, many random accesses are generated when message content is read, which seriously affects the performance. If you select an appropriate SYSTEM I/O scheduling algorithm, for example, set the scheduling algorithm to Deadline (if block storage uses SSDS), the random read performance will also be improved.

The PageCache is the OS’s cache of files to speed up reading and writing to files. Generally speaking, the sequential read and write speed of the program is almost close to the read and write speed of the memory. The main reason is that the OS uses the PageCache mechanism to optimize the performance of the read and write access operation, and uses part of the memory for PageCache. The OS first writes data to the Cache and then asynchronously flusits the data from the Cache to physical disks by the PDFlush kernel thread. If PageCache is not matched during a file reading, the OS prereads data files of other adjacent blocks in sequence when the file is accessed from the physical disk.

  • Zero copy

In addition, RocketMQ reads and writes files primarily through MappedByteBuffer. Among them, the use of NIO FileChannel model of the physical file on the disk map directly to the user mode memory address (the Mmap way to reduce the traditional IO, the disk file data in the operating system kernel address space buffer, and user application address space buffer will be copied back and forth between the performance of the overhead). Converting operations on files to direct operations on memory addresses greatly improves the efficiency of reading and writing files. (Because of the memory-mapping mechanism, RocketMQ uses a fixed-length structure for file storage, which allows the entire file to be mapped to memory at once.)

What is zero copy?

In the traditional way, the data goes through several copies and user/kernel switching.

  1. Copy data from disk to kernel-mode memory;
  2. Copy from kernel-mode memory to user-mode memory;
  3. Then copy from user – mode memory to network – driven kernel – mode memory;
  4. Finally, it is copied from the network-driven kernel memory to the nic for transmission.

Therefore, zero-copy mode can reduce the number of context switches and memory copies between the user mode and kernel mode, improving I/O performance. A more common implementation of zero copy is mmap, which is implemented in Java via MappedByteBuffer.

22. How to realize message flushing?

RocketMQ offers two flush strategies: synchronous and asynchronous

  • Synchronous flush: After a message reaches the Broker’s memory, it must be flushed to a commitLog file for success, and the Producer is returned with the data sent successfully.
  • Asynchronous flush: An asynchronous flush means that after the message reaches Broker memory, the Producer returns the message that the data has been sent successfully and wakes up a thread to persist the data to a CommitLog file.

The Broker operates directly on memory (memory-mapped files) during message access. This provides throughput for the system, but does not prevent data loss in the event of a machine power failure, so it needs to be persisted to disk.

The ultimate implementation of a flush is to write mapped data to disk using mappedbyteBuffer.force () in NIO. In synchronous flush cases, the Broker waits for the write to complete after writing messages to the CommitLog mapped area.

For asynchrony, only the corresponding thread is woken up without any guarantee of execution timing, as shown in the figure.

22. Can you tell me how RocketMQ load balancing is implemented?

The load balancing in RocketMQ is done on the Client side. Specifically, it can be divided into the load balancing for sending messages from the Producer side and the load balancing for subscribing messages from the Consumer side.

Producer Performs load balancing

When sending a message, the Producer will first find the TopicPublishInfo specified by Topic. After obtaining the TopicPublishInfo route information, The RocketMQ client sends messages by default using the selectOneMessageQueue() method, which selects a messageQueueList from TopicPublishInfo. There is a sendLatencyFaultEnable switch variable that, if enabled, filters out brokers that are not available based on random incremental modulo.

Outside the chain picture archiving failure, the source station might be hotlinking prevention mechanism, proposed to directly upload picture preserved (img – VuMGUv6B – 1649246218316) (cdn.jsdelivr.net/gh/fighter3…)”

A “latencyFaultTolerance” is a fixed amount of time to back off from previous failures. For example, if the latency of the last request exceeds 550Lms, back away from 3000Lms; More than 1000L, retreat 60000L; If it is off, a queue (MessageQueue) is selected to send messages by means of random incremental modulo. LatencyFaultTolerance is the key to achieve high availability of message sending.

Load balancing for Consumer

In RocketMQ, the two Consumer consumption modes (Push/Pull) are based on the Pull mode for retrieving messages. The Push mode is an encapsulation of the Pull mode. Essentially, the Pull thread pulls a batch of messages from the server and submits them to the message consuming thread pool. And “non-stop” continue to try to pull messages to the server again. If the message is not pulled, the pull is delayed and continues. In both Push/Pull consumption modes, the Consumer side needs to know which message queue on the Broker side to fetch messages from. Therefore, it is necessary to do load balancing on the Consumer side, that is, to allocate multiple MessageQueue on the Broker side to which consumers in the same ConsumerGroup consume.

  1. The Consumer sends heartbeat packets

After Consumer is started, it continuously sends heartbeat packets (containing information such as message consumption group name, subscription collection, message communication mode, and value of client ID) to all Broker instances in the RocketMQ cluster via a scheduled task. The Broker receives a Consumer heartbeat message and maintains it in the Local buffer variable “consumerTable” of ConsumerManager. The encapsulated client network channel information is also stored in the local buffer variable “channelInfoTable”. Provide metadata information for Consumer side load balancing.

  1. The Consumer side implements the core load balancing class – RebalanceImpl

    Starting the MQClientInstance instance in the Consumer instance startup process completes the start of the load balancing service thread – RebalanceService (every 20 seconds).

    A review of the source code shows that the Run () method of the RebalanceService thread ends up calling the rebalanceByTopic() method of the RebalanceImpl class, which is the core of the Consumer side load balancing.

    The rebalanceByTopic() method does different logic depending on whether the consumer communication type is “broadcast mode” or “cluster mode.” Here are the main processing flows in cluster mode:

(1) Obtain the message consumption queue set (mqSet) under this Topic from topicSubscribeInfoTable, the local cache variable of rebalanceImpl instance;

(2) according to the topic and consumerGroup calls for a parameter mQClientFactory. FindConsumerIdList () method sends a communication request to the Broker end to get the consumption group consumer Id list;

(3) First sort the message consumption queue and consumer Id under Topic, and then use the message queue allocation strategy algorithm (default: message queue average allocation algorithm) to calculate the message queue to be pulled. The average allocation algorithm here, similar to the paging algorithm, sorts all MessageQueue in order similar to the record, sorts all consumers in order similar to the page number, and calculates the average size that each page needs to contain and the range of records on each page. Finally, the entire range is traversed to calculate the MessageQueue that should be assigned to the current Consumer.

(4) then call updateProcessQueueTableInRebalance () method, the specific approach is to first set will be assigned to the message queue (mqSet) and processQueueTable do a filtering ratio.

  • The processQueueTable annotation in red in the figure above indicates that it is not included in the allocated message queue set mqSet. Set the queue Dropped attribute is true, then look at whether the queue can remove processQueueTable cache variable, specific executive removeUnnecessaryMessageQueue here () method, Check every 1s to see if the lock on the current consumption-processing queue can be obtained. Return true if the lock is obtained. Return false if the lock on the current consumer processing queue is still not available after 1s of waiting. If true is returned, the corresponding Entry is removed from the processQueueTable cache variable;
  • The green part of processQueueTable in the figure above represents the intersection with the allocated message queue set mqSet. To judge whether the ProcessQueue has expired, no need to worry about in Pull mode, if it is a Push model, set up the Dropped attribute is true, and invoke removeUnnecessaryMessageQueue () method, like the above try to remove the Entry;
  • In the end, Create a ProcessQueue object for each MessageQueue in the filtered MessageQueue set (mqSet) and store it in the processQueueTable queue of the RebalanceImpl (where the computePul of the RebalanceImpl instance is called) The lFromWhere(MessageQueue MQ) method gets the next progress consumption value of the MessageQueue object, offset, and fills it into the pullRequest object properties to be created next. Add the pullRequest object to the pullRequestList and execute the dispatchPullRequest() method. The PullRequest object of the Pull message is put into the pullRequestQueue of the PullMessageService thread. After the thread is pulled out, it sends a Pull request to the Broker. RebalancePushImpl and RebalancePullImpl have different dispatchPullRequest() methods, which are empty in RebalancePullImpl.

Message consumption queue load balancing among different consumers in the same consumer group, its core design concept is that a message consumption queue can only be consumed by one consumer in the same consumer group at the same time, and a message consumer can consume multiple message queues at the same time.

23.RocketMQ message long polling?

A long poll is when the Consumer pulls a message from the Queue. If there is no data in the Queue, the Broker will not return immediately. Instead, it will PullReuqest and wait for the Queue to receive a message or for the long poll to block. All pullRequests on the queue are reprocessed.

  • PullMessageProcessor#processRequest

                    // If no data is pulled
                    case ResponseCode.PULL_NOT_FOUND:
                        Suspend is enabled for both broker and consumer, and is enabled by default
                        if (brokerAllowSuspend && hasSuspendFlag) {
                            long pollingTimeMills = suspendTimeoutMillisLong;
                            if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                                pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                            }
    
                            String topic = requestHeader.getTopic();
                            long offset = requestHeader.getQueueOffset();
                            int queueId = requestHeader.getQueueId();
                            Encapsulate a PullRequest
                            PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                                    this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                            // Hang the PullRequest
                            this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                            response = null;
                            break;
                        }
    Copy the code

Pending requests, a server thread constantly checks to see if there is any data in the queue, or times out.

  • PullRequestHoldService#run()
    @Override
    public void run(a) {
        log.info("{} service started".this.getServiceName());
        while (!this.isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                // Check the hold request
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime); }}catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info("{} service end".this.getServiceName());
    }
Copy the code



Reference:

[1]. RocketMQ Field and Principles

[2]. Inside RocketMQ Technology

[3] When the interview was asked about RocketMq, I was stunned

[4] ai Xiaoxian, I Want to Enter dachang

[5]. Dreamcat. Ink/Java – interv…

[6]. Shallow In shallow Out -RocketMQ

[7]. Twelve graphs, kicking down the message queue door

[8] aren’t you curious about all that mq crap?

[9]. How to solve message idempotent (deduplication)? Look at this solution!

[10]. 70,000 words, 151 pictures, overnight collation message queue core knowledge summary! Master MQ completely this time!

[11]. Geek Time message Queue Master Class

[12]. RocketMQ’s official website


Article first public number: three points evil