preface

RocketMQ version of message queue is a distributed message middleware with low latency, high concurrency, high availability and high reliability built by Ali Cloud based on Apache RocketMQ. Message queue RocketMQ not only provides asynchronous decoupling and peak-filling capabilities for distributed applications, but also features massive message stacking, high throughput, and reliable retry for Internet applications. For details, please refer to the RocketMQ version of Ali Cloud website.

In this article, we will introduce Alibaba Cloud version of RocketMQ, through this study, you will learn:

  • What is a delayed queue
  • Application scenario of delay queue
  • Alibaba Cloud RocketMQ (Commercial Cloud Version) application steps
  • Integrate SpringBoot RocketMQ

What is a delayed queue

As the name implies, a delay queue is a queue that is consumed by delay. First, queues mean that the internal elements are in order, and elements are in and out of queues. In normal queues, messages are immediately consumed by consumers once queued. The most important feature of the delay queue is reflected in its delay attribute. Unlike ordinary queues, the elements in the delay queue want to be taken out and processed at a specified time. It can be said that the elements in the delay queue are all with time attribute. Simply put, a delay queue is a queue that holds elements that need to be processed at a specified time.

Application scenario of delay queue

There are many application scenarios of delay queue, such as the following scenarios:

  • Orders that are not paid within 30 minutes are automatically cancelled.
  • The delivery platform will send the order notification, and 60 seconds after the order is successfully placed, it will push the SMS message to the user
  • After a scheduled meeting, each participant is notified 10 minutes before the scheduled time to attend the meeting
  • After the successful payment of goods, 24 hours without comments will automatically praise
  • If the goods order is not paid within one week, it will be settled automatically and the inventory will be returned.

You may have noticed that these scenarios have one thing in common: a task needs to be completed at a given point in time. At this point, someone would say, well, these things can use timed tasks, polling data all the time, pulling out data every second that needs to be processed and processed. You can do this if you have a small amount of data; If there is no strict limitation on the real-time performance of data, it is indeed a feasible solution to perform scheduled tasks every hour to produce data. However, for scenarios with a large amount of data and strong timeliness, this polling method is difficult to finish processing business data within a cycle, which will bring great pressure to the system and database, which cannot meet business requirements and has low performance.

At this time, the delay queue can shine on stage, the above scene, is the use of delay queue. There are many ways to implement delay queues, such as DelayQueue DelayQueue, RabbitMQ queue, Redis expiration callback and so on.

Next, this article shows you how to implement delayed queues using RocketMQ.

RocketmMq is different from the commercial version

reliability

The commercial version supports synchronous flush and synchronous double-write to ensure message reliability.

Strict message order

Alibaba Business MQ supports stricter message ordering (strict FIFO), in which a Broker goes down without being out of order.

Timed messages and delayed messages

The open source RocketMQ version only supports timing levels, not arbitrary time precision. Ali MQ supports timing levels, as well as the specified millisecond Level delay time

Transaction scenarios unique to the business edition

MQ supports transactional messaging to meet the consistency needs of the enterprise.

  • The open source rocketMQ is the core of Ali MQ, but the rocketMQ transaction message is different from the ali MQ transaction implementation mechanism. The transaction code of Ali MQ is not open source.

  • Ali MQ combined with Ali GTS can realize a variety of message transactions, including service + message transaction consistency, transaction consistency between multiple services on the message link and other capabilities.

The commercial version has more professional operation and maintenance support

It also provides professional o&M support, such as link trace tracing, message query, alarm monitoring, and message tracing.

Alibaba Cloud RocketMQ (Commercial Cloud Version) application steps

Prerequisites You have registered an Alicloud account and completed real-name authentication

Search rocketmq

Select the RocketMQ version of the message queue

Free open

In the dialog box that is displayed, click Enable MQ service.

On the Confirm order page, after reading the order contents and the service agreement, select I have read and agree to the MESSAGE Queue MQ Service Agreement and click Open now.

When you see the following information, you have successfully enabled the message queue service.

For instance
  • Log in to the Message queue RocketMQ version console.
  • In the left navigation bar, click the instance list.
  • On the Instance list page, click Create Instance.

