In the three processes, message sending is the most simple and easy to start with. It is suitable for junior middle school children as the starting point of MQ research and learning. Therefore, this article focuses on the process and details of sending a normal message in RocketMQ, a distributed message queue, from a message delivery point of view.

RocketMQ network Architecture Diagram

The network deployment architecture of the RocketMQ distributed message queue is shown in the following figure (where Producer Producer sends ordinary messages to the cluster).

A few notes on the characters above:

(1) NameServer The RocketMQ cluster’s NameServer (or registry), which itself is stateless (there may be temporary inconsistencies on each NameServer instance, but with periodic updates it is mostly consistent), is used to manage the cluster’s metadata (for example, KV configuration, Topic, Broker registration information).

(2) Broker (Master) : The main node of RocketMQ message Broker server, which plays the role of connecting Producer’s message sending and Consumer’s message consumption, and storing messages.

(3) Broker (Slave) : RocketMQ message Broker server backup node, mainly through synchronous/asynchronous mode to synchronize messages from the master node to backup, to ensure the high availability of RocketMQ cluster;

(4) Producer: In this case, Producer is the Producer of ordinary messages. The messages are sent to the RocketMQ master node based on the RocketMQ-Client module.

For the relationship of several communication links in the figure above:

(1) Producer and NamerServer: Each Producer will establish a TCP connection with an instance in the NameServer cluster and pull Topic routing information from the NameServer instance.

(2) Producer and Broker: The Producer establishes a TCP connection with the Master Broker server associated with the topic it wants to send, which is used to send messages and timed heartbeat messages.

(3) Broker and NamerServer: The Broker (Master or Slave) establishes a TCP connection with each NameServer instance. The Broker registers its configured Topic information to each machine in the NameServer cluster at startup. That is, each NameServer has the Topic routing configuration of the broker. The Master is not connected to the Master, but the Master is connected to the Slave.

2. Demo method for sending ordinary messages

The RocketMQ source Project example package contains the simplest sample code for sending ordinary messages (ps: For those new to RocketMQ, use the sample code below for systematic learning and debugging). We can directly run the “org. Apache. Rocketmq. Example. Simple” under the package Producer class is the main method can complete a common message sending (mainly the HTML code is as follows, here will need local NameServer and Broker instances are deployed) :

Starting DefaultMQProducer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("producer"); producer.start(); Message MSG = new Message()"TopicTest"."TagA"."OrderID188"."Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult = producer.send(MSG); } } catch (Exception e) { //Exception code } producer.shutdown();Copy the code

Iii. Full process interpretation of RocketMQ sending ordinary messages

As you can see from the previous section, the demo code for message producers to send messages is relatively simple, with only a few lines of code at its core, but after digging into RocketMQ’s Client module, the core process for sending messages is a bit more complex. The following will mainly analyze and elaborate the startup process of DefaultMQProducer, the sending method and the message processing of Broker agent server.

3.1 Startup process of DefaultMQProducer

In the demo code where the client sends normal messages, we start DefaultMQProducer instance, which calls the start() method of DefaultMQProducerImpl, the implementation class that generates messages by default.

@Override
    public void start() throws MQClientException {
        this.defaultMQProducerImpl.start();
    }
Copy the code

DefaultMQProducerImpl, an implementation class that generates messages by default, starts as follows:

(1) Initialize MQClientInstance instance object and register it in local cache variable — producerTable;

(2) Save the default Topic (” TBW102 “) to the local cache variable — topicPublishInfoTable;

(3) MQClientInstance invokes its own start() method to start some client local service threads, such as pull message service, client network communication service, re-load balancing service, and several other scheduled tasks (including, Update the route/clean up the offline Broker/ send the heartbeat/persist the consumerOffset/ adjust the thread pool) and start again (this time with false);

(4) Finally send heartbeat packets to all Broker proxy server nodes;

(2) According to different clientId, MQClientManager will give different MQClientInstance;

(3) According to different producerGroup, MQClientInstance will give different MQProducer and MQConsumer (stored in local cache variables — producerTable and consumerTable);

3.2 Send The core process of the sending method

There are three main ways to send messages through Rocketmq’s client section:

(1) Synchronization mode

(2) Asynchronous mode

(3) Oneway

Methods (1) and (2) are commonly used to send messages. The specific method depends on the service situation. This section describes the core process of sending messages in combination with the synchronous sending mode. In the synchronous sending mode, if a message fails to be sent, a maximum of three retries are performed (you can also customize the retries), and all other modes are performed once. The entry to the core process of sending messages synchronously is as follows:

