To keep the length manageable, the source code analysis in this post is limited to the core logic of sending messages from the Producer side. I will walk through the source code with flow charts, annotated code, and prose, and analyze other parts of the source code in future articles.

This post is about RocketMQ: how its features are used, how it works under the hood, and a source code analysis of some of its core logic. Why we use MQ, what it can do for us, which MQ implementations the community offers, and the pros and cons of each are all covered in my previous article, Message Queue Miscellany, so refer back to it if you need to.

Basic concepts

Broker

The first thing to understand is what interacting with RocketMQ looks like: the producer sends a message to RocketMQ, RocketMQ persists the message, and the consumer later pulls the message from MQ to consume it.

In the figure above, RocketMQ is drawn as a single node, but in practice it certainly is not. Producers are services that can scale horizontally at any time, so the volume of messages they push into MQ changes accordingly, and a mature MQ must be able to handle that. MQ itself also needs to be highly available; otherwise, once that single point fails, every message stored in it is lost and irretrievable.

So in a real production environment, an MQ cluster is always deployed. In RocketMQ, each such instance has its own name: Broker. In addition, each Broker is paired with a Slave Broker; the Master Broker periodically synchronizes data to the Slave, forming a Master/Slave architecture.

This raises a familiar problem: in a microservice architecture, services are also deployed as multiple instances, and callers look up the instance list of a target service through a registry.

Take Spring Cloud as an example: a service's instances are registered in Eureka, and Ribbon, which integrates with Eureka, fetches the instance list from it, picks one instance through a load-balancing algorithm, and finally issues the request.

Similarly, there are multiple Broker instances in MQ. How does a producer know how many Brokers there are in the cluster, and which one to connect to?

Hard-coding the Broker addresses is ruled out right away; I don't think the reasons need elaborating. So how does RocketMQ solve this problem? That's where the NameServer comes in.

NameServer

The NameServer can be thought of as the registry mentioned in the previous section. Every Broker registers with the NameServer at startup and reports its own information: the Broker's IP and port, plus the routing information of the RocketMQ cluster, which will be discussed later.

With the NameServer in place, a client interacts with it at startup to retrieve all Broker information and routing information in the current RocketMQ cluster. The producer then knows which Broker to connect to and can deliver its messages.

How does NameServer handle a Broker failure during runtime?

This brings up RocketMQ's renewal and fault-awareness mechanism. After registering, a Broker sends a heartbeat to the NameServer every 30 seconds. If the NameServer sees no heartbeat from a Broker for more than 120 seconds, it considers the Broker unavailable and removes it from the routing information it maintains.
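To make the expiry mechanism concrete, here is a minimal Java sketch of such a liveness scan. RocketMQ's real logic lives in the NameServer's RouteInfoManager (its scanNotActiveBroker method); the class and field names below are simplified for illustration.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the NameServer's liveness scan; not RocketMQ's actual code.
public class BrokerLivenessScanner {

    private static final long BROKER_EXPIRE_MS = 120_000; // 120s without a heartbeat

    // brokerAddr -> timestamp of the last heartbeat received
    private final Map<String, Long> brokerLiveTable = new ConcurrentHashMap<>();

    // Called whenever a Broker heartbeat (sent every 30s) arrives.
    public void onHeartbeat(String brokerAddr) {
        brokerLiveTable.put(brokerAddr, System.currentTimeMillis());
    }

    // Called periodically; evicts Brokers that have been silent for too long.
    public void scanNotActiveBroker() {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<String, Long>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + BROKER_EXPIRE_MS < now) {
                it.remove(); // Broker considered dead; drop it from routing info
            }
        }
    }
}
```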

This mechanism is similar to Eureka's in Spring Cloud. A service registers itself with Eureka at startup so that other services can call it, then sends a heartbeat renewal to Eureka every 30 seconds. If a service sends no heartbeat for more than 90 seconds, Eureka considers it down and removes it from the registry it maintains.

The multi-instance deployment shown in the figure above is not quite the same as multi-instance deployment in microservices: services there are stateless and can scale horizontally, whereas in RocketMQ each Broker may hold different data.

Let’s look at a simple use of RocketMQ.

```java
Message msg = new Message(
    "TopicTest",  // Topic
    "TagA",       // Tag
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)  // Body
);
SendResult sendResult = producer.send(msg);
```

As you can see, the first argument to Message specifies the Topic of the current message. So what is a Topic?

Topic

A Topic is a logical classification of messages sent to RocketMQ. For example, suppose our order system, points system, and warehousing system all share this MQ. To keep their messages apart, we can create a different Topic for each system.

Why call it a logical classification? Because in physical storage, RocketMQ does not keep all of a Topic's data on a single Broker. The reason is simple: if that Broker failed, or in the extreme case its disk died, the Topic's data would be permanently lost.

So in physical storage, a Topic's messages are distributed across multiple Brokers. The storage unit involved is called a MessageQueue. If you are familiar with Kafka's internals, you will recognize this as the same idea as a Partition in Kafka.

As the figure above shows, the data of one Topic is split into several parts stored on different Brokers. Why does RocketMQ do this?

First, consumption speed. If a Topic had only one queue, consumption would be bottlenecked on it; with many queues, multiple Consumers can consume in parallel, supporting higher concurrency.

Second, the resources of a single machine are limited. The message volume of a Topic can be large enough to fill a machine's disk quickly, so RocketMQ spreads a Topic's data across multiple machines for distributed storage. It is essentially a data-sharding mechanism.

So now we know: data sent to a Topic is distributed across MessageQueues on multiple Brokers.

Broker message storage principles

How are the messages a Producer sends actually stored on the Broker? When a Broker receives a message, it appends it sequentially to a Commit Log file on disk. Each Commit Log file is 1 GB; once a file is full, a new one is created and writing continues there. Commit Log files are written sequentially and read randomly.
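Each 1 GB file is named after its starting byte offset, zero-padded to 20 digits, so a global offset maps directly to a file. The helper below is an illustrative sketch of that mapping, not RocketMQ's actual code.

```java
// Minimal sketch: mapping a message's global offset to its Commit Log file name.
public class CommitLogFiles {

    private static final long FILE_SIZE = 1024L * 1024 * 1024; // 1 GB per file

    // Name of the file that contains the given global offset.
    public static String fileNameFor(long globalOffset) {
        long fileStartOffset = globalOffset - (globalOffset % FILE_SIZE);
        return String.format("%020d", fileStartOffset);
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor(0));              // 00000000000000000000
        System.out.println(fileNameFor(FILE_SIZE + 42)); // 00000000001073741824
    }
}
```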

That is the lowest storage layer. But how does the Broker locate a message in the Commit Log when a Consumer asks for it? Scanning 1 GB log files for every lookup would be at odds with one of RocketMQ's defining features, its high throughput. So how does RocketMQ pull it off?

The answer is the ConsumeQueue. For every message written to the Commit Log, the Broker records the message's Commit Log offset, its size, and the hash of its Tag into a ConsumeQueue. Each MessageQueue has a corresponding ConsumeQueue file stored on disk.

Like the Commit Log, ConsumeQueue files have a fixed size: each holds 300,000 entries of 20 bytes, so each ConsumeQueue file is about 5.72 MB. When one is full, a new ConsumeQueue file is created and writing continues.

The ConsumeQueue is thus a logical queue and, at the same time, an index that lets consumers quickly locate a message in the disk file when consuming.
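To make the 20-byte entry concrete, here is a sketch that encodes one ConsumeQueue entry following the layout described above (8-byte Commit Log offset, 4-byte size, 8-byte Tag hashcode); it is illustrative, not RocketMQ's actual code.

```java
import java.nio.ByteBuffer;

// One 20-byte ConsumeQueue entry. 300,000 entries * 20 bytes = 6,000,000 bytes,
// which is the ~5.72 MB file size mentioned above.
public class ConsumeQueueEntry {

    public static final int ENTRY_SIZE = 20;

    public static ByteBuffer encode(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_SIZE);
        buf.putLong(commitLogOffset); // where the full message lives in the Commit Log
        buf.putInt(msgSize);          // how many bytes to read from there
        buf.putLong(tagsCode);        // hash of the Tag, used for Broker-side filtering
        buf.flip();
        return buf;
    }
}
```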

You might be wondering: what is this Tag?

Tag

Tags are used to classify messages within the same Topic.

Take an extreme example. A new service needs to consume the order system's messages, but because of its business requirements it only cares about orders for digital products. Without Tags, the Consumer would have to check every order itself: if it is a digital-product order, consume it; otherwise, discard it.

That means the Consumer does a lot of useless work. With Tags, the producer can tag each order when producing the message, and the consumer can subscribe to only the Tags it cares about. Instead of the Consumer filtering messages itself, RocketMQ does the filtering for us.

How does the filtering work? Filtering happens first on the Broker side, using the hash of the Tag stored with the message, and then again on the Consumer side when messages are pulled.

Why filter again on the Consumer side after the Broker has already filtered? Because of hash collisions: different Tags can hash to the same value, so the Consumer performs a second, string-based filter when pulling messages.
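A minimal sketch of the two filtering stages described above, assuming (as RocketMQ does) that the stored Tag hash is the Java String hashCode; the class and method names are illustrative.

```java
// Two-stage Tag filter: cheap hash check on the Broker, exact check on the Consumer.
public class TagFilter {

    // Broker side: compare the 8-byte tagsCode stored in the ConsumeQueue
    // with the hash of the subscribed Tag.
    public static boolean brokerSideMatch(long storedTagsCode, String subscribedTag) {
        return storedTagsCode == subscribedTag.hashCode();
    }

    // Consumer side: re-check the real Tag string, because two different
    // Tags can hash to the same value.
    public static boolean consumerSideMatch(String messageTag, String subscribedTag) {
        return subscribedTag.equals(messageTag);
    }
}
```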

Source code analysis of Producer message sending

Process overview

Here is the general flow of sending a message; being familiar with the flow first will make the source code much clearer.

Initialize Producer

Let’s start with the following example.

First we initialize a DefaultMQProducer, for which RocketMQ provides a default implementation, DefaultMQProducerImpl. Calling producer.start() then starts the producer and its thread pool.
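For reference, a minimal bootstrap using the standard client API; the group name and NameServer address below are placeholders for your own environment.

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class ProducerBootstrap {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("example_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876"); // NameServer address, not a Broker
        producer.start(); // creates DefaultMQProducerImpl and its background resources
        // ... build and send messages here ...
        producer.shutdown();
    }
}
```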

Validity check

Then comes the core call, producer.send(msg). First, RocketMQ calls checkMessage to check whether the message to be sent is valid.

These checks include whether the message is null, whether the Topic is blank, whether the Topic contains illegal characters, and whether the Topic name exceeds the maximum length of 127. The Body is then checked as well: for example, whether the msg Body is empty and whether it exceeds the maximum size, which is 4 MB by default.
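A simplified sketch of these checks; the constants mirror the defaults just mentioned (Topic ≤ 127 characters, Body ≤ 4 MB), but the real logic lives in RocketMQ's Validators class and is more thorough (for instance, the illegal-character check uses a pattern, omitted here).

```java
// Simplified stand-in for Validators#checkMessage; illustrative only.
public class MessageValidator {

    private static final int TOPIC_MAX_LENGTH = 127;
    private static final int MAX_MESSAGE_SIZE = 1024 * 1024 * 4; // 4 MB

    public static void checkMessage(String topic, byte[] body) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("the specified topic is blank");
        }
        if (topic.length() > TOPIC_MAX_LENGTH) {
            throw new IllegalArgumentException("topic length exceeds " + TOPIC_MAX_LENGTH);
        }
        if (body == null || body.length == 0) {
            throw new IllegalArgumentException("the message body is empty");
        }
        if (body.length > MAX_MESSAGE_SIZE) {
            throw new IllegalArgumentException("body exceeds " + MAX_MESSAGE_SIZE + " bytes");
        }
    }
}
```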

Call send message

RocketMQ wraps the msg's Topic with a namespace and then calls sendDefaultImpl, the default implementation in DefaultMQProducerImpl, to send the message to the Broker, with a default timeout of 3 seconds.

When sending a message, MQ calls checkMessage again to check the validity of the message, and then attempts to retrieve the Topic details.

All Topic information is stored in a ConcurrentHashMap called topicPublishInfoTable, where the Key is the Topic string and the Value is a TopicPublishInfo.

A TopicPublishInfo holds the metadata retrieved from the NameServer that we mentioned in the basic concepts, including the crucial MessageQueue list and cluster metadata. Its basic structure is as follows.

messageQueueList holds all the MessageQueues of this Topic; each MessageQueue records the Topic it belongs to, the name of the Broker it lives on, and its queueId.

topicRouteData contains all the Queue and Broker data for this Topic.
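A trimmed-down sketch of these structures; RocketMQ's real classes carry more fields (such as the send-queue index used later for queue selection).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified views of the structures described above; illustrative only.
class MessageQueue {
    String topic;      // Topic this queue belongs to
    String brokerName; // Broker the queue physically lives on
    int queueId;       // id of the queue within that Broker
}

class TopicRouteData {
    // queue metadata and Broker address lists, as fetched from the NameServer
}

class TopicPublishInfo {
    List<MessageQueue> messageQueueList = new ArrayList<>(); // all queues of the Topic
    TopicRouteData topicRouteData; // full Queue + Broker routing data for the Topic
}
```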

Get Topic details

Before the message can finally be sent, the Producer needs the Topic's details, such as the Broker address; these are obtained via tryToFindTopicPublishInfo. I have written detailed notes in the chart.

For a Topic being sent to for the first time, there is of course no entry in the Map above, so RocketMQ calls updateTopicRouteInfoFromNameServer to fetch the Topic's metadata from the NameServer and write it into the Map. Besides that, the routing information and Broker details are placed into topicRouteTable and brokerAddrTable respectively, both ConcurrentHashMaps maintained in memory by the Producer.

After the Topic details are obtained, the next step is to determine the number of send attempts, timesTotal. If timesTotal is N, the message will be attempted at most N times. Note that retries happen if and only if the send fails, not in other cases such as a timeout or failing to select a suitable MessageQueue.

timesTotal is affected by the communicationMode argument. CommunicationMode has three values: SYNC, ASYNC, and ONEWAY. RocketMQ's default implementation uses SYNC.

If communicationMode is SYNC, timesTotal is 1 + retryTimesWhenSendFailed, where retryTimesWhenSendFailed defaults to 2 and represents the number of retries allowed after a send fails.

So with SYNC, the Producer attempts a send up to 3 times by default: the initial send plus 2 retries. Again, retries happen if and only if the send fails, not in any other case.
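In sketch form, the attempt budget works out like this; only SYNC sends are retried inside sendDefaultImpl, while ASYNC and ONEWAY get a single attempt there.

```java
public class RetryBudget {
    enum CommunicationMode { SYNC, ASYNC, ONEWAY }

    // With the default retryTimesWhenSendFailed of 2, a SYNC send is
    // attempted at most 1 + 2 = 3 times in total.
    static int attemptBudget(CommunicationMode mode, int retryTimesWhenSendFailed) {
        return mode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }
}
```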

MessageQueue selection mechanism

As discussed before, a Topic's data is sharded across one or more Brokers, with MessageQueue as the underlying storage unit. The earlier figures did not show how the Producer chooses which MessageQueue to send to, so let's look at the source code.

In the Producer, MessageQueue selection happens in selectOneMessageQueue, which makes the next choice based on the Topic's detailed metadata and the Broker of the last selected MessageQueue.

Core selection logic

What is the core selection logic? In plain English: keep an index and take it modulo the number of MessageQueues in the current Topic. The first time a Topic is selected, the index does not exist yet, so RocketMQ initializes it with a random number. After that, the index is simply incremented by 1 on each selection, since from the outside the previous value has already been used.

(Figure: core selection mechanism)
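A sketch of this round-robin core, reusing the MessageQueue structure sketched earlier; RocketMQ keeps the index in a ThreadLocalIndex rather than a plain field, but the idea is the same.

```java
import java.util.List;
import java.util.Random;

// Plain round-robin selection: random start, +1 per send, modulo queue count.
class RoundRobinSelector {
    private int index = new Random().nextInt(Integer.MAX_VALUE); // random start

    MessageQueue selectOne(List<MessageQueue> queues) {
        int pos = Math.abs(index++) % queues.size(); // +1 on every call, then modulo
        return queues.get(pos);
    }
}
```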

The above is the core, lowest-level selection mechanism for MessageQueue. But because real business situations are complex, RocketMQ's implementation does quite a lot of extra work on top of it.

Selection logic with fault delay enabled

During the actual selection, the code first checks whether send-latency fault tolerance is enabled, controlled by the variable sendLatencyFaultEnable. Its default value is false, i.e. it is disabled by default; I could not find where it gets enabled in the code.

Still, we can talk about what happens when it is enabled. The code also runs a for loop over the number of MessageQueues, and for each candidate determines whether its Broker is currently available by consulting faultItemTable, an in-memory table that is updated every time a message is sent.

If no Broker is currently available, a last-ditch path is triggered and a MessageQueue is selected anyway.
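When enabled, the selection might look like the sketch below, again assuming the MessageQueue sketch from earlier. faultItemTable is reduced here to a simple availability map; RocketMQ's real table also tracks per-Broker latency and a "not available until" timestamp, so this is only a rough illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

// Rough sketch of the fault-delay selection path; illustrative only.
class FaultAwareSelector {
    private final Map<String, Boolean> faultItemTable = new ConcurrentHashMap<>();
    private int index = new Random().nextInt(Integer.MAX_VALUE);

    // Updated after every send with the observed outcome for that Broker.
    void recordBroker(String brokerName, boolean available) {
        faultItemTable.put(brokerName, available);
    }

    MessageQueue selectOne(List<MessageQueue> queues) {
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.abs(index++) % queues.size();
            MessageQueue mq = queues.get(pos);
            if (faultItemTable.getOrDefault(mq.brokerName, true)) {
                return mq; // Broker currently considered available
            }
        }
        // Last-ditch path: nothing looks healthy, pick one anyway.
        return queues.get(Math.abs(index++) % queues.size());
    }
}
```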

Regular selection logic

If send-latency fault tolerance is not enabled, the regular logic applies, again with a for loop. After each iteration picks a MessageQueue, the code checks whether it belongs to the same Broker as the previously selected one; if so, it selects again, until it finds a MessageQueue on a different Broker or the loop ends. This, too, is meant to spread messages evenly and prevent data skew.
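Sketched as an extra method on the RoundRobinSelector above, the regular path might look like this; it follows the description above, not RocketMQ's code verbatim.

```java
// Walk the queues round-robin, skipping any queue on the same Broker as the
// previous pick; if every queue is on that Broker, fall back to plain round-robin.
MessageQueue selectOne(List<MessageQueue> queues, String lastBrokerName) {
    if (lastBrokerName != null) {
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.abs(index++) % queues.size();
            MessageQueue mq = queues.get(pos);
            if (!mq.brokerName.equals(lastBrokerName)) {
                return mq; // found a queue on a different Broker
            }
        }
    }
    return selectOne(queues); // same Broker everywhere: plain round-robin
}
```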

Message is sent

Finally, the Netty-related components are invoked and the message is sent over the wire.

EOF


That's all for this blog post. If you found it helpful, please give it a like, a comment, and a share.

You are also welcome to search WeChat for [SH Full Stack Notes] to find more related articles.