In the Create RocketMQ instance panel, select the instance type, enter a name and description, and click OK

To apply for the topic

  • In the left navigation bar of the page where the instance is located, click Topic Management.
  • On the Topic Management page, click Create Topic

In the Create Topic panel, enter a name and description, select the message type for the Topic, and click OK.

Application for group

  • On the left navigation bar of the instance page, click Group Management.
  • On the Group management page, choose TCP > Create Group ID.

In the Create Groups available for TCP panel, enter the Group ID and description, and click OK

Obtain The access point nameServiceAddress

  • On the left navigation bar of the page where the instance is located, click Instance Details.
  • On the Instance Details page, click the Access point TAB.

In the TCP(TCP) client access point area, move the mouse pointer to the required access point name and click to complete replication

Get the AccessKey ID and Secret

  • Log in to the RAM console using your cloud account.
  • In the left navigation bar, click Users under the People Management menu.
  • Under the User Login Name/Display name list, click the target RAM user name.
  • In the User AccessKey area, click Create AccessKey.

After completing the above preparations, we can obtain the basic configuration information of the RocketMQ message queue, and we will implement message sending and subscription with the RocketMQ version of the message queue.

Integrate SpringBoot RocketMQ

Importing dependency packages

<! -- aliyun business version rocketmq--> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.84..Final</version>
        </dependency>
Copy the code

Add attributes to the yML profile

Nepenthe: rocketMQ: accessKey: # connect to accessKey ID, replace it with your secretKey: # connect to accessKey secret, replace it with your nameSrvAddr: # producer ons access domain, replace with your own delayTopic: # production topic, replace with your own groupId: # producer ID (old version is producer ID, new version is groupId), replace with your own timeoutMillis: # Send expiration time, customCopy the code

Encapsulate the RocketMQ configuration file

@ Configuration public class RocketMqConfig {/ * * * the AccessKey authentication need ID * / @ Value (" ${nepenthe. Rocketmq. The AccessKey} ") private String accessKey; / * * * the AccessKey authentication need Secret * / @ Value (" ${nepenthe. Rocketmq. SecretKey} ") private String secretKey. / * * * instance TCP public access address (the actual project, fill in your ali cloud MQ public address) * / @ Value (" ${nepenthe. Rocketmq. NameSrvAddr} ") private String nameSrvAddr; Delay queue topic / * * * * / @ Value (" ${nepenthe. Rocketmq. DelayTopic} ") private String delayTopic; / * * * delay queue group * / @ Value (" ${nepenthe. Rocketmq. GroupId} ") private String groupId. / * * * send timeout set * / @ Value (" ${nepenthe. Rocketmq. TimeoutMillis} ") private String timeoutMillis; public Properties getRocketMqProperty() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID,this.getGroupId()); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey); properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.timeoutMillis); return properties; } //TODO get setCopy the code

Allocation producer

