preface

This article looks at how RocketMQ's sendOrderly methods work.

sendOrderly

rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {

    //......

    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            long now = System.currentTimeMillis();
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
            SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
            long costTime = System.currentTimeMillis() - now;
            if (log.isDebugEnabled()) {
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
            }
            return sendResult;
        } catch (Exception e) {
            log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
        long timeout) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
            producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
        } catch (Exception e) {
            log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    //......
}
  • syncSendOrderly ends by calling producer.send(rocketMsg, messageQueueSelector, hashKey, timeout). asyncSendOrderly ends by calling producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout), which takes one more argument than the synchronous call: sendCallback.
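To make the role of hashKey more concrete, here is a minimal sketch of what a hash-based selector could look like. It assumes the selector reduces the key's hash code modulo the number of queues; the class HashQueuePicker and its pick method are hypothetical stand-ins for illustration, not RocketMQ classes, and queues are modeled as plain list elements.

```java
import java.util.List;

// Hypothetical stand-in for a hash-based queue selector; not a RocketMQ class.
final class HashQueuePicker {

    // Maps the key to one element of the queue list. Equal keys always yield
    // the same queue as long as the queue list itself does not change.
    static <T> T pick(List<T> queues, Object hashKey) {
        if (queues == null || queues.isEmpty()) {
            throw new IllegalArgumentException("queues must not be empty");
        }
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(hashKey.hashCode(), queues.size());
        return queues.get(index);
    }
}
```

Under this assumption, every message sent with the same hashKey (for example an order ID) lands on the same queue, which is what allows those messages to be consumed in the order they were sent.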

DefaultMQProducer

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {

	//......

    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
    }

    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        msg.setTopic(withNamespace(msg.getTopic()));
        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
    }

    //......
}
  • Both send overloads of DefaultMQProducer first wrap the topic with the namespace and then delegate to defaultMQProducerImpl: either defaultMQProducerImpl.send(msg, selector, arg, timeout) or defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout).

DefaultMQProducerImpl

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {

	//......

    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
    }

    private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
                List<MessageQueue> messageQueueList =
                    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
                Message userMessage = MessageAccessor.cloneMessage(msg);
                String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
                userMessage.setTopic(userTopic);

                mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
            } catch (Throwable e) {
                throw new MQClientException("select message queue throwed exception.", e);
            }

            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTime) {
                throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
            }
            if (mq != null) {
                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
            } else {
                throw new MQClientException("select message queue return null.", null);
            }
        }

        validateNameServerSetting();
        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }

    //......
}
  • DefaultMQProducerImpl's send method calls sendSelectImpl. After Validators.checkMessage, that method looks up topicPublishInfo via tryToFindTopicPublishInfo(msg.getTopic()); if no usable route is found, it throws MQClientException.
  • If topicPublishInfo is found, it obtains messageQueueList via mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()).
  • It then selects mq via mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)). If the time already spent exceeds timeout, it throws RemotingTooMuchRequestException. If mq is not null, the message is sent with sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); otherwise MQClientException is thrown.
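The control flow described in these bullets can be modeled in a few lines. The following is a simplified sketch, not RocketMQ code: the class SelectSendSketch, its Dispatch result type, the string-based queues, and the injected clock are all hypothetical and exist only to show the order of the steps (route lookup, selection, timeout check, null check, send with the remaining time budget).

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;

// Simplified, hypothetical model of the sendSelectImpl flow; not RocketMQ code.
final class SelectSendSketch {

    // Stands in for the actual send: the chosen queue plus the time left for it.
    static final class Dispatch {
        final String queue;
        final long remainingMillis;

        Dispatch(String queue, long remainingMillis) {
            this.queue = queue;
            this.remainingMillis = remainingMillis;
        }
    }

    private final Map<String, List<String>> routes; // topic -> queue names
    private final LongSupplier clock;               // injected so the timing is testable

    SelectSendSketch(Map<String, List<String>> routes, LongSupplier clock) {
        this.routes = routes;
        this.clock = clock;
    }

    Dispatch send(String topic, BiFunction<List<String>, Object, String> selector,
                  Object arg, long timeoutMillis) {
        long begin = clock.getAsLong();

        // Step 1: look up the route; an unknown topic is an error.
        List<String> queues = routes.get(topic);
        if (queues == null || queues.isEmpty()) {
            throw new IllegalStateException("No route info for this topic, " + topic);
        }

        // Step 2: the caller-supplied selector chooses the queue.
        String queue = selector.apply(queues, arg);

        // Step 3: time spent so far counts against the caller's timeout.
        long cost = clock.getAsLong() - begin;
        if (timeoutMillis < cost) {
            throw new IllegalStateException("call timeout");
        }

        // Step 4: a null selection is an error; otherwise send with the rest of the budget.
        if (queue == null) {
            throw new IllegalStateException("select message queue return null.");
        }
        return new Dispatch(queue, timeoutMillis - cost);
    }
}
```

Passing timeout - costTime to the final send, rather than the original timeout, means the caller's timeout bounds the whole operation, including the time spent on route lookup and queue selection.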

summary

  • DefaultMQProducerImpl's send method calls sendSelectImpl, which runs Validators.checkMessage and then looks up topicPublishInfo via tryToFindTopicPublishInfo(msg.getTopic()); if none is found, it throws MQClientException.
  • With topicPublishInfo in hand, it obtains messageQueueList via mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()).
  • It selects mq via mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)) and, if mq is not null, sends with sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime).

The sendOrderly methods do not go through sendDefaultImpl, which picks the MessageQueue via selectOneMessageQueue(topicPublishInfo, lastBrokerName). Instead, they pick the MessageQueue through the supplied MessageQueueSelector.
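The practical difference between the two selection styles can be illustrated with a small comparison. This is a sketch under assumptions: the default path is simplified here to plain round-robin (the real selectOneMessageQueue also takes lastBrokerName into account), the key-based path is assumed to hash the key modulo the queue count, and the class QueueChoiceComparison is hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical comparison of two queue-selection strategies; not RocketMQ code.
final class QueueChoiceComparison {

    private final AtomicInteger counter = new AtomicInteger();

    // Round-robin: each call advances to the next queue and ignores the message key,
    // so consecutive messages of one business entity end up on different queues.
    String roundRobin(List<String> queues) {
        int index = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(index);
    }

    // Key-based: the queue depends only on the key, so equal keys share one queue.
    static String byKey(List<String> queues, Object key) {
        return queues.get(Math.floorMod(key.hashCode(), queues.size()));
    }
}
```

Round-robin spreads load evenly but gives no ordering guarantee across messages of the same key, while the key-based choice trades some load balance for keeping related messages together.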

doc

  • DefaultMQProducerImpl