Note: This source code analysis series is based on RocketMQ 4.8.0. Gitee repository link: gitee.com/funcy/rocke… .

Next, we continue analyzing how the producer sends messages.

3. DefaultMQProducer#send(...): Sending a message

The producer sends messages through DefaultMQProducer#send(...):

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    Validators.checkMessage(msg, this);
    msg.setTopic(withNamespace(msg.getTopic()));
    // Call defaultMQProducerImpl
    return this.defaultMQProducerImpl.send(msg);
}

Continuing into the DefaultMQProducerImpl#send(...) method:

public SendResult send(Message msg) throws MQClientException, RemotingException, 
        MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

public SendResult send(Message msg, long timeout) throws MQClientException, 
        RemotingException, MQBrokerException, InterruptedException {
    // Send synchronously by default
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

Finally we arrive at the DefaultMQProducerImpl#sendDefaultImpl method:

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    ...
    // 1. Find the publish information for the topic
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        ...
        // 2. Total number of attempts. This loop retries only for synchronous sending
        int timesTotal = communicationMode == CommunicationMode.SYNC 
            ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 3. Select a message queue
            MessageQueue mqSelected 
                = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                ...
                try {
                    // 4. Send
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    ...
                } catch (...) {
                    // exception handling omitted
                    ...
                }
            } else {
                break;
            }
        }
        ...
        throw mqClientException;
    }

    validateNameServerSetting();

    throw new MQClientException(...);
}

The method above is where a message gets sent. It is long, but there are four key points:

  1. Find the publish information for the topic
  2. Determine the number of retries
  3. Select a message queue to send to
  4. Send the message

Let’s take a look at these steps.

3.1 Finding the publish information for the topic

The method that obtains the topic's publish information is DefaultMQProducerImpl#tryToFindTopicPublishInfo:

/**
 * Get the publish information for the topic; if it does not exist locally, fetch it from the nameServer
 * @param topic
 * @return
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // Update the topic route info
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(
            topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

The main work here is calling MQClientInstance#updateTopicRouteInfoFromNameServer to update the topic's route information. The update sends a request with the code GET_ROUTEINFO_BY_TOPIC to the NameServer (in MQClientAPIImpl#getTopicRouteInfoFromNameServer); we will not go into the details here.
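The lookup order described above (local table first, NameServer only on a miss) can be sketched roughly as follows. This is a simplified illustration, not RocketMQ's code: RouteCache, remoteLookup and the String route value are hypothetical stand-ins for topicPublishInfoTable, the NameServer request and TopicPublishInfo.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, simplified model of "check the local table, then ask the NameServer".
class RouteCache {
    private final Map<String, String> table = new ConcurrentHashMap<>();
    private final Function<String, String> remoteLookup; // stand-in for the NameServer request
    private int remoteCalls = 0;

    RouteCache(Function<String, String> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    String find(String topic) {
        String route = table.get(topic);
        if (route == null) {
            // Local miss: fetch from the remote side and remember the answer.
            remoteCalls++;
            route = remoteLookup.apply(topic);
            if (route != null) {
                table.put(topic, route);
            }
        }
        return route;
    }

    int remoteCalls() {
        return remoteCalls;
    }

    public static void main(String[] args) {
        RouteCache cache = new RouteCache(topic -> "route-of-" + topic);
        System.out.println(cache.find("TopicTest")); // first call goes remote
        System.out.println(cache.find("TopicTest")); // second call is served locally
        System.out.println("remote calls: " + cache.remoteCalls());
    }
}
```

The point of the design is that the send path normally pays only for a map lookup; the remote request happens when the local entry is missing.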

3.2 Obtaining the Retry Times

In synchronous mode, RocketMQ retries a message that fails to send. The total number of attempts is computed before sending:

int timesTotal = communicationMode == CommunicationMode.SYNC 
    ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

This is a ternary expression. It first checks whether the mode is synchronous; if so, the total number of send attempts is this.defaultMQProducer.getRetryTimesWhenSendFailed() + 1, where the 1 is the original send and retryTimesWhenSendFailed is the number of retries after a failure. That value is held in the field DefaultMQProducer#retryTimesWhenSendFailed, which defaults to 2, so a message is sent at most 3 times.
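The arithmetic can be checked with a tiny standalone sketch. The RetryBudget class and its local CommunicationMode enum are hypothetical stand-ins that only mirror the ternary expression shown above.

```java
// Hypothetical helper mirroring the timesTotal expression from sendDefaultImpl.
class RetryBudget {
    enum CommunicationMode { SYNC, ASYNC, ONEWAY }

    static int timesTotal(CommunicationMode mode, int retryTimesWhenSendFailed) {
        // One original send, plus the configured retries in SYNC mode only.
        return mode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    public static void main(String[] args) {
        int defaultRetries = 2; // default of retryTimesWhenSendFailed, as stated in the text
        for (CommunicationMode mode : CommunicationMode.values()) {
            System.out.println(mode + " -> " + timesTotal(mode, defaultRetries));
        }
    }
}
```

With the default of 2 retries, only SYNC yields 3 attempts; in this loop the other two modes get a single attempt.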

3.3 Finding a Message queue

Before sending a message, we need to select the queue it will be delivered to. This is done by DefaultMQProducerImpl#selectOneMessageQueue:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, 
        final String lastBrokerName) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

Follow up with MQFaultStrategy#selectOneMessageQueue:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, 
        final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // The number of messages sent to the current topic
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            // Get an available broker
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                // take modulo operation
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // Return the queue if its broker is available
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            // If no available broker was found above, pick one from the faulty brokers
            // and select a queue on it, again by taking the modulus
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }
    // Get a message queue
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

The method above contains the whole queue-selection logic. It has two main parts:

  1. If the send-latency fault tolerance feature (sendLatencyFaultEnable) is enabled, then after picking a MessageQueue it checks whether the broker that owns the queue is available; if not, it picks again
  2. If the steps above yield no queue, or the feature is not enabled, it picks a MessageQueue and returns it directly

When selecting a MessageQueue, the core operation is % (the modulo operation):

// Counter of messages sent to the current topic
int index = tpInfo.getSendWhichQueue().getAndIncrement();
...
// Compute the index of the MessageQueue
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
    pos = 0;
// Get the MessageQueue at that index
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);

By the nature of the modulo operation, if none of the brokers hosting the MessageQueues has failed, messages are distributed evenly across all queues in round-robin fashion.
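The even distribution, and the reason for the pos < 0 guard, can be seen in a small standalone sketch. RoundRobinSelect is a hypothetical class that copies only the index arithmetic from the excerpt; it uses a plain AtomicInteger as the counter, which is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the abs / modulo / clamp index computation.
class RoundRobinSelect {
    static int position(int index, int queueCount) {
        int pos = Math.abs(index) % queueCount;
        if (pos < 0) {
            // Math.abs(Integer.MIN_VALUE) is still negative, so the remainder can be too.
            pos = 0;
        }
        return pos;
    }

    public static void main(String[] args) {
        int queueCount = 4;
        int[] hits = new int[queueCount];
        AtomicInteger counter = new AtomicInteger(0);
        for (int i = 0; i < 12; i++) {
            hits[position(counter.getAndIncrement(), queueCount)]++;
        }
        System.out.println(Arrays.toString(hits));           // prints [3, 3, 3, 3]
        System.out.println(position(Integer.MIN_VALUE, 3));  // prints 0
    }
}
```

Twelve consecutive counter values over four queues land three times on each queue, and the clamp keeps the index valid even when the counter overflows to Integer.MIN_VALUE.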

3.4 Sending Messages

The sending method of the message is DefaultMQProducerImpl#sendKernelImpl with the following code:

private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, 
        MQBrokerException, InterruptedException {

    long beginStartTime = System.currentTimeMillis();
    // Get the broker address based on the broker name
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        // If the address of the broker cannot be found, get the topic publication information from nameServer again
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(
            this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        // Message body
        byte[] prevBody = msg.getBody();
        try {
            ...
            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            // Compress the message body
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }
            ...
            // Build the request header
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            // many setter calls omitted
            ...
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    ...
                    // Send the message
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        // arguments omitted
                        ...);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("...");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        // arguments omitted
                        ...);
                    break;
                default:
                    assert false;
                    break;
            }
            ...
        } catch (...) {
            ...
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(
                msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }
    throw new MQClientException(...);
}

This method mainly assembles parameters and then calls this.mQClientFactory.getMQClientAPIImpl().sendMessage(...) to send the message. There are different communication modes when sending (ASYNC, ONEWAY and SYNC), but in every case MQClientAPIImpl#sendMessage(...) is what ultimately gets called:

public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;
    String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
    boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
    // Construct the request command
    if (isReply) {
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 = 
                SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(
                RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(
                RequestCode.SEND_REPLY_MESSAGE, requestHeader);
        }
    } else {
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 
                = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch 
                ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand
                .createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
    }
    request.setBody(msg.getBody());

    // Call the send method
    switch (communicationMode) {
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            ...
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, 
                request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            ...
            return this.sendMessageSync(addr, brokerName, msg, 
                timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }

    return null;
}

As you can see, in this method, the request code is specified, and the ASYNC, ONEWAY, and SYNC types of messages are handled separately.
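The request-code branching is easier to read when isolated. The sketch below reproduces only the decision shown in the excerpt; RequestCodeChooser and its Code enum are hypothetical stand-ins for the RequestCode constants, and the three boolean inputs correspond to isReply, sendSmartMsg and msg instanceof MessageBatch.

```java
// Hypothetical sketch of how the request code is chosen in sendMessage.
class RequestCodeChooser {
    enum Code {
        SEND_MESSAGE, SEND_MESSAGE_V2, SEND_BATCH_MESSAGE,
        SEND_REPLY_MESSAGE, SEND_REPLY_MESSAGE_V2
    }

    static Code choose(boolean isReply, boolean sendSmartMsg, boolean isBatch) {
        if (isReply) {
            // Reply messages only distinguish between the V2 header and the plain header.
            return sendSmartMsg ? Code.SEND_REPLY_MESSAGE_V2 : Code.SEND_REPLY_MESSAGE;
        }
        if (sendSmartMsg || isBatch) {
            // Batch messages take precedence over the plain V2 code.
            return isBatch ? Code.SEND_BATCH_MESSAGE : Code.SEND_MESSAGE_V2;
        }
        return Code.SEND_MESSAGE;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false, false)); // prints SEND_MESSAGE
        System.out.println(choose(false, true, false));  // prints SEND_MESSAGE_V2
        System.out.println(choose(false, false, true));  // prints SEND_BATCH_MESSAGE
        System.out.println(choose(true, true, false));   // prints SEND_REPLY_MESSAGE_V2
    }
}
```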

4. Message sending modes

While following the sending process, we saw that there are three modes of sending a message:

  1. SYNC: synchronous mode; the send result is returned once the message has been sent
  2. ASYNC: asynchronous mode; the send method returns no result, but a callback can be registered to receive the send result
  3. ONEWAY: one-way mode; the message is sent once and the result, success or failure, is ignored

Let's take a look at how a message is sent in each of the three modes.

4.1 Synchronous Mode

The following is an example of sending in synchronous mode:

public static void main(String[] args) throws MQClientException, InterruptedException {
    String nameServer = "localhost:9876";
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.setNamesrvAddr(nameServer);
    producer.start();

    for (int i = 0; i < 1; i++)
        try {
            Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Synchronously send, wait for the send to complete and return the result
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();
}

In this mode, when the sending is complete, the sending result is returned, and when the result is a failure, we can do additional processing on it.

Also, in synchronous mode, if the message fails to be sent, RocketMQ automatically retries (2 retries by default).

At the bottom layer, synchronous sending is handled by NettyRemotingAbstract#invokeSyncImpl:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, 
        RemotingTimeoutException {

    final int opaque = request.getOpaque();
    try {
        final ResponseFuture responseFuture = new ResponseFuture(
            channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        // Send the request
        channel.writeAndFlush(request)
        // Monitor results
        .addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        // Wait for the response
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), 
                    timeoutMillis, responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(
                    RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }
        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}

As we can see, at the bottom layer Netty sends the message asynchronously and reports the outcome through a listener. The so-called synchronous mode simply blocks the calling thread in responseFuture.waitResponse(timeoutMillis) until the response arrives or the timeout elapses.
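The "block on an asynchronous operation" idea can be sketched without Netty. The example below is an assumption-laden illustration: PendingResponse is a hypothetical, simplified holder built on a CountDownLatch, and a background thread plays the role of the network layer delivering the response.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: turning an asynchronous reply into a blocking call.
class SyncOverAsync {
    static final class PendingResponse {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile String response;

        // Called by the "network" thread when the reply arrives.
        void putResponse(String value) {
            this.response = value;
            done.countDown();
        }

        // Called by the sender; returns null if nothing arrived within the timeout.
        String waitResponse(long timeoutMillis) throws InterruptedException {
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return response;
        }
    }

    static String call(long replyDelayMillis, long timeoutMillis) {
        PendingResponse pending = new PendingResponse();
        Thread io = new Thread(() -> {
            try {
                Thread.sleep(replyDelayMillis); // simulated network latency
                pending.putResponse("pong");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        io.setDaemon(true);
        io.start();
        try {
            return pending.waitResponse(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(call(10, 2000));  // reply arrives before the timeout
        System.out.println(call(2000, 50));  // timeout elapses first, so null
    }
}
```

A null result here corresponds to the branch in invokeSyncImpl that throws a timeout or send-request exception.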

4.2 Asynchronous Mode

The following is an example of sending in asynchronous mode:

public static void main(String[] args) throws MQClientException, 
        InterruptedException, UnsupportedEncodingException {
    String nameServer = "localhost:9876";
    DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
    producer.setNamesrvAddr(nameServer);
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);

    int messageCount = 100;
    final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
    for (int i = 0; i < messageCount; i++) {
        try {
            final int index = i;
            Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Send; the key to asynchronous sending is the SendCallback
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    countDownLatch.await(5, TimeUnit.SECONDS);
    producer.shutdown();
}

Compared with synchronous mode, this send method does not return the send result; instead it takes a SendCallback parameter. To be notified of the send result, we implement the onSuccess(...) and onException(...) methods of that interface.

In this mode, if the message fails to be sent, we can put custom failure-handling logic in SendCallback#onException.

At the bottom layer, asynchronous sending is handled by NettyRemotingAbstract#invokeAsyncImpl:

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, 
        final long timeoutMillis, final InvokeCallback invokeCallback) 
        throws InterruptedException, RemotingTooMuchRequestException, 
        RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    // Acquire a permit from the semaphore
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }

        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, 
            timeoutMillis - costTime, invokeCallback, once);
        // Put the responseFuture into responseTable
        this.responseTable.put(opaque, responseFuture);
        try {
            // Netty asynchronous operation
            channel.writeAndFlush(request)
            // Monitor results
            .addListener(new ChannelFutureListener() {
                // Process the completed operation
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    // Processing failed
                    requestFail(opaque);
                    log.warn(...);
                }
            });
        } catch (Exception e) {
            ...
        }
    } else {
        ...
    }
}

From the listener logic above we can see that on success it calls responseFuture.setSendRequestOK(true) and returns, and on failure it calls requestFail. How do these operations relate to SendCallback? Let's analyze.

Before calling writeAndFlush(...), the code runs this.responseTable.put(opaque, responseFuture), which adds the responseFuture to responseTable, a Map. RocketMQ periodically scans responseTable, and for each responseFuture that has timed out it triggers the callback, which decides which SendCallback method to call.

Let's go back to the startup of NettyRemotingClient, in NettyRemotingClient#start:

public void start() {
    ...
    // Scan the response table once per second (first run after 3 seconds)
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
    ...
}

This method starts a scheduled task that runs once per second, after an initial delay of 3 seconds, and all it does is scan the responseFuture entries in responseTable. Let's go to NettyRemotingAbstract#scanResponseTable:

public void scanResponseTable() {
    // Futures to be handled in this pass
    final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
    Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Integer, ResponseFuture> next = it.next();
        ResponseFuture rep = next.getValue();

        // Move the future into rfList only once its deadline (plus 1 second) has passed
        if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) 
                <= System.currentTimeMillis()) {
            rep.release();
            it.remove();
            rfList.add(rep);
            log.warn("remove timeout request, " + rep);
        }
    }
    // Handle the collected futures
    for (ResponseFuture rf : rfList) {
        try {
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}
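The deadline check in scanResponseTable can be tried in isolation. In the sketch below, TimeoutScan and Pending are hypothetical names, and the current time is passed in as a parameter (an assumption made so the result is deterministic) instead of being read from System.currentTimeMillis().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of scanning a table of pending requests for timeouts.
class TimeoutScan {
    static final class Pending {
        final long beginTimestamp;
        final long timeoutMillis;

        Pending(long beginTimestamp, long timeoutMillis) {
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
        }
    }

    // Removes every entry whose deadline plus a 1 second grace period has passed.
    static List<Integer> scan(Map<Integer, Pending> table, long now) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Pending> entry = it.next();
            Pending p = entry.getValue();
            if (p.beginTimestamp + p.timeoutMillis + 1000 <= now) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        Map<Integer, Pending> table = new ConcurrentHashMap<>();
        table.put(1, new Pending(0, 3000));    // expires at 4000
        table.put(2, new Pending(2000, 3000)); // expires at 6000
        System.out.println(scan(table, 4000)); // prints [1]
        System.out.println(table.size());      // prints 1
    }
}
```

Collecting the expired entries first and handling them after the loop, as the original method does with rfList, keeps the callback work out of the iteration over the table.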

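This method walks through every ResponseFuture in responseTable and checks its deadline; only the futures that have timed out are moved to rfList and processed. rep.getTimeoutMillis() comes from the send timeout, which is 3 seconds by default. The processing itself is done by NettyRemotingAbstract#executeInvokeCallback: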

private void executeInvokeCallback(final ResponseFuture responseFuture) {
    boolean runInThisThread = false;
    // Submit to the callback thread pool if there is one
    ExecutorService executor = this.getCallbackExecutor();
    if (executor != null) {
        try {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // This is the actual work
                        responseFuture.executeInvokeCallback();
                    } catch (Throwable e) {
                        log.warn(...);
                    } finally {
                        responseFuture.release();
                    }
                }
            });
        } catch (Exception e) {
            runInThisThread = true;
            log.warn(...);
        }
    } else {
        runInThisThread = true;
    }

    if (runInThisThread) {
        try {
            // Execute directly in the current thread
            responseFuture.executeInvokeCallback();
        } catch (Throwable e) {
            log.warn("executeInvokeCallback Exception", e);
        } finally {
            responseFuture.release();
        }
    }
}

NettyRemotingAbstract#executeInvokeCallback in turn calls ResponseFuture#executeInvokeCallback:

public void executeInvokeCallback() {
    if (invokeCallback != null) {
        if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
            // Continue processing
            invokeCallback.operationComplete(this);
        }
    }
}

This is the key point. Following invokeCallback.operationComplete(this), we find that it leads to the MQClientAPIImpl#sendMessageAsync(...) method:

private void sendMessageAsync(
    ...
    // sendCallback is passed in as a parameter
    final SendCallback sendCallback,
    // other parameters omitted
    ...
) throws InterruptedException, RemotingException {
    final long beginStartTime = System.currentTimeMillis();

    // remotingClient.invokeAsync ends up calling NettyRemotingAbstract#invokeAsyncImpl
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, 
        // The InvokeCallback parameter
        new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                long cost = System.currentTimeMillis() - beginStartTime;
                RemotingCommand response = responseFuture.getResponseCommand();
                ...
                if (response != null) {
                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
                        ...
                        try {
                            // Call sendCallback's onSuccess(...) method
                            sendCallback.onSuccess(sendResult);
                        } catch (Throwable e) {
                        }

                    } catch (Exception e) {
                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, 
                            sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, e, context, false, producer);
                    }
                } else {
                    // Handle the various exceptions
                    ...
                    // Call the exception handler
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, 
                        sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                    ...
                }
            }
        });
}

This method mainly calls remotingClient.invokeAsync(), which in turn calls NettyRemotingAbstract#invokeAsyncImpl, the method covered earlier that ultimately uses Netty to send the message asynchronously.

remotingClient.invokeAsync() takes four parameters. Here we focus on the last one, the InvokeCallback, whose operationComplete(...) method handles completion of the operation. Two calls inside it matter most:

  • sendCallback.onSuccess(sendResult): called when the response is successful; it invokes the onSuccess(...) method of sendCallback, which is passed in as a parameter
  • onExceptionImpl(...): called when an exception occurs

We continue into the MQClientAPIImpl#onExceptionImpl method:

private void onExceptionImpl(final String brokerName,
    ...
    // sendCallback is passed in as a method parameter
    final SendCallback sendCallback,
    ...
) {
    int tmp = curTimes.incrementAndGet();
    // Handle the retry
    if (needRetry && tmp <= timesTotal) {
        ...
    } else {
        ...
        try {
            // Call sendCallback's onException(...) method
            sendCallback.onException(e);
        } catch (Exception ignored) {
        }
    }
}

As you can see, sendCallback's onException(...) method is called from here.

This method also handles retries. The retry count is passed in from DefaultMQProducerImpl#sendKernelImpl and can be configured with setRetryTimesWhenSendAsyncFailed(...), as in the example above.
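The counter-based retry decision in onExceptionImpl can be sketched as follows. This is a deliberately simplified model and not RocketMQ's implementation: AsyncRetrySketch, its Callback interface and the failuresBeforeSuccess parameter are hypothetical, and the "send" runs synchronously so the outcome is easy to follow.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: retry on failure until the retry budget is used up,
// then hand the error to the callback.
class AsyncRetrySketch {
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    static void send(int failuresBeforeSuccess, int retryTimes,
                     AtomicInteger curTimes, Callback callback) {
        int retriesSoFar = curTimes.get();
        if (retriesSoFar >= failuresBeforeSuccess) {
            callback.onSuccess("OK after " + retriesSoFar + " retries");
            return;
        }
        // Failure path: same shape as "tmp = curTimes.incrementAndGet(); if (tmp <= timesTotal)".
        int tmp = curTimes.incrementAndGet();
        if (tmp <= retryTimes) {
            send(failuresBeforeSuccess, retryTimes, curTimes, callback);
        } else {
            callback.onException(new RuntimeException("FAILED after " + retryTimes + " retries"));
        }
    }

    static String run(int failuresBeforeSuccess, int retryTimes) {
        final String[] outcome = new String[1];
        send(failuresBeforeSuccess, retryTimes, new AtomicInteger(0), new Callback() {
            @Override
            public void onSuccess(String result) {
                outcome[0] = result;
            }

            @Override
            public void onException(Throwable e) {
                outcome[0] = e.getMessage();
            }
        });
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(run(1, 2)); // prints OK after 1 retries
        System.out.println(run(5, 2)); // prints FAILED after 2 retries
        System.out.println(run(1, 0)); // prints FAILED after 0 retries
    }
}
```

With a retry budget of 0, as set in the example producer above, the first failure goes straight to onException.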

4.3 Oneway Mode

The following is an example of this mode:

public static void main(String[] args) throws MQClientException, InterruptedException {
    String nameServer = "localhost:9876";
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.setNamesrvAddr(nameServer);
    producer.start();

    for (int i = 0; i < 1; i++)
        try {
            Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // In oneway mode, the send method returns no value
            producer.sendOneway(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();
}

Unlike the previous two modes, in this mode the message is sent only once, no result is returned, and there is no callback parameter for receiving the send result. The message is simply sent, whatever the outcome.

This mode is handled by NettyRemotingAbstract#invokeOnewayImpl:

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, 
        final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, 
        RemotingTimeoutException, RemotingSendRequestException {
    request.markOnewayRPC();
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request)
            // The listener only releases the permit and logs a failure
            .addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn(...);
                    }
                }
            });
        } catch (Exception e) {
            ...
        }
    } else {
        ...
    }
}

As you can see, the only handling of the send result in this method is to write a WARN-level log entry when the send fails.
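The semaphore handling in invokeOnewayImpl follows a "take a permit, give it back exactly once" pattern. The sketch below illustrates that pattern with the JDK only; OnewayLimiter and ReleaseOnce are hypothetical names, and ReleaseOnce is merely a guess at the behaviour the name SemaphoreReleaseOnlyOnce suggests.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: bound the number of in-flight requests with a semaphore
// and make sure each permit is returned at most once.
class OnewayLimiter {
    static final class ReleaseOnce {
        private final AtomicBoolean released = new AtomicBoolean(false);
        private final Semaphore semaphore;

        ReleaseOnce(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        void release() {
            // Only the first caller actually returns the permit.
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        }
    }

    static boolean tryAcquire(Semaphore semaphore, long timeoutMillis) {
        try {
            return semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Semaphore permits = new Semaphore(1);
        System.out.println(tryAcquire(permits, 10)); // prints true
        System.out.println(tryAcquire(permits, 10)); // prints false, no permit left
        ReleaseOnce once = new ReleaseOnce(permits);
        once.release();
        once.release(); // ignored
        System.out.println(permits.availablePermits()); // prints 1
    }
}
```

Guarding the release keeps the permit count from growing beyond its initial value when several code paths might each try to release.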

5. Summary

This article walked through how the producer starts up and sends messages. Here is a summary.

When the RocketMQ producer starts up, it does the following:

  1. Configures the Netty client
  2. Starts scheduled tasks, for example periodically fetching the nameserver address, periodically updating topic routing information, and periodically sending heartbeats

When sending messages, RocketMQ supports three main delivery methods: Sync, Async and Oneway.

  • Synchronous: after a message is sent, the thread blocks until a result is returned
  • Asynchronous: a callback for the send result can be registered when sending; the thread does not block after sending, and the result is delivered to the callback
  • Unidirectional: after the message is sent, the thread does not block, no result is returned, and no callback can be registered. The message is simply sent, without regard to the result or to whether it was sent successfully

In terms of message reliability,

  • Synchronous sending: when a message fails to be sent, it is retried internally (by default one send plus two retries). In addition, the send result is available once sending completes, so the caller can handle a failure on its own
  • Asynchronous sending: when a message fails to be sent, it is retried internally (by default one send plus two retries). In addition, a callback can be registered to receive the send result
  • Unidirectional: when a message fails to be sent, there is no retry (only a warn-level log entry is written), no result is returned, and no callback can be registered

Ok, that’s all about producer.


Given the limits of the author's own knowledge, mistakes in this article are inevitable; corrections are welcome. Original writing takes effort: for commercial reprints, please contact the author for authorization; for non-commercial reprints, please credit the source.

This article was first published on the WeChat official account "Java technology exploration". If you liked it, you are welcome to follow the account and explore the world of technology together.