@Component public class RocketMqProducerInit { @Autowired private RocketMqConfig mqConfig; private static Producer producer; @construct public void init(){system.out.println (" Start RocketMq producer!" ); producer = ONSFactory.createProducer(mqConfig.getRocketMqProperty()); // the start method is used to start the Producer before sending the message. It only needs to be called once. When the project is shutdown, the Producer is automatically shutdown. } /** * initialize Producer * @return */ public Producer getProducer(){return Producer; }}Copy the code

The method by which a producer sends a message

@Component public class ProducerHandler { private Logger logger = LoggerFactory.getLogger(ProducerHandler.class); @Autowired private RocketMqConfig config; @Autowired private RocketMqProducerInit producer; @param messageBody messageBody content, @param msgKey message key value, * @param delayTime * @return success:SendResult or error:null public SendResult sendTimeMsg(String msgTag,  byte[] messageBody, String msgKey, long delayTime) { Message msg = new Message(config.getDelayTopic(),msgTag,msgKey,messageBody); msg.setStartDeliverTime(delayTime); return this.send(msg); } private SendResult send(Message MSG) {try {SendResult SendResult = producer.getProducer().send(msg); If (sendResult! = null) {logger. The info (" RocketMQ messages sent success - Topic: {}, msgId: {}, Key: {}, tag: {}, message:{}" ,msg.getTopic(),sendResult.getMessageId(),msg.getKey(),msg.getTag(),new String(msg.getBody())); return sendResult; }else {logger.error(" RocketMQ message failed to send -- Topic:{}, Key:{}, tag:{}, message:{}" ,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody())); return null; }} catch (Exception e) {logger.error(" RocketMQ message failed to send -- Topic:{}, Key:{}, tag:{}, Message: {}, the reason for the error: {} ", MSG. GetTopic (), MSG, getKey (), MSG, getTag (), the new String (MSG) getBody ()), um participant etMessage ()); return null; }}Copy the code

Configuration consumer

@Component public class RocketmqConsumerInit { @Autowired private RocketMqConfig mqConfig; private static Consumer consumer; @construct public void init(){system.out.println (" Start consumer! ); consumer = ONSFactory.createConsumer(mqConfig.getRocketMqProperty()); / / listen to the first topic, new corresponding listener consumer. The subscribe (mqConfig. GetDelayTopic (), "*", new DelayQueueMessageListener ()); // before sending the message, the start method must be called to start consumer. It only needs to be called once. When the project is closed, consumer.start() is automatically shutdown; } public Consumer getConsumer(){return Consumer; }}Copy the code

Consumer message monitoring

@Service public class DelayQueueMessageListener implements MessageListener { private Logger logger = LoggerFactory.getLogger(this.getClass()); @override public Action consume(consume Message, ConsumeContext ConsumeContext) {logger.info(" consume MQ Message -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}", message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(), new String(message.getBody())); Try {//TODO processes the business // Consume successfully, continue to consume the next message return Action.CommitMessage; } catch (Exception e) {logger.error(" Failed to consume MQ message! msgId:" + message.getMsgID() + "----ExceptionMsg:" + e.getMessage()); Return Action.ReconsumeLater; // Consumelater fails to consumelater. }}}Copy the code

The results

@Autowired private ProducerHandler producerHandler; @RequestMapping("delayMsg") public void delayMsg2(String msg, Integer delayTime) { JSONObject body = new JSONObject(); body.put("userId", "this is userId"); Body. Put ("notice", "synchronize message "); producerHandler.sendTimeMsg("userMessage", msg.getBytes(), "messageId", System.currentTimeMillis()+delayTime); }Copy the code

The log is as follows:

2021-02-25 18:05:20. 584 [HTTP - nio - 8080 - exec] 1591467 INFO C.X.N.M.M q.r ocketmq. ProducerUtil - TraceId xakuqkt0qv6 [1614247520] MQ messages sent success - Topic: auto_publish_delay_queue_topic, msgId: 0 a4922e703d914dad5dc7f7a4908000d , Key:messageId, tag:userMessage, Body: I am a delay queue delay the 2021-02-25 18:05:26 for 10 SEC 389 [HTTP - nio - 8080 - exec - 9] 1597272 INFO C.X.N.M.M q.r ocketmq. ProducerUtil - TraceId jymlryax9m6 [1614247526] MQ messages sent success - Topic: auto_publish_delay_queue_topic, msgId: 0 a4922e703d914dad5dc7f7a5f780010 , Key:messageId, tag:userMessage, Body: I am a delay queue delay. 12 seconds 2021-02-25 18:05:30 620 [ConsumeMessageThread_3] 1601503 INFO C.X.N.M.M.R.D elayQueueMessageListener - Receives the MQ message - Topic: auto_publish_delay_queue_topic, tag: userMessage, msgId: bf58a99274cfadeddbec720c12d8607d, Key:messageId, Body: I am a delay queue delay the 2021-02-25 18:05:38 for 10 SEC 871 [ConsumeMessageThread_4] 1609754 INFO C.X.N.M.M.R.D elayQueueMessageListener - Receives the MQ message - Topic: auto_publish_delay_queue_topic, tag: userMessage, msgId: d44270736cdfa2927194650c451584c3, Key:messageId, body: I am delay queue delay 12 secondsCopy the code

Notice that both messages were sent and consumed as expected. This completes RocketMQ’s part of implementing delayed queues.

Reference documentation

Alibaba Cloud RocketMQ message queue developer documentation