Chapter 4 RocketMQ applications

I. Normal messages

1 Message sending classification

Producer also has a variety of options for sending messages, and different modes have different system effects.

Synchronous message sending

Synchronous sending means that after Producer sends a message, the Producer sends the next message only after receiving an ACK from MQ. This mode has the highest message reliability, but the message sending efficiency is low.

Sending messages asynchronously

Sending messages asynchronously means that the Producer sends the next message directly, without waiting for an ACK from MQ; the ACK is handled later in a callback. This mode offers reasonably good message reliability together with good message sending efficiency.

One-way message sending

Sending messages one-way means that the Producer only sends messages and neither waits for nor processes an ACK from MQ. In this mode MQ does not return an ACK at all. This mode has the highest message sending efficiency, but the message reliability is poor.

2 Code Examples

Create a project

Create a Maven Java project rocketMq-test.

Import dependence

Import rocketMQ client dependencies.

<!-- Build settings: UTF-8 source encoding, Java 8 source and target levels -->
<properties>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
	<!-- RocketMQ client library used by the producer and consumer examples -->
	<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-client</artifactId>
	<version>4.8.0</version>
	</dependency>
</dependencies>
Copy the code

Define a synchronous message sending producer

public class SyncProducer {
	public static void main(String[] args) throws Exception {
		// Create a producer with the name of the Producer Group
		DefaultMQProducer producer = new DefaultMQProducer("pg");
		// Specify the nameServer address
		producer.setNamesrvAddr("rocketmqOS:9876");
		// Set the number of retry times when sending fails. The default value is 2
		producer.setRetryTimesWhenSendFailed( 3 );
		// Set the sending timeout period to 5s, 3s by default
		producer.setSendMsgTimeout( 5000 );
        
        // Start the producer
		producer.start();
        
        // Produce and send 100 messages
		for (int i = 0 ; i < 100 ; i++) {
			byte[] body = ("Hi," + i).getBytes();
			Message msg = new Message("someTopic"."someTag", body);
			// Specify a key for the message
			msg.setKeys("key-" + i);
			// Send a message
			SendResult sendResult = producer.send(msg);
			System.out.println(sendResult);
        }
		/ / close the producerproducer.shutdown(); }}Copy the code
/** Message sending status. */
public enum SendStatus {
    /** The message was sent successfully. */
    SEND_OK,

    /**
     * Flushing to disk timed out. This status can occur only when the Broker's
     * flush policy is synchronous flush; it does not occur with asynchronous flush.
     */
    FLUSH_DISK_TIMEOUT,

    /**
     * Synchronizing to the Slave timed out. This status can occur only when the
     * Broker cluster's master-slave replication mode is synchronous replication;
     * it does not occur with asynchronous replication.
     */
    FLUSH_SLAVE_TIMEOUT,

    /**
     * No Slave is available. This status can occur only when the Broker
     * cluster's master-slave replication mode is synchronous replication;
     * it does not occur with asynchronous replication.
     */
    SLAVE_NOT_AVAILABLE
}
Copy the code

Define asynchronous message sending producers

public class AsyncProducer {
	public static void main(String[] args) throws Exception {
		DefaultMQProducer producer = new DefaultMQProducer("pg");
		producer.setNamesrvAddr("rocketmqOS:9876");
		// Do not retry sending after asynchronous sending fails
        producer.setRetryTimesWhenSendAsyncFailed( 0 );
		// Set the number of queues for the newly created Topic to 2, default to 4
		producer.setDefaultTopicQueueNums( 2 );
        
        producer.start();
        
        for (int i = 0 ; i < 100 ; i++) {
			byte[] body = ("Hi," + i).getBytes();
			try {
				Message msg = new Message("myTopicA"."myTag", body);
				// Send it asynchronously. Specify the callback
				producer.send(msg, new SendCallback() {
					// The callback method is triggered when producer receives an ACK from MQ
					@Override
					public void onSuccess(SendResult sendResult) {
						System.out.println(sendResult);
					}
    				@Override
                    public void onException(Throwable e) { e.printStackTrace(); }}); }catch(Exception e) { e.printStackTrace(); }}// end-for
        
        // Sleep for a while
        // If there is no sleep,
        // If the message is not sent, the producer will be closed
        TimeUnit.SECONDS.sleep( 3); producer.shutdown(); }}Copy the code

Define a one-way message sending producer

public class OnewayProducer {	public static void main(String[] args) throws Exception{		DefaultMQProducer producer = new DefaultMQProducer("pg");		producer.setNamesrvAddr("rocketmqOS:9876");		producer.start();   		        for (int i = 0 ; i < 10 ; i++) {			byte[] body = ("Hi," + i).getBytes();			Message msg = new Message("single"."someTag", body);			// send producer.sendoneway (MSG); } producer.shutdown(); System.out.println("producer shutdown"); }}
Copy the code

Defining message consumers

public class SomeConsumer {
	public static void main(String[] args) throws MQClientException {
		// Define a pull consumer
		// DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
		// Define a push consumer
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
		/ / specify the nameServer
		consumer.setNamesrvAddr("rocketmqOS:9876");
		// Specify consuming from the first message
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		// Specify the consumption topic and tag
		consumer.subscribe("someTopic"."*");
		// Specify broadcast mode for consumption. Default is cluster mode.
		// consumer.setMessageModel(MessageModel.BROADCASTING);
        
        // Register a message listener
		consumer.registerMessageListener(new MessageListenerConcurrently() {
    		// This method is triggered as soon as the broker has a message to which it subscribes,
			// The return value is the current state of consumer consumption
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List
       
         msgs, ConsumeConcurrentlyContext context)
        {
				// Consume messages one by one
				for (MessageExt msg : msgs) {
					System.out.println(msg);
				}
				// Return consumption status: consumption succeeded
				returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});// Start consumer spending
		consumer.start();
		System.out.println("Consumer Started"); }}Copy the code

II. Sequential messages

1 What are sequential messages

Sequential messages are messages that are consumed strictly in the order in which they are sent (FIFO).

By default, producers send messages to the different partition queues in Round Robin mode. Consumers, in turn, pull messages from multiple queues, so in this case the order of sending and the order of consumption are not guaranteed to match. If messages are sent only to the same Queue and are consumed only from that Queue, the order of the messages is strictly guaranteed.

2 Why are sequential messages needed

For example, you now have TOPIC ORDER_STATUS, which has four queues under it, and the different messages in this TOPIC are used to describe the different states of the current order. Assume that the order has status: unpaid, paid, shipping, shipping successful, shipping failed.

Based on the order status above, the producer can generate the following messages from the sequence:

Order T0000001: Unpaid –> Order T0000001: Paid –> Order T0000001: Shipping –> Order T0000001: shipping failed

After the messages are sent to MQ, if a round-robin strategy is used to select the Queue, the messages of one order may end up stored in different queues in MQ:

In this case, we want the Consumer to consume the messages in the same order as we send them, but we can’t guarantee that the order is correct in the way MQ is delivered and consumed. For out-of-order messages, even if the Consumer is set up with some state tolerance, it can’t handle all of these random combinations.

Based on the above situation, a scheme can be designed as follows: for messages with the same order number, through certain policies, they are placed in a Queue, and then consumers adopt certain policies (for example, a thread independently processes a Queue to ensure the order of processing messages) to ensure the order of consumption.

3. Classification of order

Depending on the ordered scope, RocketMQ can strictly guarantee the ordering of two kinds of messages: partitioned and global.

The global order

When only one Queue is used for both sending and consuming, the order that is guaranteed is the order of all messages in the entire Topic; this is called global order.

Specify the number of queues when creating a Topic. There are three ways to specify:

1) When creating Producer in code, you can specify the number of queues that Producer automatically creates

2) Specify the number of queues when manually creating a Topic in the RocketMQ visual console

3) Specify the number of queues when creating a Topic manually using the mqadmin command

Partition order

If more than one Queue participates, only the order of the messages within each partition Queue can be guaranteed; this is called partition order.

How to implement Queue selection? When defining Producer, we can specify message queue selectors that we ourselves implement for the MessageQueueSelector interface.

When defining the selection algorithm for a selector, a selection key is usually used. The selection key can be the message key or other data. Whatever is used as the selection key, it must be unique and must not repeat.

A common selection algorithm takes the selection key (or its hash value) modulo the number of queues that the Topic contains; the result is the QueueId of the selected Queue.

There is a problem with the modulo algorithm: different selection keys may give the same result modulo the number of queues, that is, messages with different selection keys may appear in the same Queue, so the same Consumer may receive messages with different selection keys. How can this problem be solved? The general approach is to get the selection key from the message and check it. If the message is one that the current Consumer needs to consume, it consumes it directly; otherwise, it does nothing. This approach requires that the selection key be available to the Consumer along with the message, so it is a good idea to use the message key as the selection key.

Does the above approach introduce a new problem? If a message that does not belong to a Consumer is pulled by that Consumer, can the Consumer that should consume the message still consume it? Messages in the same Queue cannot be consumed by different Consumers in the same Group. Therefore, Consumers that consume messages with different selection keys from one Queue must belong to different Groups, and consumption by Consumers in different Groups is isolated and does not affect each other.

4 Code Examples

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        producer.start();
        
        for(int i=0; i<100; i++){
            
            Integer orderId = i;
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("TopicA"."TagA", body);
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs,Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    returnmqs.get(index); } }, orderId); System.out.println(sendResult); } producer.shutdown(); }}Copy the code

III. Delayed messages

1 What is a delayed message

Delayed messages are messages that can be consumed only after a specified amount of time has passed since they were written to the Broker.

Delayed messages with RocketMQ enable timed tasks without the need for timers. Typical application scenarios are the scenario of closing orders without payment due to timeout in e-commerce transactions, and the scenario of cancelling tickets due to timeout without payment on 12306 platform.

In an e-commerce platform, a delayed message is sent when an order is created. The message will be delivered to the Consumer 30 minutes later, and the Consumer will determine whether the corresponding order has been paid. If not, cancel the order and put the item back in stock. If the payment is completed, it is ignored.

In 12306, a delayed message is sent when a ticket is booked. The message will be delivered to the Consumer 45 minutes later, and the Consumer will determine whether the corresponding order has been paid. If not, the reservation is cancelled and the ticket is returned to the ticket pool. If the payment is completed, it is ignored.

2 Delay Level

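The delay duration of a delayed message cannot be an arbitrary value; it is specified by a delay level. The delay levels are defined by the following variable in the MessageStoreConfig class on the RocketMQ server:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";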

For example, if the delay level is 3, the delay duration is 10s; that is, delay levels are numbered starting from 1.

Of course, if you need a custom delay level, you can add the following configuration to the configuration file loaded by the broker (for example, to add a 1-day level, 1d). The configuration files are in the conf directory under the RocketMQ installation directory.

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
Copy the code

3 Implementation principle of delayed message

The concrete implementation scheme is as follows:

Modify the message

After the Producer sends a message to the Broker, the Broker first writes the message to a Commitlog file, which then needs to be distributed to the corresponding ConsumeQueue. However, before distributing the message, the system determines whether there is a delay level in the message. If not, it will be distributed directly as normal; If so, there is a complicated process:

  • Modify the Topic of the message to SCHEDULE_TOPIC_XXXX

  • According to the delay level, create the corresponding queueId directory and consumequeue file in the consumeQueue directory under the SCHEDULE_TOPIC_XXXX topic (if there are no such directories and files).

The mapping between delayLevel and queueId is queueId = delayLevel-1

Note that when creating a queueId directory, not all directories corresponding to all latency levels are created at one time

  • Modify the content of the message index unit. The Message Tag HashCode part of the index unit originally stores the hash value of the message’s Tag; it is now changed to store the delivery time of the message. The delivery time is the time at which the message is changed back to its original Topic and written to the Commitlog again. Delivery time = message storage time + delay level time. Message storage time is the timestamp at which the message was sent to the Broker.

  • Writes the message index to the corresponding consumequeue under the SCHEDULE_TOPIC_XXXX topic

    How are messages of each delay level Queue in the SCHEDULE_TOPIC_XXXX directory sorted?

    They are sorted by message delivery time. All delayed messages of the same level within a Broker are written to the same Queue under SCHEDULE_TOPIC_XXXX in the consumequeue directory. That is, all messages in one Queue have the same delay level time, so their delivery time depends only on the message storage time. In other words, the messages are sorted by the time they were sent to the Broker.

Delivery delay message

The Broker has a delayed message service class, ScheduleMessageService, which consumes the messages in SCHEDULE_TOPIC_XXXX and delivers each delayed message to its target Topic according to the message's delivery time. However, before delivery, the originally written message is read again from the Commitlog and its delay level is set to 0, that is, the original message becomes an ordinary message without delay. The message is then delivered again to the target Topic.

When the Broker starts, ScheduleMessageService creates and starts a Timer to execute the corresponding scheduled tasks. The system defines as many TimerTasks as there are delay levels, and each TimerTask is responsible for consuming and delivering the messages of one delay level. Each TimerTask checks whether the first message in the corresponding Queue is due. If the first message is not due, none of the subsequent messages are due either (messages are sorted by delivery time). If the first message is due, it is delivered to the target Topic, that is, consumed.

Write the message back to the Commitlog

The delayed message service class ScheduleMessageService sends the delayed message to the Commitlog again, where a new message index entry is formed and distributed to the corresponding Queue.

This is just a normal message send, except that the message Producer is the delayed message service class ScheduleMessageService.

4 Code Examples

Define DelayProducer class

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        producer.start();
        for (int i = 0 ; i < 10 ; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("TopicB"."someTag", body);
            // Set the message latency level to 3, that is, 10 seconds
            // msg.setDelayTimeLevel(3);
            SendResult sendResult = producer.send(msg);
            // Outputs the time when the message was sent
            System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
            System.out.println(","+ sendResult); } producer.shutdown(); }}Copy the code

Define OtherConsumer class

public class OtherConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("rocketmqOS:9876");
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicB"."*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // Outputs the time when the message was consumed
                    System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
                    System.out.println("," + msg);
                }
        		returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); consumer.start(); System.out.println("Consumer Started"); }}Copy the code