sequence

This article focuses on RocketMQTemplate

RocketMQTemplate

Rocketmq – spring – the boot – 2.0.3 – sources. The jar! /org/apache/rocketmq/spring/core/RocketMQTemplate.java

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
    private static final  Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);

    private DefaultMQProducer producer;

    private ObjectMapper objectMapper;

    private String charset = "UTF-8"; private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash(); private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!! / /... @Override public void afterPropertiesSet() throws Exception {if(producer ! = null) { producer.start(); } } @Override protected voiddoSend(String destination, Message<? > message) { SendResult sendResult = syncSend(destination, message); log.debug("send message to `{}` finished. result:{}", destination, sendResult); } @Override protected Message<? >doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
        String content;
        if (payload instanceof String) {
            content = (String) payload;
        } else {
            // If payload not as string, use objectMapper change it.
            try {
                content = objectMapper.writeValueAsString(payload);
            } catch (JsonProcessingException e) {
                log.error("convert payload to String failed. payload:{}", payload);
                throw new RuntimeException("convert to payload to String failed.", e); } } MessageBuilder<? > builder = MessageBuilder.withPayload(content);if(headers ! = null) { builder.copyHeaders(headers); } builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN); Message<? > message = builder.build();if(postProcessor ! = null) { message = postProcessor.postProcessMessage(message); }return message;
    }

    @Override
    public void destroy() {
        if (Objects.nonNull(producer)) {
            producer.shutdown();
        }

        for (Map.Entry<String, TransactionMQProducer> kv : cache.entrySet()) {
            if(Objects.nonNull(kv.getValue())) { kv.getValue().shutdown(); } } cache.clear(); } / /... }Copy the code
  • RocketMQTemplate inherited the spring – messaging AbstractMessageSendingTemplate, implements InitializingBean DisposableBean interfaces; Methods such as syncSend, syncSendOrderly, asyncSend, asyncSendOrderly, sendOneWay, sendOneWayOrderly, and sendMessageInTransaction are provided
  • The afterPropertiesSet method executes producer.start(); The destroy method performs producer.shutdown() and TransactionMQProducer shutdown and clears the cache set
  • The doSend method internally calls the syncSend method and returns only debug output from sendResult. DoConvert method in view of the type String content don’t do processing, other types using objectMapper. WriteValueAsString converted to a String as the content, and then construct the message, Perform postProcessor. PostProcessMessage, and then return

syncSend

Rocketmq – spring – the boot – 2.0.3 – sources. The jar! /org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**
     * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
     *
     * @param destination formats: `topicName:tags`
     * @param message     {@link org.springframework.messaging.Message}
     * @param timeout     send timeout with millis
     * @param delayLevel  level for the delay message
     * @return{@link SendResult} */ public SendResult syncSend(String destination, Message<? > message, long timeout, int delayLevel) {if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("syncSend failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }

        try {
            long now = System.currentTimeMillis();
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            if (delayLevel > 0) {
                rocketMsg.setDelayTimeLevel(delayLevel);
            }
            SendResult sendResult = producer.send(rocketMsg, timeout);
            long costTime = System.currentTimeMillis() - now;
            log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error("syncSend failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    /**
     * syncSend batch messages in a given timeout.
     *
     * @param destination formats: `topicName:tags`
     * @param messages    Collection of {@link org.springframework.messaging.Message}
     * @param timeout     send timeout with millis
     * @return{@link SendResult} */ public SendResult syncSend(String destination, Collection<Message<? >> messages, long timeout) {if (Objects.isNull(messages) || messages.size() == 0) {
            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
            throw new IllegalArgumentException("`messages` can not be empty");
        }

        try {
            long now = System.currentTimeMillis();
            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
            org.apache.rocketmq.common.message.Message rocketMsg;
            for(Message<? > msg:messages) {if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
                    log.warn("Found a message empty in the batch, skip it");
                    continue;
                }
                rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, msg);
                rmqMsgs.add(rocketMsg);
            }

            SendResult sendResult = producer.send(rmqMsgs, timeout);
            long costTime = System.currentTimeMillis() - now;
            log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size()); throw new MessagingException(e.getMessage(), e); }}Copy the code
  • SyncSend method supports single and multiple org. Springframework. Messaging. The Message, including the interface of a single Message support delayLevel

syncSendOrderly

Rocketmq – spring – the boot – 2.0.3 – sources. The jar! /org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**
     * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
     *
     * @param destination formats: `topicName:tags`
     * @param message     {@link org.springframework.messaging.Message}
     * @param hashKey     use this key to select queue. for example: orderId, productId ...
     * @param timeout     send timeout with millis
     * @return{@link SendResult} */ public SendResult syncSendOrderly(String destination, Message<? > message, StringhashKey, long timeout) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }

        try {
            long now = System.currentTimeMillis();
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
            long costTime = System.currentTimeMillis() - now;
            log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); }}Copy the code
  • The syncSendOrderly method internally calls the producer.send(rocketMsg, messageQueueSelector, hashKey, timeout) method and synchronously returns SendResult