/** * The default timeout time is 3s ** @param MSG Message content * @param timeout The timeout time for sending messages can be set by parameter * @return
     * @throws MQClientException
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }
Copy the code

3.2.1 Try to obtain the TopicPublishInfo routing information

If we debug it step by step, we’ll see that the sendDefaultImpl() method prevalidates the sent message first. If the message of the Topic and the Body are no problem, then will call – tryToFindTopicPublishInfo () method, according to the sent messages contained in the Topic to try from the Client side to find in the local cache variable – topicPublishInfoTable, If there is no will update the Topic from the NameServer routing information (including, call the updateTopicRouteInfoFromNameServer MQClientInstance instance method, Eventually enforce a MQClientAPIImpl instance getTopicRouteInfoFromNameServer method), here there are the following two scenarios:

(1) The producer sends the message for the first time (Topic does not exist in NameServer at this time) : the first fetch failed to pull down from the remote NameServer and update the local cache variable – topicPublishInfoTable successfully. Therefore, the second time we need to construct the TopicPublishInfo object from the TopicRouteData variable of the default Topic — TBW102, and update the topicPublishInfoTable, the local cache variable of the DefaultMQProducerImpl instance.

In addition, in this type of scenario, when a message is sent to the Broker proxy server, Business in SendMessageProcessor processor sendBatchMessage/sendMessage method of super msgCheck (CTX, requestHeader, response) pre check in, Will call TopicConfigManager createTopicInSendMessageMethod method, on the Broker to complete the creation of a new Topic and persistence to the configuration file (configuration file path: {rocketmq. Home. Dir} / store/config/switchable viewer. Json). (Ps: This part is actually outside the scope of the Broker, but it is mentioned slightly because it involves the creation of a new Topic.)

(2) The producer sends the existing message to the Topic: Because in the NameServer has been going on for the Topic, so in the first time when will be able to get them back to and updates to the local cache variable topicPublishInfoTable, then tryToFindTopicPublishInfo method can return directly. The source code for this part of the core method in RocketMQ is as follows (annotated) :

/** * Get the topicPublishInfo from topicPublishInfoTable based on MSG's topic. Pull the latest routing information from nameserver * * topicPublishInfo * * @param Topic * @return*/ private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { //step1. First from the local cache variable topicPublishInfoTable first get a TopicPublishInfo TopicPublishInfo = this. TopicPublishInfoTable. Get (topic);if(null == topicPublishInfo || ! topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); / / step1.2 routing information and then update the topic from the nameServer enclosing mQClientFactory. UpdateTopicRouteInfoFromNameServer (topic); / / step2 then get from the local cache variable topicPublishInfoTable again topicPublishInfo = this. TopicPublishInfoTable. Get (topic); }if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else{/** * the first time isDefault isfalseThe second time, default istrue, that is, to use the default topic parameters updated * / this. MQClientFactory. UpdateTopicRouteInfoFromNameServer (topic,true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            returntopicPublishInfo; }}Copy the code
/** * Pull Topic routing information from the remote NameServer registry when it does not exist in the local cache ** @param Topic * @param timeoutMillis * @param allowTopicNotExist * @return* @throws MQClientException * @throws InterruptedException * @throws RemotingTimeoutException * @throws RemotingSendRequestException * @throws RemotingConnectException */ public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis, boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); requestHeader.setTopic(topic); After setting the Topic parameter in the request header, Send a request for Topic routing information to NameServer RemotingCommand Request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader); // It is sent synchronouslyreturnThe response of the response RemotingCommand response = this. RemotingClient. InvokeSync (null, request, timeoutMillis); assert response ! = null; Switch (response.getCode()) {// If NameServer has no Topic to send messages tocase ResponseCode.TOPIC_NOT_EXIST: {
                if(allowTopicNotExist && ! topic.equals(MixAll.DEFAULT_TOPIC)) { log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }

                break; } // If TopicRouteData exists, TopicRouteData is decoded and TopicRouteData is returned directlycase ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if(body ! = null) {return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }
Copy the code

The mapping of TopicRouteData to TopicPublishInfo is as follows:

3.2.2 Selecting a queue for sending messages

After obtaining the TopicPublishInfo routing information, the RocketMQ client, by default, The selectOneMessageQueuef() method selects a queue (MessageQueue) from the messageQueueList in TopicPublishInfo to send the message. The specific fault-tolerant policies are defined in the MQFaultStrategy class:

