preface

This article focuses on using RocketMQ to send messages.

Mq client core class -DefaultMQProducer

Core method

@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException {
    return this.defaultMQProducerImpl.send(msg);
}
Copy the code

Interface for sending messages

Default implementation class

Class diagram

Encapsulate the core classes of the MQ client

Encapsulate it yourself, write a message sending utility class, or put it in a small JAR.

The source code to achieve

package xxx.rocketmq; import xxx.util.LogUtil; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * public class Producer {private String namesrvAddr; //ip private String producerGroup; //group private DefaultMQProducer producer; Public String getNamesrvAddr() {return namesrvAddr; } public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } public String getProducerGroup() { return producerGroup; } public void setProducerGroup(String producerGroup) { this.producerGroup = producerGroup; } // Initialize public void init() {logutil.infolog.info (" Rocketmq-producer initialization parameters start "); LogUtil.INFOLOG.info("producerGroup: " + producerGroup); producer = new DefaultMQProducer(producerGroup); producer.setNamesrvAddr(namesrvAddr); Producer.setinstancename (long.toString (system.currentTimemillis ()))); / / the Message Body size exceeds a threshold, the compression, the default is 1024 * 4 producer. SetCompressMsgBodyOverHowmuch (Integer. MAX_VALUE); Try {// Start the producer.start(); } catch (Exception e) {logutil.infolog. error(" Rocketmq-producer initialization parameters are abnormal: ", e); } } public void destroy() { producer.shutdown(); } /** * @param message * @return */ public SendResult send(message message) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {// Call the official default implementation to send messages SendResult result = producer.send(message); return result; }}Copy the code

Define the bean

<! <bean id="mqProducer_xxx" class="xxx.rocketmq.Producer" init-method="init" destroy-method="destroy"> <property  name="namesrvAddr" value="${namesrvAddr}"/> <property name="producerGroup" value="${rocketmq.xxx.group}"/> </bean>Copy the code

The application layer

Application layer utility classes

The application project is simply encapsulated, and the topic function is added.

Define the bean

<bean id="sendMqProcess_xxx" class="xxx.trade.process.SendMqProcess">
    <property name="producer" ref="mqProducer_xxx"/>
    <property name="topic" value="${rocketmq.xxx.topic}"/>
    <property name="subExpression" value="${rocketmq.xxx.subExpression}"/>
</bean>
Copy the code

The source code to achieve

package xxx.trade.process; import xxx.core.exception.BizException; import xxx.rocketmq.Producer; import xxx.trade.constant.xxxReturnCode; import xxx.trade.util.LogUtil; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.SendStatus; import com.alibaba.rocketmq.common.message.Message; public class SendMqProcess { private Producer producer; private String topic; private String subExpression; /** * Send mq message ** @param jsonMsgContent * message content, The value must be in JSON format * @return * @throws BizException * @see */ public SendStatus sendMqMessage(String jsonMsgContent) throws BizException { try { Message msg = new Message(topic, subExpression, (jsonMsgContent).getBytes("UTF-8")); SendResult sendResult = producer.send(msg); Logutil.infolog.info (" message content :" + jsonMsgContent + "\n Returns result :" + sendresult.toString ()); return sendResult.getSendStatus(); } catch (Exception e) { throw new BizException(xxxReturnCode.DEAL_EXCEPTION.getCode(), Xxxreturncode.deal_exception. GetDesc ("Producer send message error: "+ jsonMsgContent) + e); } } public void setProducer(Producer producer) { this.producer = producer; } public void setTopic(String topic) { this.topic = topic; } public void setSubExpression(String subExpression) { this.subExpression = subExpression; }}Copy the code

The application layer provides static methods for sending messages

Provides static methods of the class for easy use.

public class xxxMerchantNotifyManager { private static final SendMqProcess SENDMQPROCESS_xxx = (SendMqProcess) SpringContext .getService("sendMqProcess_xxx"); // Asynchronous thread sends message to MQ public static void sendMqMessage(final BackMethodParm BackMethodParm, final Orderbill Orderbill, final boolean ignore) { Runnable childThread = new Runnable() { @Override public void run() { boolean isSentOk = false; String xxx = orderbill.getxxx(); Json = convertJSONString(backMethodParm, ignore); Try {// Send message queue logutil.infolog.info ("sendMqMessage message content :" + json); SendStatus state = SENDMQPROCESS_QRCODE .sendMqMessage(json); Logutil.infolog.info ("sendMqMessage returns content :" + state.tostring ()); if (state == SendStatus.SEND_OK || state == SendStatus.FLUSH_DISK_TIMEOUT || state == SendStatus.FLUSH_SLAVE_TIMEOUT || state == SendStatus.SLAVE_NOT_AVAILABLE) { isSentOk = true; }} the catch (BizException e) {log. The error (LogConst. THROWABLEEXCEPTIONS + XXX + "abnormal asynchronous notifications", e); } the catch (Exception e) {log. The error (LogConst. THROWABLEEXCEPTIONS + XXX + "abnormal asynchronous notifications", e); {}} the catch (Exception ex) log. The error (LogConst. THROWABLEEXCEPTIONS + XXX + "asynchronous notification sent the extraordinary turn json object", the ex); } // MQ send failed to enter fault tolerance if (! isSentOk) { try { NotifyFault entity = content2NotifyFault(orderbill, backMethodParm); The info (LogConst THROWABLEEXCEPTIONS + + "asynchronous notifications:" XXX + entity); ServiceDeclare.notifyFaultService .insertNotifyFault(entity); } the catch (Exception e) {log. The error (LogConst. THROWABLEEXCEPTIONS + XXX + "asynchronous notification message turned fault-tolerant processing abnormity", e); }}}}; try { threadPool.execute(childThread); {} the catch (Throwable t) log. The error (LogConst. THROWABLEEXCEPTIONS + exception handling mq message: "", t); }}Copy the code

Call a static method to send a message

xxxMerchantNotifyManager.sendMqMessage();
Copy the code

conclusion

The idea is to send messages based on the official default implementation of the MQ client, and then encapsulate them yourself, or even multiple layers, each layer of encapsulation, just a little bit more for special cases. For test use, you can simply call the official default implementation to send a message. But in a production environment, there’s usually one or more layers.