asyncSend

Rocketmq – spring – the boot – 2.0.3 – sources. The jar! /org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**
     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in addition.
     *
     * @param destination  formats: `topicName:tags`
     * @param message      {@link org.springframework.messaging.Message}
     * @param sendCallback {@link SendCallback}
     * @param timeout      send timeout with millis
     * @param delayLevel   level forthe delay message */ public void asyncSend(String destination, Message<? > message, SendCallback sendCallback, long timeout, int delayLevel) {if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("asyncSend failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }

        try {
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            if (delayLevel > 0) {
                rocketMsg.setDelayTimeLevel(delayLevel);
            }
            producer.send(rocketMsg, sendCallback, timeout);
        } catch (Exception e) {
            log.info("asyncSend failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); }}Copy the code
  • AsyncSend (rocketMsg, SendCallback, timeout)

asyncSendOrderly

Rocketmq – spring – the boot – 2.0.3 – sources. The jar! /org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**
     * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
     * addition.
     *
     * @param destination  formats: `topicName:tags`
     * @param message      {@link org.springframework.messaging.Message}
     * @param hashKey      use this key to select queue. forexample: orderId, productId ... * @param sendCallback {@link SendCallback} * @param timeout send timeout with millis */ public void asyncSendOrderly(String destination, Message<? > message, StringhashKey, SendCallback sendCallback,
                                 long timeout) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }

        try {
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
        } catch (Exception e) {
            log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); }}Copy the code
  • The asyncSendOrderly method internally executes producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout).

sendOneWay

Rocketmq – spring – the boot – 2.0.3 – sources. The jar! /org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**
     * Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't wait for * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss. * 

* One-way transmission is used for cases requiring moderate reliability, such as log collection. * * @param destination formats: `topicName:tags` * @param message {@link org.springframework.messaging.Message} */ public void sendOneWay(String destination, Message message) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("sendOneWay failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message); producer.sendOneway(rocketMsg); } catch (Exception e) { log.error("sendOneWay failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); }}

Copy the code
  • The sendOneWay method internally executes producer.sendoneway (rocketMsg)

sendOneWayOrderly

Rocketmq – spring – the boot – 2.0.3 – sources. The jar! /org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**
     * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
     *
     * @param destination formats: `topicName:tags`
     * @param message     {@link org.springframework.messaging.Message}
     * @param hashKey     use this key to select queue. forexample: orderId, productId ... */ public void sendOneWayOrderly(String destination, Message<? > message, StringhashKey) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }

        try {
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
        } catch (Exception e) {
            log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message); throw new MessagingException(e.getMessage(), e); }}Copy the code
  • The internal execution of sendOneWayOrderly is producer.sendOneway(rocketMsg, messageQueueSelector, hashKey)

sendMessageInTransaction

Rocketmq – spring – the boot – 2.0.3 – sources. The jar! /org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**
     * Send Spring Message in Transaction
     *
     * @param txProducerGroup the validate txProducerGroup name, set null if using the default name
     * @param destination     destination formats: `topicName:tags`
     * @param message         message {@link org.springframework.messaging.Message}
     * @param arg             ext arg
     * @returnTransactionSendResult * @throws MessagingException */ public TransactionSendResult sendMessageInTransaction(final String  txProducerGroup, final String destination, final Message<? > message, final Object arg) throws MessagingException { try { TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup); org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message);returntxProducer.sendMessageInTransaction(rocketMsg, arg); } catch (MQClientException e) { throw RocketMQUtil.convert(e); }}Copy the code
  • Internal execution sendMessageInTransaction method is txProducer. SendMessageInTransaction (rocketMsg, arg)

summary

  • RocketMQTemplate inherited the spring – messaging AbstractMessageSendingTemplate, implements InitializingBean DisposableBean interfaces; Methods such as syncSend, syncSendOrderly, asyncSend, asyncSendOrderly, sendOneWay, sendOneWayOrderly, and sendMessageInTransaction are provided
  • The afterPropertiesSet method executes producer.start(); The destroy method performs producer.shutdown() and TransactionMQProducer shutdown and clears the cache set
  • The doSend method internally calls the syncSend method and returns only debug output from sendResult. DoConvert method in view of the type String content don’t do processing, other types using objectMapper. WriteValueAsString converted to a String as the content, and then construct the message, Perform postProcessor. PostProcessMessage, and then return

doc

  • RocketMQTemplate