1. Basic architecture

RocketMQ architecture is divided into four main parts, as shown in the following figure:

  • Producer: the role that publishes messages; it supports distributed cluster deployment. The Producer uses MQ's load balancing module to select a queue in the corresponding Broker cluster for message delivery, a process that supports fail-fast behavior and low latency.

  • Consumer: the role that consumes messages; it supports distributed cluster deployment. Messages can be consumed in push or pull mode, and both cluster consumption and broadcast consumption are supported. It provides a real-time message subscription mechanism that meets the needs of most users.

  • NameServer: a very simple Topic routing registry that plays a role similar to ZooKeeper in Dubbo and supports dynamic registration and discovery of Brokers. It has two main functions:

    • Broker management: the NameServer accepts the registration information of the Broker cluster and saves it as the basic data of the routing information. It then provides a heartbeat detection mechanism to check whether each Broker is still alive;
    • Routing information management: each NameServer saves the entire routing information of the Broker cluster and the queue information for clients to query. The Producer and Consumer can then learn the routing information of the whole Broker cluster through the NameServer and use it for message delivery and consumption.

    NameServer is also typically deployed as a cluster, whose instances do not communicate with each other. The Broker registers its routing information with every NameServer, so each NameServer instance stores a complete copy of the routing information. When a NameServer goes offline for some reason, the Broker can still synchronize its routing information with the other NameServers, and Producers and Consumers can still dynamically perceive the Broker's routing information.

  • BrokerServer: the Broker is responsible for storing, delivering, and querying messages and for ensuring high availability of the service. To provide these functions, the Broker contains several important sub-modules:

    • Remoting Module: the entity of the whole Broker, responsible for handling requests from clients.
    • Client Manager: manages the clients (Producer/Consumer) and maintains the Topic subscription information of Consumers.
    • Store Service: provides simple and convenient APIs for storing messages to physical disk and for querying messages.
    • HA Service: the high availability service, which provides data synchronization between the Master Broker and the Slave Broker.
    • Index Service: indexes the messages delivered to the Broker by a specific Message key, to provide quick message queries.
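
The two NameServer responsibilities described above (Broker management with heartbeat detection, and routing information management) can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: the class and method names (RouteRegistry, registerBroker, scanInactiveBrokers, findBrokers), the broker addresses, and the timeout value are all invented for illustration and are not RocketMQ's real classes or settings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a NameServer; not RocketMQ's real implementation.
class RouteRegistry {
    private final long brokerTimeoutMillis;
    // broker address -> time of the last heartbeat
    private final Map<String, Long> lastHeartbeat = new HashMap<>();
    // topic -> addresses of the brokers that serve this topic
    private final Map<String, List<String>> topicRoutes = new HashMap<>();

    RouteRegistry(long brokerTimeoutMillis) {
        this.brokerTimeoutMillis = brokerTimeoutMillis;
    }

    // Broker management: a registration also counts as a heartbeat.
    void registerBroker(String brokerAddr, List<String> topics, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
        for (String topic : topics) {
            List<String> brokers = topicRoutes.computeIfAbsent(topic, t -> new ArrayList<>());
            if (!brokers.contains(brokerAddr)) {
                brokers.add(brokerAddr);
            }
        }
    }

    // Heartbeat detection: drop brokers that have been silent for longer than the timeout.
    void scanInactiveBrokers(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > brokerTimeoutMillis) {
                String deadBroker = entry.getKey();
                it.remove();
                for (List<String> brokers : topicRoutes.values()) {
                    brokers.remove(deadBroker);
                }
            }
        }
    }

    // Routing information management: clients query the brokers of a topic.
    List<String> findBrokers(String topic) {
        return new ArrayList<>(topicRoutes.getOrDefault(topic, new ArrayList<>()));
    }
}

class RouteRegistryDemo {
    public static void main(String[] args) {
        RouteRegistry nameServer = new RouteRegistry(120_000); // illustrative timeout
        nameServer.registerBroker("broker-a:10911", Arrays.asList("TopicTest"), 0);
        nameServer.registerBroker("broker-b:10911", Arrays.asList("TopicTest"), 100_000);
        System.out.println(nameServer.findBrokers("TopicTest"));
        // At t = 150 s, broker-a has been silent longer than the timeout and is dropped.
        nameServer.scanInactiveBrokers(150_000);
        System.out.println(nameServer.findBrokers("TopicTest"));
    }
}
```

Because each registry instance only depends on what Brokers send to it, several instances can run side by side without talking to each other, which mirrors the cluster deployment described above.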

2. Obtain the source code

The GitHub repository for the RocketMQ project is github.com/apache/rock… For network reasons, we will not use the GitHub repository directly but will import it into Gitee instead. When creating a new repository on Gitee, choose to import an existing repository:

After importing it into Gitee, you can check it out. The Gitee repository used in this article is gitee.com/funcy/rocke… .

I usually create my own branch based on a tag and then do the analysis on that branch. The RocketMQ tags are as follows:

The latest version is 4.8.0, so we create a new branch based on this tag using the following commands:

# Switch to the rocketmq-all-4.8.0 tag
git checkout rocketmq-all-4.8.0
# Create your own analysis branch, named rocketmq-all-4.8.0-learn, based on rocketmq-all-4.8.0
git checkout -b rocketmq-all-4.8.0-learn
# Push the rocketmq-all-4.8.0-learn branch to the remote repository
git push -u origin rocketmq-all-4.8.0-learn

From here on, all of our operations are done on the rocketmq-all-4.8.0-learn branch.

3. Local startup

Once we have the code, we can start it locally, in IDEA.

3.1 Copy the conf directory

Before starting the project, we need to do some configuration. The RocketMQ configuration files are located in the conf directory of the distribution module; copy that directory in its entirety into the rocketmq project directory:

Nothing needs to be changed; just copy it out. The contents of these configuration files will be explained later, when we analyze the source code.

3.2 Start the NameServer

The main class of the NameServer is org.apache.rocketmq.namesrv.NamesrvStartup:

If we run the main() method directly, we get an error:

We need to configure the ROCKETMQ_HOME directory, which can be done in IDEA:

Open the configuration screen:

Fill in the ROCKETMQ_HOME configuration:

Here I filled in ROCKETMQ_HOME=/Users/chengyan/IdeaProjects/myproject/rocketmq; the ROCKETMQ_HOME path is the directory that contains the conf folder.
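
To rule out a wrong path before launching, one could run a small preflight check that verifies the variable is set and that the directory really contains a conf folder. The following is a minimal sketch; the class RocketmqHomeCheck and its resolveHome method are hypothetical helpers written for this article and are not part of RocketMQ.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration; not part of RocketMQ.
class RocketmqHomeCheck {
    // Returns the home directory only if ROCKETMQ_HOME is set and contains a conf folder.
    static Optional<Path> resolveHome(Map<String, String> env) {
        String home = env.get("ROCKETMQ_HOME");
        if (home == null || home.trim().isEmpty()) {
            return Optional.empty();
        }
        Path homeDir = Paths.get(home.trim());
        if (Files.isDirectory(homeDir.resolve("conf"))) {
            return Optional.of(homeDir);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Reads the environment of the current process, e.g. the one set in the IDEA run configuration.
        Optional<Path> home = resolveHome(System.getenv());
        if (home.isPresent()) {
            System.out.println("ROCKETMQ_HOME looks usable: " + home.get());
        } else {
            System.out.println("ROCKETMQ_HOME is unset or has no conf directory");
        }
    }
}
```

Passing the environment in as a map keeps the check easy to exercise with different values, since the process environment itself cannot be changed from Java code.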

Once filled in, you can start:

3.3 Start the broker

The main class of the broker is org.apache.rocketmq.broker.BrokerStartup. It is started in much the same way as the NameServer, and the ROCKETMQ_HOME path must also be configured before starting:

Compared with the NameServer, the broker takes additional startup parameters:

-n localhost:9876 autoCreateTopicEnable=true

These startup parameters specify the address of the NameServer and enable automatic topic creation.

After the configuration is complete, you can start:

3.4 Start the management console

RocketMQ's management console lives in another repository, github.com/apache/rock… Besides the console, that repository also contains a number of other modules:

Since we are not going to analyze this project, downloading its source code should not have been necessary. However, when I looked for a release of this project, I found that no compiled JAR package is provided, so I had to build the code myself and therefore downloaded the source code after all. Again for network reasons, I imported the source code of this project into Gitee as well; the address is gitee.com/funcy/rocke… .

We are not going to analyze the code of this project, so we simply work on the master branch.

The management console is the rocketmq-console project, and its main class is org.apache.rocketmq.console.App:

Before starting it, we need to modify the application.properties configuration file: find the rocketmq.config.namesrvAddr setting and fill in the IP and port of the NameServer. We are connecting to the locally running instance here, so enter localhost:9876:

...
rocketmq.config.namesrvAddr=localhost:9876
...

The result is as follows:

Visit http://localhost:8080 and the results are as follows:

You can see that the broker has appeared in the cluster list, which indicates that the startup was successful.

4. Test sending and receiving messages

The RocketMQ project has a number of test examples in the example module; we choose one of them for our messaging test.

4.1 Start the Consumer

Let's find org.apache.rocketmq.example.simple.PushConsumer. The code is as follows:

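import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumer {

    public static void main(String[] args)
            throws InterruptedException, MQClientException {
        // Address of the locally started NameServer
        String nameServer = "localhost:9876";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.setNamesrvAddr(nameServer);
        // Subscribe to all tags ("*") of TopicTest
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                    Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}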

The topic that the Consumer subscribes to is TopicTest, and we will send messages to this topic later. Also note the NameServer configuration: we started the NameServer locally, so localhost:9876 is configured here.

Running the main() method results in the following:

4.2 Start the Producer

Next we find org.apache.rocketmq.example.simple.Producer. The code is as follows:

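import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {

    public static void main(String[] args)
            throws MQClientException, InterruptedException {
        // Address of the locally started NameServer
        String nameServer = "localhost:9876";
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr(nameServer);
        producer.start();

        // Send 10 messages synchronously and print each send result
        for (int i = 0; i < 10; i++) {
            try {
                // Arguments: topic, tag, key, body
                Message msg = new Message("TopicTest", "TagA", "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}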

Here, too, the NameServer address is localhost:9876 and the topic is TopicTest.

Back to the PushConsumer console:

As you can see, the Producer successfully sent the message, and the PushConsumer successfully received the message.
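
Section 1 mentions that the Producer uses a load balancing module to pick a queue and that delivery supports fast failure. One common way to do this is round-robin selection that steers a retry away from the Broker that has just failed. The sketch below illustrates that idea only; QueueSelector, QueueRef, and selectOne are hypothetical names, and the code is not taken from the RocketMQ client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of producer-side queue selection; not RocketMQ's API.
class QueueSelector {
    // A queue is identified by the broker that hosts it and a queue id.
    static final class QueueRef {
        final String brokerName;
        final int queueId;

        QueueRef(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "#" + queueId;
        }
    }

    private final List<QueueRef> queues;
    private final AtomicInteger counter = new AtomicInteger();

    QueueSelector(List<QueueRef> queues) {
        if (queues.isEmpty()) {
            throw new IllegalArgumentException("at least one queue is required");
        }
        this.queues = new ArrayList<>(queues);
    }

    // Round-robin over all queues. When lastFailedBroker is non-null,
    // queues on that broker are skipped so that the retry goes elsewhere.
    QueueRef selectOne(String lastFailedBroker) {
        for (int i = 0; i < queues.size(); i++) {
            QueueRef candidate = next();
            if (lastFailedBroker == null || !candidate.brokerName.equals(lastFailedBroker)) {
                return candidate;
            }
        }
        // Every queue lives on the failed broker: fall back to plain round-robin.
        return next();
    }

    private QueueRef next() {
        // floorMod keeps the index non-negative even if the counter overflows.
        return queues.get(Math.floorMod(counter.getAndIncrement(), queues.size()));
    }
}

class QueueSelectorDemo {
    public static void main(String[] args) {
        List<QueueSelector.QueueRef> queues = new ArrayList<>();
        queues.add(new QueueSelector.QueueRef("broker-a", 0));
        queues.add(new QueueSelector.QueueRef("broker-a", 1));
        queues.add(new QueueSelector.QueueRef("broker-b", 0));
        QueueSelector selector = new QueueSelector(queues);

        // Normal sends rotate through the queues.
        for (int i = 0; i < 3; i++) {
            System.out.println(selector.selectOne(null));
        }
        // A retry after a failure on broker-a is routed to another broker.
        System.out.println(selector.selectOne("broker-a"));
    }
}
```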

4.3 Exception analysis

As shown in the figure:

If an exception occurs:

org.apache.rocketmq.client.exception.MQClientException: 
No route info of this topic: TopicTest

This indicates that the TopicTest topic does not currently exist in the broker. We can either create the topic manually or specify autoCreateTopicEnable=true when starting the broker.

If you followed the steps above, please check whether the startup parameters of org.apache.rocketmq.broker.BrokerStartup have been configured:

-n localhost:9876 autoCreateTopicEnable=true

See Section 3.3 for how to configure them.
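
Why the flag makes the exception disappear can be pictured with a simplified model of the route lookup. To my understanding, a RocketMQ 4.x broker started with automatic topic creation enabled registers a built-in template topic (named TBW102), and a Producer that finds no route for its own topic falls back to the route of that template topic. The sketch below encodes only this assumption; TopicRouteLookup, createTopic, and resolveRoute are hypothetical names, not RocketMQ classes or methods.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of topic route lookup; not RocketMQ's real classes.
class TopicRouteLookup {
    // Assumed name of the built-in template topic used for auto-creation in RocketMQ 4.x.
    static final String TEMPLATE_TOPIC = "TBW102";

    private final Set<String> registeredTopics = new HashSet<>();

    TopicRouteLookup(boolean autoCreateTopicEnable) {
        // Assumption: only a broker with auto-creation enabled registers the template topic.
        if (autoCreateTopicEnable) {
            registeredTopics.add(TEMPLATE_TOPIC);
        }
    }

    // Models creating a topic manually.
    void createTopic(String topic) {
        registeredTopics.add(topic);
    }

    // Returns the topic whose route the producer ends up using.
    String resolveRoute(String topic) {
        if (registeredTopics.contains(topic)) {
            return topic;
        }
        if (registeredTopics.contains(TEMPLATE_TOPIC)) {
            return TEMPLATE_TOPIC;
        }
        throw new IllegalStateException("No route info of this topic: " + topic);
    }
}

class TopicRouteLookupDemo {
    public static void main(String[] args) {
        TopicRouteLookup autoCreate = new TopicRouteLookup(true);
        System.out.println(autoCreate.resolveRoute("TopicTest"));

        TopicRouteLookup noAutoCreate = new TopicRouteLookup(false);
        try {
            noAutoCreate.resolveRoute("TopicTest");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // Creating the topic manually also resolves the problem.
        noAutoCreate.createTopic("TopicTest");
        System.out.println(noAutoCreate.resolveRoute("TopicTest"));
    }
}
```

Both remedies named above appear in the model: creating the topic manually makes the first lookup succeed, while enabling automatic creation makes the fallback succeed.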

5. Summary

This article introduces the basic architecture of RocketMQ, shows how to start RocketMQ from the source code, and finally demonstrates sending and receiving messages with the test code in the example module of the RocketMQ project.

In short, this article only prepares the environment for source code analysis; the analysis of the RocketMQ source code begins in earnest in the next article.


Given the limits of the author's ability, mistakes in this article are inevitable, and corrections are welcome! Original writing is not easy: for commercial reprints, please contact the author for authorization; for non-commercial reprints, please cite the source.

This article was first published on the WeChat official account "Java technology road". Original link: mp.weixin.qq.com/s/waDzMr4rO…

If you liked this article and would like more source code analysis (analyses of the spring, springboot, mybatis, and tomcat source code have been completed so far), you are welcome to follow the official account and explore the world of technology together!