Message sending

RocketMQ supports three message delivery modes:

  • Sync: when the sender calls the send API, it blocks until the broker returns the send result.
  • Async: when the sender calls the send API, it supplies a callback to run once the send completes, and the call returns immediately. The sending thread is not blocked, and the success or failure callback is executed in a separate thread.
  • Oneway: when the sender calls the send API, the call returns immediately, without waiting for the broker's result and without registering a callback.
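
To make the difference between the three modes concrete, here is a minimal sketch. It does not use the RocketMQ client: `SendModesSketch`, `store`, `sendSync`, `sendAsync` and `sendOneway` are hypothetical names, and the "broker" is just a single background thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for a producer; not part of RocketMQ. */
final class SendModesSketch {
    // A single daemon thread plays the role of the network/broker side.
    private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-io");
        t.setDaemon(true);
        return t;
    });

    /** Pretends to store the message and returns a made-up result string. */
    private String store(String body) {
        return "SEND_OK:" + body;
    }

    /** Sync: the caller blocks until the result is available. */
    String sendSync(String body) {
        return CompletableFuture.supplyAsync(() -> store(body), io).join();
    }

    /** Async: returns at once; the callback runs later on the io thread. */
    void sendAsync(String body, BiConsumer<String, Throwable> callback) {
        CompletableFuture.supplyAsync(() -> store(body), io)
            .whenCompleteAsync(callback, io);
    }

    /** Oneway: fire and forget; no result and no callback. */
    void sendOneway(String body) {
        io.execute(() -> store(body));
    }

    public static void main(String[] args) throws Exception {
        SendModesSketch producer = new SendModesSketch();
        System.out.println(producer.sendSync("a"));

        CompletableFuture<String> seen = new CompletableFuture<>();
        producer.sendAsync("b", (result, error) -> seen.complete(result));
        producer.sendOneway("c");
        System.out.println(seen.get());
    }
}
```

The only thing the caller can observe in oneway mode is that the call returned, which is why it suits cases such as log collection where an occasional lost message is acceptable.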

RocketMQ message

Message properties

Message full property constructor

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }

Delay level attribute

    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }

        return 0;
    }

    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

Message extension property

  • Tags: message tag, used for message filtering.
  • Keys: message index keys; multiple keys are separated by spaces.
  • WaitStoreMsgOK: whether the send call returns only after the message has been stored.
  • DelayTimeLevel: message delay level, used for scheduled (delayed) messages and message retries.

These extended properties are stored in Message’s properties.

Producer Startup process

To start a Producer we use the DefaultMQProducer class.

    @Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

As you can see, the start method does two main things. First, it adds the namespace to the producerGroup; here you can in fact also add the NameServer address. Second, it calls defaultMQProducerImpl.start().

    this.checkConfig();

    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
        this.defaultMQProducer.changeInstanceNameToPID();
    }

Check that the configuration is valid and, unless the producer group is the client's internal producer group, change the producer's instanceName to the process ID.

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

Let’s start with the MQClientManager class.

    private static MQClientManager instance = new MQClientManager();
    public static MQClientManager getInstance() {
        return instance;
    }

This is a classic eager ("hungry") singleton: there is only one MQClientManager instance per JVM.

    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }

Create an MQClientInstance. The clientId (client IP + process ID + optional unitName) is used as the key, so only one MQClientInstance exists per clientId within a JVM.
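
The combination of an eager singleton and a putIfAbsent-guarded table can be sketched in isolation. This is a simplified illustration, not the RocketMQ classes: `ClientRegistrySketch`, `Client` and `getOrCreate` are hypothetical names standing in for MQClientManager, MQClientInstance and getAndCreateMQClientInstance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a per-JVM registry keyed by client ID; names are illustrative. */
final class ClientRegistrySketch {
    /** Stand-in for MQClientInstance. */
    static final class Client {
        final int index;
        final String clientId;

        Client(int index, String clientId) {
            this.index = index;
            this.clientId = clientId;
        }
    }

    // Eager singleton: the instance is created when the class is loaded.
    private static final ClientRegistrySketch INSTANCE = new ClientRegistrySketch();

    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<>();
    private final AtomicInteger indexGenerator = new AtomicInteger();

    private ClientRegistrySketch() {
    }

    static ClientRegistrySketch getInstance() {
        return INSTANCE;
    }

    /** Returns the client registered for clientId, creating it if needed. */
    Client getOrCreate(String clientId) {
        Client client = table.get(clientId);
        if (client == null) {
            Client created = new Client(indexGenerator.getAndIncrement(), clientId);
            // putIfAbsent returns the existing value if another thread won the race.
            Client prev = table.putIfAbsent(clientId, created);
            client = (prev != null) ? prev : created;
        }
        return client;
    }

    public static void main(String[] args) {
        ClientRegistrySketch registry = ClientRegistrySketch.getInstance();
        Client first = registry.getOrCreate("192.168.0.1@12345");  // made-up client ID
        Client second = registry.getOrCreate("192.168.0.1@12345");
        System.out.println(first == second);
    }
}
```

The get-then-putIfAbsent pattern may construct a throwaway object under contention, but it never publishes two different instances for the same key.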

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

Registers the current Producer into an instance of MQClientInstance. Only one DefaultMQProducer with the same producerGroupName is allowed in the same mQClientFactory, otherwise the registration will fail.

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

This entry is used when a message is sent to a topic that does not exist yet, so that the topic can be created automatically. It only takes effect when autoCreateTopicEnable is set to true, which is not recommended in production.

   if (startFactory) {
       mQClientFactory.start();
   }

Start MQClientInstance.

    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel (Netty network interaction)
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service (consumer related)
                    this.pullMessageService.start();
                    // Start rebalance service (consumer related)
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

Most of the scheduled tasks in startScheduledTask are related to consumers, so I won't go into detail in this chapter. With the default client configuration they are:

  • Every 120 s: update the NameServer address in the local cache.
  • Every 30 s: update the topic routing information in the local cache (consumer-related).
  • Every 30 s: send heartbeat information about this client to all brokers (consumer-related).
  • Every 5 s: persist the local consumer offsets (consumer-related).
  • Every 60 s: adjust the consumer thread pool (consumer-related).

Message sending process

Message sending consists of three steps: verifying the message, finding the route, and sending the message (including the exception handling mechanism). Let’s take a look at the flow of sending a message using synchronous messages as an example.

Validating the message

    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        Validators.checkMessage(msg, this);
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg);
    }

Validators.checkMessage validates the message; in particular, the message body must not be empty and must not exceed 4 MB.
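
The body-length rule can be expressed in a few lines. This is only a sketch of the rule described above, not the code of Validators.checkMessage: `MessageCheckSketch`, `checkBody` and the exception type are hypothetical, and the 4 MB limit is taken from the text.

```java
/** Sketch of the body-length rule only; names and exception type are illustrative. */
final class MessageCheckSketch {
    static final int MAX_MESSAGE_SIZE = 4 * 1024 * 1024; // 4 MB, per the text

    /** Throws if the body is missing, empty, or larger than MAX_MESSAGE_SIZE. */
    static void checkBody(byte[] body) {
        if (body == null) {
            throw new IllegalArgumentException("the message body is null");
        }
        if (body.length == 0) {
            throw new IllegalArgumentException("the message body length is zero");
        }
        if (body.length > MAX_MESSAGE_SIZE) {
            throw new IllegalArgumentException(
                "the message body size over max value, MAX: " + MAX_MESSAGE_SIZE);
        }
    }

    public static void main(String[] args) {
        checkBody("hello".getBytes());
        try {
            checkBody(new byte[0]);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```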

    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
    public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }

Finding the route

TopicPublishInfo

A local cache of the routing information for a topic.

  • messageQueueList: the message queues of this topic.
  • sendWhichQueue: an index maintained with ThreadLocal, used for load balancing.

In the sendDefaultImpl method, the routing information related to the topic is first looked up.

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // Whether the broker allows automatic creation of topics.
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

If the cache contains routing information for this topic and its message queue list is not empty, the routing information is returned directly. Otherwise, the routing information for the topic is queried from NameServer, and the local cache is updated if it is found.

If it is still not found, the lookup is retried with the default topic createTopicKey. NameServer only returns routing information for it when BrokerConfig#autoCreateTopicEnable is true.

    TopicRouteData topicRouteData;
    if (isDefault && defaultMQProducer != null) {
        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
        if (topicRouteData != null) {
            for (QueueData data : topicRouteData.getQueueDatas()) {
                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                data.setReadQueueNums(queueNums);
                data.setWriteQueueNums(queueNums);
            }
        }
    } else {
        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
    }

If isDefault is true, the route information is queried using the default topic. If it is found, the number of read and write queues in it is replaced with the smaller of the producer's default queue number (defaultTopicQueueNums) and the read queue number that was returned. If isDefault is false, the query uses the topic itself. If no route information is found, false is returned, indicating that the route information has not changed.

    TopicRouteData old = this.topicRouteTable.get(topic);
    boolean changed = topicRouteDataIsChange(old, topicRouteData);
    if (!changed) {
        changed = this.isNeedUpdateTopicRouteInfo(topic);
    } else {
        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    }

If route information is found, it is compared with the route information in the local cache; if the two differ, changed is set to true.

    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
    publishInfo.setHaveTopicRouterInfo(true);
    Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQProducerInner> entry = it.next();
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
            impl.updateTopicPublishInfo(topic, publishInfo);
        }
    }

Update the local cache for all producers managed by this MQClientInstance.

    List<QueueData> qds = route.getQueueDatas();
    Collections.sort(qds);
    for (QueueData qd : qds) {
        // Only consider writable queues
        if (PermName.isWriteable(qd.getPerm())) {
            BrokerData brokerData = null;
            // Find the corresponding broker
            for (BrokerData bd : route.getBrokerDatas()) {
                if (bd.getBrokerName().equals(qd.getBrokerName())) {
                    brokerData = bd;
                    break;
                }
            }

            // Skip this QueueData if no broker was found
            if (null == brokerData) {
                continue;
            }

            // Skip this QueueData if the broker has no master
            if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                continue;
            }

            // Add the write queues of this QueueData to TopicPublishInfo
            for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                info.getMessageQueueList().add(mq);
            }
        }
    }

Loop over the QueueData entries in the routing information; for each one that has write permission, create the corresponding MessageQueue objects and add them to topicPublishInfo's List<MessageQueue>.

That concludes the route lookup. Readers who find it unclear can follow along in the source code.

Select message queue

In the previous step, we got the routing information for the topic.

For example, suppose a topic has four queues on each of broker-a and broker-b.

Then the obtained MessageQueue is:

  • {"topic":"topic", "brokerName":"broker-a", "queueId":0}
  • {"topic":"topic", "brokerName":"broker-a", "queueId":1}
  • {"topic":"topic", "brokerName":"broker-a", "queueId":2}
  • {"topic":"topic", "brokerName":"broker-a", "queueId":3}
  • {"topic":"topic", "brokerName":"broker-b", "queueId":0}
  • {"topic":"topic", "brokerName":"broker-b", "queueId":1}
  • {"topic":"topic", "brokerName":"broker-b", "queueId":2}
  • {"topic":"topic", "brokerName":"broker-b", "queueId":3}
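
The list above can be reproduced with a small sketch of the expansion step: one entry per write queue, broker by broker. `QueueListSketch`, `Queue` and `expand` are hypothetical names; the real code works on QueueData and BrokerData and also checks permissions and the presence of a master.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of turning "broker -> write queue count" into a flat queue list. */
final class QueueListSketch {
    /** Stand-in for MessageQueue (topic, brokerName, queueId). */
    static final class Queue {
        final String topic;
        final String brokerName;
        final int queueId;

        Queue(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return "{topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId + "}";
        }
    }

    /** Creates one Queue per write queue of every broker, in map order. */
    static List<Queue> expand(String topic, Map<String, Integer> writeQueueNums) {
        List<Queue> queues = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : writeQueueNums.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                queues.add(new Queue(topic, entry.getKey(), i));
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        Map<String, Integer> route = new LinkedHashMap<>();
        route.put("broker-a", 4);
        route.put("broker-b", 4);
        expand("topic", route).forEach(System.out::println);
    }
}
```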

So how does RocketMQ choose among these message queues? In synchronous mode, retryTimesWhenSendFailed specifies the number of retries. In asynchronous mode, the retry happens after the send result is received and before the callback is executed. There are two ways to select a message queue:

  • sendLatencyFaultEnable = false: the default; the Broker fault delay mechanism is disabled.
  • sendLatencyFaultEnable = true: the Broker fault delay mechanism is enabled.

What’s the difference between these two mechanisms? Let’s see.

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Fault delay mechanism enabled
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        // Fault delay mechanism disabled (the default)
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

The default mechanism

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

If lastBrokerName (the broker that failed on the previous attempt) is null, the queue is chosen simply by incrementing sendWhichQueue and taking it modulo the number of message queues.

If lastBrokerName is not null, queues on that broker are skipped. For example, if the previous attempt to {"topic":"topic", "brokerName":"broker-a", "queueId":0} failed, the next selection avoids every queue on broker-a.
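
The two cases can be traced with a simplified sketch. It is not the RocketMQ implementation: `SelectQueueSketch` and `select` are hypothetical names, queues are represented only by their broker name, and the counter is a plain AtomicInteger starting at 0 rather than a per-thread index with a random start, so that the result is deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of round-robin selection that skips the broker that failed last time. */
final class SelectQueueSketch {
    private final List<String> queueBrokers; // broker name of each queue, in order
    private final AtomicInteger counter = new AtomicInteger(); // stand-in for sendWhichQueue

    SelectQueueSketch(List<String> queueBrokers) {
        this.queueBrokers = queueBrokers;
    }

    /** Returns the position of the chosen queue in the list. */
    int select(String lastBrokerName) {
        int size = queueBrokers.size();
        int index = counter.getAndIncrement();
        if (lastBrokerName == null) {
            // No previous failure: plain round-robin.
            return Math.floorMod(index, size);
        }
        for (int i = 0; i < size; i++) {
            int pos = Math.floorMod(index++, size);
            if (!queueBrokers.get(pos).equals(lastBrokerName)) {
                return pos;
            }
        }
        // Every queue is on the failed broker: fall back to round-robin.
        return Math.floorMod(counter.getAndIncrement(), size);
    }

    public static void main(String[] args) {
        SelectQueueSketch selector = new SelectQueueSketch(
            Arrays.asList("broker-a", "broker-a", "broker-b", "broker-b"));
        System.out.println(selector.select(null));       // position 0
        System.out.println(selector.select(null));       // position 1
        System.out.println(selector.select("broker-a")); // position 2, first broker-b queue
        System.out.println(selector.select("broker-b")); // position 0, wraps past broker-b
    }
}
```

Note that the skipping only moves a temporary index forward; the shared counter still advances by one per call.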

Let's take a look at the data structure behind sendWhichQueue.

    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }

        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;

        this.threadLocalIndex.set(index);
        return index;
    }

In effect, each thread keeps its own index in a ThreadLocal, so there is no contention between threads. The first call generates a random number, and each later call increments it by one. In other words, sends are load-balanced only within a single thread. If many threads each send just one message, each lands on a random queue, which is a rather amusing load-balancing strategy 😂.
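
A compact variant of the same idea is sketched below. It is not the RocketMQ class: `ThreadLocalIndexSketch` is a hypothetical name, and it deliberately swaps Math.abs for a bit mask, since Math.abs(Integer.MIN_VALUE) is still negative.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Sketch of a per-thread counter with a random starting point. */
final class ThreadLocalIndexSketch {
    // Each thread lazily gets its own non-negative random start value.
    private final ThreadLocal<Integer> index =
        ThreadLocal.withInitial(() -> ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));

    /** Increments this thread's counter and returns the new, non-negative value. */
    int getAndIncrement() {
        // Masking with Integer.MAX_VALUE wraps to 0 instead of going negative.
        int next = (index.get() + 1) & Integer.MAX_VALUE;
        index.set(next);
        return next;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadLocalIndexSketch sketch = new ThreadLocalIndexSketch();
        Runnable task = () -> System.out.println(
            Thread.currentThread().getName() + ": "
                + sketch.getAndIncrement() + ", " + sketch.getAndIncrement());
        Thread t1 = new Thread(task, "sender-1");
        Thread t2 = new Thread(task, "sender-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
```

Each thread prints two consecutive numbers, but the two threads start from unrelated values, which is exactly why the balancing only holds within one thread.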

This algorithm avoids the previously failed broker only within a single send call and its retries. If the current thread calls send a second time, it has no memory of the broker that failed during the previous send. Look at the branch of selectOneMessageQueue above where lastBrokerName is not null: it takes an index from sendWhichQueue and then uses a temporary index variable to skip the lastBrokerName that failed. On the second send, the index fetched from sendWhichQueue is likely to point at the broker that failed last time, which leads to retries and unnecessary performance loss. Is there a way to temporarily exclude such a Broker from queue selection?

Broker failure delay mechanism

In sendDefaultImpl, whether the send succeeds or throws an exception, the org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem method is called.

Let’s look at this method first.

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

It first computes how long the broker should be treated as unavailable. If isolation is true, 30000 is passed to computeNotAvailableDuration; otherwise currentLatency (the latency of the current send) is passed.

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }

Take a look at latencyMax and notAvailableDuration.

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

The larger the latency, the longer the broker is treated as unavailable; the thresholds are empirical values.
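
A few worked values make the mapping easier to read. The sketch below copies the two arrays and the lookup loop quoted above into a standalone class; only the class name `NotAvailableDurationSketch` is made up.

```java
/** Standalone copy of the latency-to-duration lookup, for trying out values. */
final class NotAvailableDurationSketch {
    private static final long[] LATENCY_MAX =
        {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION =
        {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /** Scans from the largest threshold down and returns the matching duration. */
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(computeNotAvailableDuration(80));    // 0: fast enough, no avoidance
        System.out.println(computeNotAvailableDuration(600));   // 30000: avoid for 30 s
        System.out.println(computeNotAvailableDuration(30000)); // 600000: the isolation case
    }
}
```

So with isolation set to true, the fixed 30000 ms input falls into the top bucket and the broker is avoided for 10 minutes.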

Let’s move on to the updateFaultItem method.

    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

Look up the FaultItem for the broker in the cache; update it if found, or create a new one if not. The fields updated are currentLatency and startTimestamp.

  • currentLatency and startTimestamp are volatile, so updates are immediately visible to other threads.
  • startTimestamp is the current system time plus the duration for which the broker should be avoided. It is the most direct way to determine whether the Broker is available.

So let’s look at the isAvailable method.

    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }
    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }

The Broker is considered available if the current time is greater than or equal to StartTimestamp.
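
The update and the availability check fit together as in the following sketch. It is a simplification, not the RocketMQ class: `FaultTableSketch` is a hypothetical name, only the start timestamp is stored, and the current time is passed in as a parameter (an assumption made here so that the behaviour can be checked without waiting).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Sketch of a fault table: broker name -> time from which it is usable again. */
final class FaultTableSketch {
    private final ConcurrentMap<String, Long> availableFrom = new ConcurrentHashMap<>();

    /** Records that the broker should be avoided until now + notAvailableDuration. */
    void updateFaultItem(String brokerName, long nowMillis, long notAvailableDuration) {
        availableFrom.put(brokerName, nowMillis + notAvailableDuration);
    }

    /** A broker with no entry, or whose avoidance window has passed, is available. */
    boolean isAvailable(String brokerName, long nowMillis) {
        Long start = availableFrom.get(brokerName);
        return start == null || nowMillis - start >= 0;
    }

    public static void main(String[] args) {
        FaultTableSketch table = new FaultTableSketch();
        long now = System.currentTimeMillis();
        table.updateFaultItem("broker-a", now, 30000);
        System.out.println(table.isAvailable("broker-a", now + 1000));  // still avoided
        System.out.println(table.isAvailable("broker-a", now + 30000)); // usable again
        System.out.println(table.isAvailable("broker-b", now));         // never failed
    }
}
```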

Let's go back to how a message queue is selected when fault delay is turned on.

    try {
        int index = tpInfo.getSendWhichQueue().getAndIncrement();
        for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
            int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
            if (pos < 0)
                pos = 0;
            // First, pick a queue by round-robin on the index
            MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
            // Check whether its broker is available
            if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                // This condition is questionable: many issues on GitHub call it a bug, but there has been no official reply
                if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                    return mq;
            }
        }

        // If no available queue was found, pick one of the brokers that are currently being avoided
        final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
        int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
        if (writeQueueNums > 0) {
            final MessageQueue mq = tpInfo.selectOneMessageQueue();
            if (notBestBroker != null) {
                mq.setBrokerName(notBestBroker);
                mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
            }
            return mq;
        } else {
            latencyFaultTolerance.remove(notBestBroker);
        }
    }

Sending the message

Message sending API core entry:

    private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout)

  • msg: the message to be sent.
  • mq: the message queue the message will be sent to.
  • communicationMode: the sending mode: SYNC, ASYNC or ONEWAY.
  • sendCallback: the callback for asynchronous sends.
  • topicPublishInfo: the routing information of the topic.
  • timeout: the timeout for sending the message.

    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

Try to get the Broker's network address from the local cache. If it is not there, actively refresh the cache from NameServer and look it up again.

    if (!(msg instanceof MessageBatch)) {
        MessageClientIDSetter.setUniqID(msg);
    }

    boolean topicWithNamespace = false;
    if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
        msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
        topicWithNamespace = true;
    }

    int sysFlag = 0;
    boolean msgBodyCompressed = false;
    if (this.tryToCompressMessage(msg)) {
        sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
        msgBodyCompressed = true;
    }

    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
    }

A globally unique ID is assigned to the message. If the message body exceeds 4 KB (the default threshold), the body is compressed with ZIP and the system flag MessageSysFlag.COMPRESSED_FLAG is set. If the message is a prepared transactional message, the system flag MessageSysFlag.TRANSACTION_PREPARED_TYPE is set.
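
How the flag bits accumulate can be shown with a standalone sketch. Everything here is illustrative: `CompressFlagSketch`, `deflate` and `sysFlagFor` are hypothetical names, the bit values of the two constants and the exact threshold comparison are assumptions for the example, and java.util.zip.Deflater stands in for whatever compression helper the client really uses.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

/** Sketch of "compress large bodies and record it in a bit flag". */
final class CompressFlagSketch {
    static final int COMPRESS_THRESHOLD = 4 * 1024;       // 4 KB, per the text
    static final int COMPRESSED_FLAG = 0x1;               // assumed bit value
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // assumed bit value

    /** Compresses the input with the JDK's DEFLATE implementation. */
    static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream(input.length);
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int written = deflater.deflate(buffer);
            out.write(buffer, 0, written);
        }
        deflater.end();
        return out.toByteArray();
    }

    /** Builds the system flag: each property of the message sets its own bit. */
    static int sysFlagFor(byte[] body, boolean preparedTransaction) {
        int sysFlag = 0;
        if (body.length >= COMPRESS_THRESHOLD) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (preparedTransaction) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    public static void main(String[] args) {
        byte[] small = new byte[100];
        byte[] large = new byte[8 * 1024];
        System.out.println(sysFlagFor(small, false));
        System.out.println(sysFlagFor(large, true));
        System.out.println(deflate(large).length + " bytes after compression");
    }
}
```

Because each property owns one bit, the receiver can test them independently with a bitwise AND.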

    if (this.hasSendMessageHook()) {
        context = new SendMessageContext();
        context.setProducer(this);
        context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        context.setCommunicationMode(communicationMode);
        context.setBornHost(this.defaultMQProducer.getClientIP());
        context.setBrokerAddr(brokerAddr);
        context.setMessage(msg);
        context.setMq(mq);
        context.setNamespace(this.defaultMQProducer.getNamespace());
        String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (isTrans != null && isTrans.equals("true")) {
            context.setMsgType(MessageType.Trans_Msg_Half);
        }

        if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
            context.setMsgType(MessageType.Delay_Msg);
        }
        this.executeSendMessageHookBefore(context);
    }

If a send-message hook is registered, its pre-send logic is executed.

public interface SendMessageHook {
    String hookName();

    void sendMessageBefore(final SendMessageContext context);

    void sendMessageAfter(final SendMessageContext context);
}

Hook interface.

    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    // Producer group
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    // Topic
    requestHeader.setTopic(msg.getTopic());
    // Key of the default topic used for automatic topic creation
    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
    // Default number of message queues on a single Broker
    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
    // Queue ID
    requestHeader.setQueueId(mq.getQueueId());
    // Message system flag
    requestHeader.setSysFlag(sysFlag);
    // Time at which the message was sent
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    // Message flag
    requestHeader.setFlag(msg.getFlag());
    // Message extension properties
    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
    // Number of message retries
    requestHeader.setReconsumeTimes(0);
    requestHeader.setUnitMode(this.isUnitMode());
    requestHeader.setBatch(msg instanceof MessageBatch);
    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
        if (reconsumeTimes != null) {
            requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
        }

        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
        if (maxReconsumeTimes != null) {
            requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
        }
    }

Build the request header for sending the message.

public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }

Depending on the sending mode, the request is transmitted over the network synchronously, asynchronously, or one-way.

    if (this.hasSendMessageHook()) {
        context.setSendResult(sendResult);
        this.executeSendMessageHookAfter(context);
    }

If the hook function is registered, the after logic is executed.

Summary

  • When a client calls the Producer to send a message, it first obtains the routing information for the topic from NameServer. The request code is GET_ROUTEINFO_BY_TOPIC.
  • The routing information returned by NameServer includes the queues and brokers of the topic.
  • The Producer selects a queue according to its queue selection strategy to store the message.
  • Each message gets a unique ID, which is added to the message properties under the key UNIQ_KEY.
  • Special processing is applied to the message; for example, a message body larger than 4 KB is compressed.
  • The Producer sends an RPC request to the Broker, which saves the message. The request code is SEND_MESSAGE or SEND_MESSAGE_V2 (when the corresponding flag is set in the configuration).
