Preface

Ahem… RocketMQ source code analysis, part one 🎬 Action!!

The first time I used RocketMQ, I wanted to explore its source code. It's not that RocketMQ has a unique advantage over other MQs, but it is worth reading for a few simple reasons:

  • Middleware built on the Java stack

    As an open-source product from Alibaba, it is written mainly in Java, so it is relatively easy for Java developers to read. Client versions in C/C++, Python and Go are also available now.

  • Helps improve your code style and skills

    After writing a lot of boring, lengthy code, your programming style and skills will inevitably hit a bottleneck. Reading source code lets you learn deeper technical points and excellent design patterns.

From then on, you can safely write "Mastered RocketMQ" on your resume, no matter how tough the interview questions get 😎😎😎 ~ ha ha ha

Main text

Of course, if you dive headfirst into the source code, you could read it for a month and still be confused without knowing why. That is why Fei Hao is writing this introductory chapter before the source code analysis 🤨.

Let's start with the overall architecture of RocketMQ.

Here are the main concepts shown in the architecture diagram:

Producer

Message producers are responsible for producing messages, which are typically generated by business systems. A producer sends the messages generated in the business application to the Broker server. RocketMQ provides several sending modes: synchronous, asynchronous, ordered and one-way. Synchronous and asynchronous sending both require the Broker to return an acknowledgement; one-way sending does not.

Consumer

The consumer is responsible for consuming messages; typically a backend system consumes them asynchronously. A consumer pulls messages from the Broker server and hands them to the application. From the application's perspective, two forms of consumption are provided: pull consumption and push consumption.

NameServer

The NameServer provides routing information. Producers and consumers use it to look up the list of Broker IP addresses for each topic. Multiple NameServer instances form a cluster, but they are independent of each other and do not exchange information.
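The following is a minimal in-memory sketch of the routing role. `ToyNameServer` and its methods are hypothetical and are not RocketMQ's own route manager. The sketch assumes two things. First, brokers re-register periodically as a heartbeat. Second, a broker that stays silent for 120 seconds is treated as dead (an assumed expiry value). Because NameServer instances don't talk to each other, each instance would hold its own copy of a table like this.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical topic -> broker route table with heartbeat expiry (illustration only). */
class ToyNameServer {
    // Assumption: a broker silent for this long is considered dead.
    static final long BROKER_EXPIRE_MILLIS = 120_000L;

    private final Map<String, Set<String>> topicToBrokers = new ConcurrentHashMap<>();
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    /** Called by a broker on startup and on every heartbeat. */
    void registerBroker(String brokerAddr, Collection<String> topics, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
        for (String topic : topics) {
            topicToBrokers.computeIfAbsent(topic, t -> ConcurrentHashMap.newKeySet()).add(brokerAddr);
        }
    }

    /** Called by producers/consumers: which brokers serve this topic? Sorted for determinism. */
    List<String> lookup(String topic) {
        List<String> result = new ArrayList<>(topicToBrokers.getOrDefault(topic, Collections.emptySet()));
        Collections.sort(result);
        return result;
    }

    /** Periodic scan that drops brokers whose last heartbeat is too old. */
    void scanNotActiveBrokers(long nowMillis) {
        lastHeartbeat.entrySet().removeIf(e -> {
            boolean expired = nowMillis - e.getValue() > BROKER_EXPIRE_MILLIS;
            if (expired) {
                for (Set<String> brokers : topicToBrokers.values()) {
                    brokers.remove(e.getKey());
                }
            }
            return expired;
        });
    }
}
```

A producer would call `lookup("TopicA")` before sending. A broker that stops sending heartbeats disappears from the answer after the next scan.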

BrokerServer

The Broker stores and forwards messages. In a RocketMQ system, the Broker receives and stores the messages sent by producers and serves consumers' pull requests. It also stores message-related metadata, including consumer groups, consumption progress offsets, and topic and queue information.

Message

A message is the physical carrier of the information transmitted by the messaging system and the smallest unit of produced and consumed data. Each message must belong to a topic. Each Message in RocketMQ has a unique Message ID and can carry a Key with a business identity. The system supports querying messages by Message ID and by Key.
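One way an ID can support direct lookup is by encoding where the message lives. The sketch below assumes a broker-side ID with this layout, rendered as 32 hex characters:

- a 4-byte IPv4 address
- a 4-byte port
- an 8-byte CommitLog offset

That is my understanding of how RocketMQ's `offsetMsgId` is built. `OffsetMsgId` is a hypothetical helper. Client-generated unique IDs are a separate mechanism and are not modeled here.

```java
import java.nio.ByteBuffer;

/** Hypothetical codec for an ID laid out as IPv4 (4 bytes) + port (4) + CommitLog offset (8). */
final class OffsetMsgId {
    private OffsetMsgId() {}

    static String encode(byte[] ipv4, int port, long commitLogOffset) {
        if (ipv4.length != 4) throw new IllegalArgumentException("IPv4 address must be 4 bytes");
        ByteBuffer buf = ByteBuffer.allocate(16); // big-endian by default
        buf.put(ipv4).putInt(port).putLong(commitLogOffset);
        StringBuilder sb = new StringBuilder(32);
        for (byte b : buf.array()) {
            sb.append(String.format("%02X", b & 0xFF)); // two upper-case hex digits per byte
        }
        return sb.toString();
    }

    /** The last 16 hex characters (8 bytes) are the physical offset in the CommitLog. */
    static long commitLogOffset(String msgId) {
        return Long.parseUnsignedLong(msgId.substring(16), 16);
    }

    /** The first 16 hex characters decode to "a.b.c.d:port". */
    static String brokerAddress(String msgId) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) sb.append('.');
            sb.append(Integer.parseInt(msgId.substring(i * 2, i * 2 + 2), 16));
        }
        int port = Integer.parseInt(msgId.substring(8, 16), 16);
        return sb.append(':').append(port).toString();
    }
}
```

Decoding the ID yields the broker address and the physical offset. A query by ID can therefore go straight to one broker and one position in the CommitLog, without consulting any index.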

Topic

A topic represents a collection of messages of one kind. Each topic contains a number of messages, each message belongs to exactly one topic, and the topic is the basic unit of message subscription in RocketMQ.

Tag

Tags are flags set on messages to distinguish different types of messages under the same topic; they can be understood as a secondary classification of the Topic. Messages from the same business unit can carry different tags under the same topic for different business purposes. Tags help keep code clear and consistent and make better use of the query system RocketMQ provides. Consumers can implement different consumption logic for different subtypes based on the Tag, which improves scalability.
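Tag filtering is cheap because RocketMQ's ConsumeQueue stores only the HashCode of each message's Tag, not the Tag string itself. The sketch below shows a hedged two-stage check.

- **Hash comparison:** a broker can do this from the index alone.
- **Exact string comparison on the consumer:** this is needed because distinct tags can share a hash code. For example, `"Aa"` and `"BB"` have the same `String.hashCode()`.

`TagFilter` is hypothetical. Its handling of `"TagA || TagB"`-style expressions is illustrative only.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical two-stage tag filter for expressions such as "TagA || TagB" or "*". */
final class TagFilter {
    private final Set<String> tags = new HashSet<>();
    private final Set<Long> tagHashes = new HashSet<>();
    private final boolean matchAll;

    TagFilter(String expression) {
        String expr = expression == null ? "*" : expression.trim();
        matchAll = expr.isEmpty() || expr.equals("*");
        if (!matchAll) {
            for (String part : expr.split("\\|\\|")) {
                String tag = part.trim();
                if (!tag.isEmpty()) {
                    tags.add(tag);
                    tagHashes.add((long) tag.hashCode());
                }
            }
        }
    }

    /** Stage 1 (broker side): only the stored hash code is compared; may give false positives. */
    boolean mayMatch(long tagsCode) {
        return matchAll || tagHashes.contains(tagsCode);
    }

    /** Stage 2 (consumer side): exact comparison on the real tag string. */
    boolean matches(String tag) {
        return matchAll || (tag != null && tags.contains(tag));
    }
}
```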

Consumption patterns

There are two patterns of message consumption: Clustering and Broadcasting.

Clustering is the default: among the consumers in the same consumer group that subscribe to a topic, each message is consumed by only one consumer.

In Broadcasting mode, each message is delivered to every consumer in the consumer group.
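In Clustering mode, the "one consumer per message" rule is typically achieved by splitting a topic's queues among the consumers in the group. Each queue, and therefore every message in it, then has exactly one reader. The sketch below uses a simple averaging split in the spirit of RocketMQ's averaging allocation strategy, but it is not RocketMQ's code. `QueueAllocation` is hypothetical. Broadcasting needs no split at all.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical queue allocation for the two consumption patterns. */
final class QueueAllocation {
    private QueueAllocation() {}

    /** Clustering: contiguous chunks; when uneven, the first consumers get one extra queue. */
    static List<Integer> clustering(List<Integer> queueIds, List<String> consumerIds, String self) {
        List<Integer> queues = new ArrayList<>(queueIds);
        Collections.sort(queues);
        List<String> consumers = new ArrayList<>(consumerIds);
        Collections.sort(consumers); // every consumer must compute the same order
        int index = consumers.indexOf(self);
        if (index < 0) return Collections.emptyList();
        int n = consumers.size(), q = queues.size();
        int base = q / n, extra = q % n;
        int start = index * base + Math.min(index, extra);
        int size = base + (index < extra ? 1 : 0);
        return new ArrayList<>(queues.subList(start, start + size));
    }

    /** Broadcasting: every consumer reads every queue. */
    static List<Integer> broadcasting(List<Integer> queueIds) {
        return new ArrayList<>(queueIds);
    }
}
```

Sorting the consumer list matters: if two consumers ordered the group differently, they could both claim the same queue.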

Consumption methods

There are two ways for the Consumer side to consume: pull consumption and push consumption.

Pull consumption: the application actively calls the Consumer's pull method to fetch messages from the Broker server, so the application holds the initiative. Once a batch of messages has been retrieved, the application starts consuming it.

Push consumption: in this mode, the Broker actively pushes data to the consumer after receiving it. This mode generally offers good real-time performance. (In fact, Push mode is just a wrapper around Pull mode: after the pull thread fetches a batch of messages from the server and submits them to the consuming thread pool, it immediately tries to pull from the server again.)
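The parenthetical above says Push mode is just a wrapper around Pull. The toy below shows that shape: one loop pulls a batch, submits it to a consuming thread pool, and immediately pulls again. `ToyBrokerQueue` and `PushOverPull` are hypothetical. A real client long-polls the Broker instead of stopping when the queue is empty, and it tracks offsets per queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical single-queue "broker" that serves pull requests by offset. */
class ToyBrokerQueue {
    private final List<String> messages;

    ToyBrokerQueue(List<String> messages) { this.messages = new ArrayList<>(messages); }

    /** Returns up to maxNums messages starting at offset; empty when the consumer has caught up. */
    synchronized List<String> pull(long offset, int maxNums) {
        int from = (int) Math.min(offset, messages.size());
        int to = Math.min(from + maxNums, messages.size());
        return new ArrayList<>(messages.subList(from, to));
    }
}

/** Hypothetical "push" consumer implemented as a pull loop feeding a thread pool. */
class PushOverPull {
    /** Drains the queue and returns how many messages were handed to the listener's pool. */
    static int run(ToyBrokerQueue queue, int batchSize, Consumer<String> listener) {
        ExecutorService consumePool = Executors.newFixedThreadPool(4);
        long offset = 0;
        int submitted = 0;
        while (true) {
            List<String> batch = queue.pull(offset, batchSize); // the pull under the hood
            if (batch.isEmpty()) break;                         // a real client would long-poll here
            offset += batch.size();
            submitted += batch.size();
            consumePool.submit(() -> batch.forEach(listener));  // "push" the batch to the application
        }
        consumePool.shutdown();
        try {
            consumePool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return submitted;
    }
}
```

Batches are consumed concurrently, so the listener must be thread-safe.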

Message ordering

There are two kinds of ordered messages: normal ordered messages and strictly ordered messages.

With normal ordered messages, the messages a consumer receives from the same consume queue are in order, while messages from different queues may be out of order.

With strictly ordered messages, all messages a consumer receives are in order.
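Normal ordering is usually obtained by sending all messages that must stay in order, such as all steps of one order, to the same queue. RocketMQ lets the sender choose the queue through a `MessageQueueSelector` passed to `send()`. The sketch below models only the selection logic, assuming hash-based routing on an order ID. `OrderedQueueSelector` and the `"orderId:step"` message format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical hash-based queue selection that keeps one key's messages in one queue. */
final class OrderedQueueSelector {
    private OrderedQueueSelector() {}

    static int select(String shardingKey, int queueCount) {
        // floorMod keeps the index in [0, queueCount) even when hashCode() is negative
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    /** Routes "orderId:step" messages and returns each queue's contents in arrival order. */
    static Map<Integer, List<String>> route(List<String> messages, int queueCount) {
        Map<Integer, List<String>> queues = new TreeMap<>();
        for (String m : messages) {
            String orderId = m.substring(0, m.indexOf(':'));
            queues.computeIfAbsent(select(orderId, queueCount), k -> new ArrayList<>()).add(m);
        }
        return queues;
    }
}
```

Messages of different orders may share a queue. That is harmless, because only the relative order within one key has to be preserved.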

After that barrage of jargon, you may be a bit dazed; in any case, Fei Hao is thoroughly dazed right now 😰


Talk is cheap, show me the code.

Github address: github.com/apache/rock…

Run git clone….. and then go have a cup of tea and a nap.

Once the source code has downloaded, let's look at the project directory structure 👇

Here are some of the core modules:

  • namesrv: the name service, responsible for broker management and route discovery
  • broker: the core component; it receives the messages sent by producers, stores them, and serves them to consumers
  • client: the client implementation, i.e. the producer and consumer modules
  • store: the storage layer, implementing message persistence, the index service, and the high-availability (HA) service
  • remoting: the communication layer, built as a wrapper over Netty; all communication between services relies on this module
  • filter: the message filtering service, effectively a filter agent inserted between the broker and the consumer
  • common: shared utility classes, methods, configuration files, constants, etc.
  • tools: command-line management functions, such as message queries and topic management
  • example: official examples of typical features such as ordered messages, push consumers and pull consumers

PS: In later chapters, Fei Hao will explain each of these core modules in depth, straight from the source 💁‍♂️


Since this is the opening chapter on RocketMQ, we need a rough understanding of all its parts.

With RocketMQ's architecture outlined above, let's look at how the roles interact in detail.

Emmmm… The diagram above may be a little complicated, but I'm sure you can get the gist of it.

Let's focus on just three concepts:

  1. CommitLog: the main storage structure for messages; simply put, it stores the messages sent by Producers. Remember that every message must be persisted to disk, so it has to be written to a file. Each file is 1 GB by default (why 1 GB? think about it). When a file is full, writing continues in the next file.
  2. ConsumeQueue: the message consume queue, a topic-based index into the CommitLog. Each queue of a Topic has its own ConsumeQueue. For example, if a Topic has queues 1, 2, 3 and 4, and consumer A is assigned queues 1 and 2 (which involves consumer load balancing), then consumer A reads the ConsumeQueues of the Topic's queues 1 and 2. ConsumeQueue exists mainly to improve consumption performance; each entry stores the message's starting physical offset in the CommitLog, the message size, and the HashCode of the message Tag.
  3. IndexFile: the index file, used to look up messages in the CommitLog by Key. A single IndexFile has a fixed size of about 400 MB and can hold 20 million indexes. Its underlying storage is designed as a HashMap implemented in the file system, so RocketMQ's index file is essentially a hash index.
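The first two lookups described above can be sketched with `java.nio.ByteBuffer`, assuming the default layout:

- Each ConsumeQueue unit is 20 bytes: an 8-byte CommitLog offset, a 4-byte size and an 8-byte tag hash code.
- Each 1 GB CommitLog file is named after its starting offset, zero-padded to 20 digits.

`StoreLayout` is a hypothetical helper and not RocketMQ's ConsumeQueue class.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper modelling the ConsumeQueue unit and CommitLog file naming. */
final class StoreLayout {
    static final int CQ_UNIT_SIZE = 20;                          // 8 + 4 + 8 bytes
    static final long COMMITLOG_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB per file (default)

    private StoreLayout() {}

    /** One decoded ConsumeQueue unit. */
    static final class CqUnit {
        final long commitLogOffset;
        final int size;
        final long tagsCode;

        CqUnit(long commitLogOffset, int size, long tagsCode) {
            this.commitLogOffset = commitLogOffset;
            this.size = size;
            this.tagsCode = tagsCode;
        }
    }

    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        return ByteBuffer.allocate(CQ_UNIT_SIZE).putLong(commitLogOffset).putInt(size).putLong(tagsCode).array();
    }

    /** The index-th unit lives at byte index * 20: fixed-size units make this pure arithmetic. */
    static CqUnit read(byte[] consumeQueueFile, long index) {
        ByteBuffer buf = ByteBuffer.wrap(consumeQueueFile, (int) (index * CQ_UNIT_SIZE), CQ_UNIT_SIZE);
        return new CqUnit(buf.getLong(), buf.getInt(), buf.getLong());
    }

    /** Name of the CommitLog file holding a physical offset: its start offset, 20 digits wide. */
    static String commitLogFileName(long physicalOffset) {
        long fileStart = physicalOffset - (physicalOffset % COMMITLOG_FILE_SIZE);
        return String.format("%020d", fileStart);
    }
}
```

Because every unit has the same size, the n-th message of a queue sits at byte n × 20 of its ConsumeQueue. Finding it needs no search. The CommitLog file to open then follows from the offset alone.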

Page caching and memory mapping

PageCache: the OS file-system cache that speeds up file reads and writes. Disk IO and memory IO differ in speed by orders of magnitude, and the page cache lets sequential reads and writes of files approach memory speed. Simply put, the OS first writes data into the cache, and kernel flusher threads (pdflush on older Linux kernels) later flush it from the cache to the physical disk asynchronously. When a read misses the PageCache, the OS, while reading the file from the physical disk, also reads ahead the adjacent blocks sequentially.

MMAP (memory mapping): maps a file or other object into a process's address space, establishing a mapping between the file's disk addresses and a range of virtual addresses in the process. In simple terms, the process accesses the disk file directly through virtual memory, which avoids copying data between kernel space and user space.

Note also that a file mapped with mmap is generally limited in size, to between 1.5 GB and 2 GB. That is why RocketMQ keeps CommitLog files at 1 GB and ConsumeQueue files at 5.72 MB, well within the limit.
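Java exposes mmap through `FileChannel.map`, which returns a `MappedByteBuffer`. This is the JDK facility RocketMQ's storage files are built on. The minimal sketch below takes three steps:

1. It maps a temporary file.
2. It writes into the mapping as if it were memory.
3. It reads the result back with an ordinary file read.

`MmapDemo`, the temp-file name and the sizes are illustrative.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustrative memory-mapped write followed by a normal read. */
final class MmapDemo {
    private MmapDemo() {}

    static String writeAndReadBack(String text, int fileSize) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        if (data.length > fileSize) throw new IllegalArgumentException("text larger than the mapping");
        try {
            Path file = Files.createTempFile("commitlog-demo", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Map fileSize bytes into the address space (READ_WRITE grows the file if needed).
                MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                mapped.put(data); // a plain memory write into the page cache, no write() per message
                mapped.force();   // ask the OS to flush the dirty pages to disk now
            }
            byte[] onDisk = Files.readAllBytes(file); // ordinary read path, to show the bytes landed
            return new String(onDisk, 0, data.length, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```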
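The file sizes above can be checked with a little arithmetic. The sketch assumes these defaults (my understanding of the IndexFile figures comes from the `maxHashSlotNum` and `maxIndexNum` settings):

- **CommitLog:** 1 GiB per file.
- **ConsumeQueue:** 300,000 units of 20 bytes each.
- **IndexFile:** a 40-byte header, plus 5,000,000 four-byte hash slots, plus 20,000,000 twenty-byte index entries.

`StoreFileSizes` is hypothetical.

```java
/** Hypothetical back-of-the-envelope calculator for the default store file sizes. */
final class StoreFileSizes {
    static final long MIB = 1024L * 1024;

    private StoreFileSizes() {}

    static long commitLogBytes()    { return 1024L * MIB; }                                // 1 GiB
    static long consumeQueueBytes() { return 300_000L * 20; }                              // units x unit size
    static long indexFileBytes()    { return 40L + 5_000_000L * 4 + 20_000_000L * 20; }    // header + slots + entries

    static double toMiB(long bytes) { return bytes / (double) MIB; }
}
```

The ConsumeQueue works out to 6,000,000 bytes, about 5.72 MiB. The IndexFile works out to 420,000,040 bytes, about 400 MiB. All three also stay below `Integer.MAX_VALUE` bytes, which is the per-mapping ceiling of Java's `FileChannel.map`.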

PS: Some readers may still find these two concepts unclear after reading this [dog head]. In fact, Fei Hao was confused at first too, and only understood after consulting all kinds of material. Fei Hao will write a dedicated article on PageCache and MMAP later 😁

Message flushing

  1. Synchronous flush: the Broker (as shown in the figure above) returns a success ACK to the Producer only after the message has actually been persisted to disk. Synchronous flush is a strong guarantee of MQ message reliability but has a significant impact on performance; it is generally used in financial services.
  2. Asynchronous flush: takes full advantage of the OS PageCache. As soon as the message is written to the PageCache, a success ACK can be returned to the Producer. Messages are flushed to disk by a background asynchronous thread, which reduces read/write latency and improves MQ performance and throughput.
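The two policies differ only in when the ACK is returned relative to the flush. The toy model below does no real I/O:

- `writePosition` stands for bytes that have reached the PageCache.
- `flushedPosition` stands for bytes that have been forced to disk.

The names echo the broker's `SYNC_FLUSH` / `ASYNC_FLUSH` setting, but `FlushModel` itself is hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of synchronous vs. asynchronous flushing (no real disk I/O). */
final class FlushModel {
    private final AtomicLong writePosition = new AtomicLong();   // bytes in the "page cache"
    private final AtomicLong flushedPosition = new AtomicLong(); // bytes "on disk"
    private final boolean syncFlush;
    private final ScheduledExecutorService flusher;

    FlushModel(boolean syncFlush, long asyncIntervalMillis) {
        this.syncFlush = syncFlush;
        if (syncFlush) {
            flusher = null;
        } else {
            flusher = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "toy-flusher");
                t.setDaemon(true); // background flush thread must not keep the JVM alive
                return t;
            });
            flusher.scheduleWithFixedDelay(this::flush, asyncIntervalMillis, asyncIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Appends a message; returning means the producer may be ACKed. */
    String append(int messageBytes) {
        writePosition.addAndGet(messageBytes); // the data is now in the page cache
        if (syncFlush) {
            flush();                           // sync: persist before the ACK
        }
        return "SEND_OK";                      // async: ACK immediately, flush later
    }

    /** Advances the flushed position to the current write position (never backwards). */
    void flush() { flushedPosition.accumulateAndGet(writePosition.get(), Math::max); }

    long unflushedBytes() { return writePosition.get() - flushedPosition.get(); }

    void shutdown() { if (flusher != null) flusher.shutdownNow(); }
}
```

With async flushing, `unflushedBytes()` is exactly the data that an OS or machine crash could lose after the producer was already told the send succeeded.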

Message duplication

In the messaging field, QoS for message delivery is defined at three levels:

  • At least once
  • At most once
  • Exactly once

QoS: Quality of Service

Almost all MQ products claim to guarantee at least once delivery.

Since delivery is at least once, message duplication cannot be avoided, especially in a distributed network environment.

To quote a RocketMQ co-founder:

This defect can, after all, also be seen as inherited from TCP, e.g. retransmission after a failure. Businesses are often sensitive to duplicate messages. The current version of RocketMQ does not support deduplication, and we generally recommend that users deduplicate on their own via external global storage. In the next generation of features, we plan to build in a solution. Common industry practice, as in Artemis and IronMQ, is to deduplicate on the server side using global storage; this is an IO-intensive operation that puts some load on the server. RocketMQ, by contrast, aims to reduce server IO by implementing a secondary deduplication strategy.

In this respect RocketMQ may be slightly inferior to Kafka, which supports all three modes.
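A common way to follow the quote's recommendation is an idempotent consumer. It records a business key, such as the message Key, for each processed message and skips any message whose key it has already seen. In the sketch below, the "global store" is an in-memory set purely for illustration. In practice it would be an external store such as a database unique index; that is an assumption on my part. `IdempotentConsumer` is hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical consumer-side deduplication keyed by a business key. */
final class IdempotentConsumer {
    // Stand-in for an external global store; in-memory only for this illustration.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<String> business;

    IdempotentConsumer(Consumer<String> business) { this.business = business; }

    /** Returns true if the message was processed, false if it was a duplicate and skipped. */
    boolean onMessage(String bizKey, String body) {
        if (!processedKeys.add(bizKey)) {
            return false; // already seen: at-least-once delivery handed us a duplicate
        }
        try {
            business.accept(body);
            return true;
        } catch (RuntimeException e) {
            processedKeys.remove(bizKey); // release the key so a redelivery can try again
            throw e;
        }
    }
}
```

Releasing the key on failure matters. Without it, a failed attempt would later be mistaken for a duplicate, and the message would be silently dropped.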

Transaction message

Apache RocketMQ has supported distributed transactional messages since version 4.3.0. RocketMQ commits transactional messages using a 2PC (two-phase commit) approach and adds compensation logic to handle messages whose second phase times out or fails, as shown in the figure below.

Two concepts deserve particular attention:

Half Message

A message that the Producer has sent to the Broker but that the Consumer cannot consume yet. It becomes consumable only after the Producer confirms it in the second phase.

Message check-back

When network jitter or a Producer restart prevents the Producer from confirming a Half Message in the second phase, the Broker checks the status of the pending message by sending a check request to the Producer (any Producer in the same Group). The Producer checks the status of its local transaction for that message and then performs a Commit or Rollback. Note that RocketMQ does not check back indefinitely: by default, a message is checked back at most 15 times, and if its transaction status is still unknown after 15 check-backs, the message is rolled back by default.
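The check-back flow reduces to a small loop. The enum constants below reuse the names of RocketMQ's `LocalTransactionState` (`COMMIT_MESSAGE`, `ROLLBACK_MESSAGE`, `UNKNOW`). `HalfMessageChecker` itself is a hypothetical simulation, not broker code. `maxChecks` corresponds to the default of 15 described above. Real check-backs are spaced out in time rather than run back to back.

```java
import java.util.function.IntFunction;

/** Hypothetical simulation of resolving a Half Message through check-backs. */
final class HalfMessageChecker {
    enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Final decision plus how many check-backs it took. */
    static final class Outcome {
        final TxState state;
        final int checks;

        Outcome(TxState state, int checks) { this.state = state; this.checks = checks; }
    }

    private HalfMessageChecker() {}

    /**
     * @param firstAnswer what the producer reported right after its local transaction
     * @param checkBack   the producer's answer to the n-th check-back (n starts at 1)
     */
    static Outcome resolve(TxState firstAnswer, IntFunction<TxState> checkBack, int maxChecks) {
        if (firstAnswer != TxState.UNKNOW) {
            return new Outcome(firstAnswer, 0); // second phase confirmed directly
        }
        for (int n = 1; n <= maxChecks; n++) {
            TxState s = checkBack.apply(n);
            if (s != TxState.UNKNOW) {
                return new Outcome(s, n);
            }
        }
        return new Outcome(TxState.ROLLBACK_MESSAGE, maxChecks); // still unknown: roll back by default
    }
}
```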

Consumption backtracking

Backtracking consumption means re-consuming messages that a Consumer has already consumed successfully. Because businesses sometimes need to re-consume, the Broker must retain messages even after they have been successfully delivered to the Consumer. Re-consumption is usually specified by time: for example, if the Consumer system fails and needs to re-consume the messages from one hour ago, the Broker should provide a mechanism to roll the consumption progress back in time. RocketMQ supports time-based backtracking with millisecond precision.
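Rewinding by time comes down to finding the first message stored at or after a time T. If a queue's store timestamps are non-decreasing by logical offset (an assumption), a binary search finds that offset in O(log n). `OffsetByTime` is hypothetical. As far as I know, RocketMQ performs a similar search over ConsumeQueue entries, reading each probed message's store time from the CommitLog.

```java
/** Hypothetical "reset offset by time" lookup over per-offset store timestamps. */
final class OffsetByTime {
    private OffsetByTime() {}

    /** Returns the first logical offset whose timestamp is >= targetMillis. */
    static long search(long[] storeTimestamps, long targetMillis) {
        int lo = 0, hi = storeTimestamps.length; // the answer lies in [lo, hi]
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (storeTimestamps[mid] < targetMillis) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo; // == length when every message is older than the target (nothing to replay)
    }
}
```

The consumer group's progress would then be reset to the returned offset, and consumption resumes from there.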

Scheduled messages

Scheduled messages (a delay queue) are messages that are sent to the broker but not consumed immediately; they wait until a specific time before being delivered to the real Topic. The broker's default delay levels are 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h, and you can configure custom levels through messageDelayLevel. Note that messageDelayLevel is a property of the broker, not of a topic. When sending a message, set its delay level with msg.setDelayTimeLevel(level). There are three cases for level:

  • If level == 0, the message is not delayed
  • If 1 <= level <= maxLevel, the message is delayed by the corresponding time; for example, level == 1 means a 1s delay
  • If level > maxLevel, level is treated as maxLevel; for example, level == 20 means a 2h delay

Scheduled messages are first stored in a topic named SCHEDULE_TOPIC_XXXX, in the queue that corresponds to their delayTimeLevel: queueId = delayTimeLevel - 1. That is, each queue holds only messages with the same delay, which ensures that messages with the same delay are consumed in order. The broker consumes SCHEDULE_TOPIC_XXXX on a schedule and writes each due message to its real topic.

Note that scheduled messages are counted twice, once on the first write and again when they are written to the real topic, so the sent-message count and TPS will appear higher.
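The sketch below maps a delay level to a delay and to a queue of SCHEDULE_TOPIC_XXXX. It assumes the default messageDelayLevel string quoted above. The parser and `DelayLevels` are illustrative and are not the broker's own parsing code. The units the sketch accepts (s, m, h, d) are its own choice.

```java
/** Hypothetical delay-level table: level -> delay, and level -> schedule queue id. */
final class DelayLevels {
    static final String DEFAULT_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    private DelayLevels() {}

    /** Parses "1s 5s ... 2h" into milliseconds; index 0 holds level 1. Units here: s, m, h, d. */
    static long[] parse(String levels) {
        String[] parts = levels.trim().split("\\s+");
        long[] millis = new long[parts.length];
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            long value = Long.parseLong(p.substring(0, p.length() - 1));
            switch (p.charAt(p.length() - 1)) {
                case 's': millis[i] = value * 1_000L; break;
                case 'm': millis[i] = value * 60_000L; break;
                case 'h': millis[i] = value * 3_600_000L; break;
                case 'd': millis[i] = value * 86_400_000L; break;
                default: throw new IllegalArgumentException("unknown time unit in " + p);
            }
        }
        return millis;
    }

    /** 0 (or less) means no delay; levels above the maximum are clamped to it. */
    static long delayMillis(long[] table, int level) {
        if (level <= 0) return 0L;
        return table[Math.min(level, table.length) - 1];
    }

    /** Queue of SCHEDULE_TOPIC_XXXX for this level: queueId = level - 1, after clamping. */
    static int scheduleQueueId(long[] table, int level) {
        if (level <= 0) throw new IllegalArgumentException("level " + level + " is not a delayed message");
        return Math.min(level, table.length) - 1;
    }
}
```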

Message retry

If the Consumer fails to consume a message, the Broker records the consumption as failed. By default, the Consumer retries consumption up to 16 times.

RocketMQ saves retry messages into the delay queues of Topic SCHEDULE_TOPIC_XXXX. A background scheduled task waits out the delay and then saves them to the retry queue %RETRY%+consumerGroup. The more retries, the longer the delivery delay.

That is how RocketMQ handles message consumption failures.
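The growing delay comes from the delay levels. The sketch assumes that a message which has failed `reconsumeTimes` times is retried at delay level 3 + reconsumeTimes of the default table, as in the 4.x broker's send-back handling. Under that assumption, the 16 retries run through levels 3 to 18, from 10s up to 2h. `RetrySchedule` is hypothetical, and the table is written out inline so the block is self-contained.

```java
/** Hypothetical retry back-off built on the default delay-level table. */
final class RetrySchedule {
    // Default delay levels in seconds; index 0 is level 1 (1s 5s 10s 30s 1m ... 1h 2h).
    static final long[] LEVEL_SECONDS = {1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
    static final int MAX_RECONSUME_TIMES = 16;

    private RetrySchedule() {}

    /** Delay before the next retry of a message that has already failed reconsumeTimes times. */
    static long retryDelaySeconds(int reconsumeTimes) {
        int level = Math.min(3 + reconsumeTimes, LEVEL_SECONDS.length);
        return LEVEL_SECONDS[level - 1];
    }

    /** Total waiting time if all 16 retries fail. */
    static long totalBackoffSeconds() {
        long sum = 0;
        for (int i = 0; i < MAX_RECONSUME_TIMES; i++) {
            sum += retryDelaySeconds(i);
        }
        return sum;
    }
}
```

Summed over all 16 retries, that is 17,140 seconds, roughly 4 hours 46 minutes, before a message gives up on normal consumption.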

Dead-letter queue

After the maximum number of retries (16 by default), if consumption still fails, the message is delivered to the dead-letter queue (DLQ), whose Topic name is %DLQ%+consumerGroup; it holds messages that cannot be consumed normally.

RocketMQ calls messages that cannot be consumed normally dead-letter messages, and the special queues that store them dead-letter queues. In RocketMQ, messages in a dead-letter queue can be resent through the Console so that consumers can consume them again.
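Whether a failed message is retried or dead-lettered comes down to one comparison. The sketch assumes the topic naming from the text (%RETRY%+consumerGroup and %DLQ%+consumerGroup). It also assumes a retry limit passed in as `maxReconsumeTimes`, which is 16 by default. `FailureRouting` is hypothetical.

```java
/** Hypothetical routing of a failed message to its retry topic or its dead-letter topic. */
final class FailureRouting {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    private FailureRouting() {}

    /** reconsumeTimes = how many times this message has already been retried. */
    static String nextTopic(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes) {
        if (reconsumeTimes >= maxReconsumeTimes) {
            return DLQ_PREFIX + consumerGroup;   // give up: park it in the dead-letter queue
        }
        return RETRY_PREFIX + consumerGroup;     // retry later, after a delay via SCHEDULE_TOPIC_XXXX
    }
}
```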

Conclusion

This opening chapter doesn't cover much source code; it is mostly rather dry concepts and knowledge points. But without understanding these basic concepts, you would likely be baffled at any moment in the in-depth source code chapters that follow 😅

That covers most of the essential RocketMQ material, and I believe you now have a preliminary understanding of it (in fact, much of it draws on the documentation in the source repository).

PS: Fei Hao has only just started blogging and, as a newbie writer, may be a bit slow. This article, too, was squeezed out of after-work hours and revised many times, taking more than a week in all. But since I have decided to do something, I must do my best, which is being responsible both to everyone and to myself. I will stick with the in-depth source code articles that follow; updates may take a while, but Fei Hao guarantees they will be high-quality articles.

Also, as a newbie writer, some of my terms or sentences may be a bit obscure or flawed, and I hope you will bear with me. And if any technical point is wrong or debatable, please point it out promptly; comments are welcome 📫~

Ordinary changes will eventually change the ordinary

I am house xiaonian, a low-key young man in the Internet industry

Please visit my blog at 📖 edisonz.cn for more articles