preface

RocketMQ producer source code analysis I will divided into 3 introduces the three articles, this article is the first article mainly introduces the start-up process of producers and synchronous message transmission process, due to the limited level of the author to write the wrong welcome to comment, like writing is good please point a praise, your thumb up support is the power of I insist on writing.

The noun is introduced

They are producers of messages.

Responsible for production messages, which are generally handled by business systems. A message producer sends messages generated in the business application to the Broker server. RocketMQ provides multiple delivery modes: synchronous, asynchronous, sequential, and unidirectional. Both synchronous and asynchronous require the Broker to return an acknowledgement message, but one-way does not.

From RocketMQ

Producer Group

A set of producers of the same kind who send the same kind of messages and send them logically. If a transaction message is sent and the original producer crashes after sending, the Broker server contacts other producer instances in the same producer group to commit or backtrack consumption.

From RocketMQ

Topic

Represents a collection of a class of messages, each containing several messages, each belonging to only one topic, and is the basic unit of Message subscription for RocketMQ.

From RocketMQ

Message

The physical carrier of information transmitted by a message system, the smallest unit that produces and consumes data, and each message must belong to a topic. Each Message in RocketMQ has a unique Message ID and can carry a Key with a business identity. The system supports Message query by Message ID and Key.

From RocketMQ

Questions lead

  • Synchronous send, asynchronous send, sequential send, one-way send, this several ways what is the difference, respectively is how to achieve.
  • How do producers ensure that messages are delivered correctly when the primary node of a cluster of primary and secondary brokers is down (or can messages be delivered smoothly when the primary node is down)?

Introduction to core Classes

  • org.apache.rocketmq.client.producer.DefaultMQProducer: Default implementation class for message producers, providing methods for sending messages.
  • org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl: encapsulates the logic of sending messages.
  • org.apache.rocketmq.client.producer.TransactionMQProducer: transaction message producer inheritsDefaultMQProducerWhat a transaction message is and how it is implemented will not be covered in this article but will be covered in the next article.
  • org.apache.rocketmq.client.producer.MessageQueueSelector: Selects the interface class of MessageQueue when sending messages.
  • org.apache.rocketmq.client.impl.factory.MQClientInstanceThis class encapsulates a common implementation of the client (producer and consumer).

Learning point

  • By looking at theDefaultMQProducerSource code we will findDefaultMQProducerImplIt is a member variable that enhances itself through composition rather than inheritance (more composition and less inheritance, because inheritance is not flexible, and when there are too many inheritance layers, the code is difficult to understand and maintain).
  • DefaultMQProducerImplements the interfaceorg.apache.rocketmq.client.producer.MQProducerThe interfaceMQProducerProvides a simple, easy-to-understand method definition that lets users not worry about the internal complexity of the implementation, out of the box, which is actually used hereFacade design patternMQProducerThis is the facade (The main function of facade mode is to provide an interface for clients to access the system and hide the internal complexity of the system).

Source code analysis

Start the process

Start with the producer-initiated process (omit some non-critical code such as log printing, exception catching, etc.)

// DefaultMQProducerImpl#start
 public void start(final boolean startFactory) throws MQClientException {
       / / # 1)
       switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                / / # 2)
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                / / # 3)
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if(! registerOK) {this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
 this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                / / # 4
                if (startFactory) {
                    mQClientFactory.start();
                }
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}".this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        / / # 5)
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        / / # 6
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run(a) {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e); }}},1000 * 3.1000);
    }
Copy the code

① : Perform different actions according to different states (actually this is a variant implementation of the state mode).

(2) : get into getOrCreateMQClientInstance MQClientInstance instance method we find MQClientInstance instance objects within the process is the only (with the help of simple factory pattern). Here the org. Apache. Rocketmq. Client. Impl. MQClientManager use the singleton pattern

③ Register yourself (the producer) in MQClientInstance.

④ start the client instance service.

⑤ : Send heartbeat messages to all brokers.

⑥ : Periodically scans the expired requests sent by the client.

Let’s look at how the client instance sends heartbeat information to all brokers.