Public class MQFaultStrategy {// Maintain the delay for sending messages to each Broker private final LatencyFaultTolerance<String> LatencyFaultTolerance = new LatencyFaultToleranceImpl(); Private Boolean sendLatencyFaultEnable =false; Private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; Private Long [] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; private Long [] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; . }Copy the code

The sendLatencyFaultEnable switch is used to select which of the following:

(1) sendLatencyFaultEnable on: Filter out brokers that are not available based on random incremental modulus. A “latencyFaultTolerance” is a fixed amount of time to back off from previous failures. For example, if the latency of the last request exceeds 550Lms, back away from 3000Lms; Over 1000L, retreat to 60000L.

(2) sendLatencyFaultEnable switch off (default off) : select a queue (MessageQueue) to send messages by random incremental modulus.

/** * Select queue to send messages in two cases based on whether the sendLatencyFaultEnable switch is turned on * @param tpInfo * @Param lastBrokerName * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if(this.sendLatencyFaultEnable) { try { //1. Filter out brokers that are not available on the basis of random incremental modulo. Int index = tpInfo.getsendexecute queue ().getAndincrement ();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if(notBestBroker ! = null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); }return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            returntpInfo.selectOneMessageQueue(); } //2. Select a queue (MessageQueue) to send messagesreturn tpInfo.selectOneMessageQueue(lastBrokerName);
    }
Copy the code

3.2.3 Sending the Encapsulated RemotingCommand Packet

After selecting the queue to send the message, RocketMQ calls the sendKernelImpl() method to send the message (which is the core of the actual message sent through RocketMQ’s Remoting communication module). The following steps are completed in this method:

(1) according to the front to the MessageQueue brokerName, call MQClientInstance instance findBrokerAddressInPublish () method, which is sent in the message Broker proxy server address, If not, follow the new routing information;

(2) if not disable, sending messages will have a hook function before and after the execution of the (executeSendMessageHookBefore ()/executeSendMessageHookAfter () method).

(3) encapsulate the information related to the message into a RemotingCommand packet, in which the RequestCode is one of the following:

A.send_message (ordinary sent message)

B. Send_message_v2 (optimize network packet sending) C. Send_batch_message (send messages in batches)

(4) According to the obtained Broke proxy server address, the encapsulated RemotingCommand packet is sent to the corresponding Broker. The default sending timeout is 3s

(5) Here, the real call to RocketMQ’s Remoting communication module to complete message sending is in MQClientAPIImpl instance sendMessageSync() method, the code is as follows:

private SendResult sendMessageSync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response ! = null;return this.processSendResponse(brokerName, msg, response);
    }
Copy the code

(6) The processSendResponse method handles the normal and abnormal cases of sending and returns the sendResult object;

(7) After sending back, call updateFaultItem to update the available time of the Broker proxy server;

(8) for abnormal situation, and mark – retryAnotherBrokerWhenNotStoreOK, when set to true, at the time of failure, will choose to change a Broker;

After the producer sends the message, the client log is printed as follows:

SendResult [sendStatus=SEND_OK, msgId=020003670EC418B4AAC208AD46930000, offsetMsgId=AC1415A200002A9F000000000000017A, messageQueue=MessageQueue [topic=TopicTest, brokerName=HQSKCJJIDRRD6KC, queueId=2], queueOffset=1]


Copy the code

3.3 Broker Server message processing brief analysis

There are many Processor business processors in a Broker server, which are used to process different types of requests. One or more of these processors share a single Processor thread pool. For received messages, the Broker uses the business processor SendMessageProcessor to process them. SendMessageProcessor does the following in turn:

(1) Message pre-check, including whether the broker is writable, check whether queueId exceeds the specified size, and whether the Topic routing information in the message exists. If not, create a new one. This corresponds to the section “Trying to get routing information from TopicPublishInfo” above. If the Topic routing information does not exist, the Broker logs are as follows:

2018-06-14 17:17:24 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=252, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1528967815569, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON] 2018-06-14 17:17:24 WARN SendMessageThread_1 - the topic TopicTest not exist, producer: /172.20.21.162:62661 2018-06-14 17:17:24 INFO SendMessageThread_1 - Create new Topic by default Topic :[TBW102] config:[TopicConfig [topicName=TopicTest,readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=falseProducer]] : [172.20.21.162:62661]Copy the code

After Topic routing information is created and the second message is sent, the Broker logs are as follows:

2018-08-02 16:26:13 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=253, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1533198373524, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]
2018-08-02 16:26:13 INFO SendMessageThread_1 - the msgInner's content is:MessageExt [queueId=2, storeSize=0, queueOffset=0, sysFlag=0, bornTimestamp=1533198373524, BornHost =/172.20.21.162:53914, storeTimestamp=0, storeHost=/172.20.21.162:10911, msgId= NULL, commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={KEYS=OrderID188, UNIQ_KEY=020003670EC418B4AAC208AD46930000, WAIT=true, TAGS=TagA}, body=11body's content is:Hello world]]

Copy the code

(2) Build MessageExtBrokerInner;

(3) call “brokerController. GetMessageStore (). The putMessage” will be treated as do MessageExtBrokerInner fall plate of persistent;

BrokerStatsManager does some statistical updates based on message drop results (normal/abnormal) and sets Response and returns.

Note: I am a slow walker, but I never regret it.