#MQClientInstance#sendHeartbeatTimesTotal public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { this.sendHeartbeatToAllBroker(); this.uploadFilterClassSource(); } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed. [{}]", this.clientId); }} ------------------------------------------------------------------------------------------------------------------------ -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - private void sendHeartbeatToAllBroker () {/ / # 1) preparing a heartbeat data final HeartbeatData heartbeatData = this.prepareHeartbeatData(); final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty(); final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty(); if (producerEmpty && consumerEmpty) { log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId); return; } if (! this.brokerAddrTable.isEmpty()) { long times = this.sendHeartbeatTimesTotal.getAndIncrement(); Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, HashMap<Long, String>> entry = it.next(); String brokerName = entry.getKey(); HashMap<Long, String> oneTable = entry.getValue(); if (oneTable ! = null) { for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) { Long id = entry1.getKey(); String addr = entry1.getValue(); if (addr ! = null) { if (consumerEmpty) { if (id ! = MixAll.MASTER_ID) continue; } to try {/ / communicate with Broker sends a heartbeat data int version = this. MQClientAPIImpl. SendHearbeat (addr, heartbeatData, 3000); if (! this.brokerVersionTable.containsKey(brokerName)) { this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4)); } this.brokerVersionTable.get(brokerName).put(addr, version); if (times % 20 == 0) { log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); log.info(heartbeatData.toString()); } } catch (Exception e) { if (this.isBrokerInNameServer(addr)) { log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e); } else { log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, id, addr, e); } } } } } } } }Copy the code

So far the start process analysis of the producer is finished, we will comb it.

  • To obtainMQClientInstanceInstance.
  • toMQClientInstanceInstance object registers the producer.
  • Start theMQClientInstanceService.
  • Send heartbeats to all brokers.
  • Example Start the scheduled task of scanning expired requests.

A synchronous message

Next, we analyze the sending process of synchronous messages.

 private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        / / # 1)
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        / / # 2)
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if(topicPublishInfo ! =null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            / / # 3)
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                / / # 4
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if(mqSelected ! =null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        / / # 5)
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        / / # 6
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime/* The timeout needs to be subtracted from the time spent */);
                        endTimestamp = System.currentTimeMillis();
                        / / # 7
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if(sendResult.getSendStatus() ! = SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue; }}return sendResult;
                            default:
                                break; }}catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if(sendResult ! =null) {
                                    return sendResult;
                                }
                                throwe; }}catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        throwe; }}else {
                    break; }}if(sendResult ! =null) {
                return sendResult;
            }
            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));
            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }
            throw mqClientException;
        }
        / / # today
        validateNameServerSetting();
        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }
Copy the code

① : Confirm the service status of the producer.

② : Get subscription information by topic.

③ : Retry times of message sending failure.

④ : Select a MessageQueue.

⑤ : Check whether timeout, we can see from the code for multiple times of timeout detection, this segmented detection method can facilitate us to locate the problem.

⑥ : Sends messages.

⑦ : Registration failure.

⑧ : Check whether the Namesrv address is configured.

Next, we will analyze ⑥ in detail.

  private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        // Get the broker address
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            // Get the broker address
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }
        SendMessageContext context = null;
        if(brokerAddr ! =null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if(! (msginstanceof MessageBatch)) {
                / / # 1)
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                if (null! =this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                / / # 2)
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                // Determine whether the message is a transaction message
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if(tranMsg ! =null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
                // Hook
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                // Hook
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if(isTrans ! =null && isTrans.equals("true")) {
                        // Determine if it is a transaction half-message
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null|| msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) ! =null) {
                        // Determine whether the message is delayed
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    // Message context is handled by the hooks before the message is sent
                    this.executeSendMessageHookBefore(context);
                }
                // Construct the message request header
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if(reconsumeTimes ! =null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if(maxReconsumeTimes ! =null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                    // Asynchronous messages
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if(! messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned =true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        // Determine whether the timeout is timed out again, which is helpful for troubleshooting
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                        // One-way message
                    case ONEWAY:
                        // Synchronize messages
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
            / / # 3)
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); }}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist".null);
    }
Copy the code

① : Set a batch number for batch messages.

(2) Check whether messages need to be compressed. Batch messages in the current version do not support compression.

③ According to different exceptions, execute different hooks to finish.

conclusion

Through the above source code analysis we can try to answer the question we raised at the beginning.

  • When a node in the Broker cluster becomes unavailable, the producer retries with a failure policy to ensure that the message is delivered correctly.
  • From the above source code analysis, we can see that the synchronous send main process is always a thread to do all the work (get route new -> message assembly -> wait for the delivery result)