In this lecture we’re going to talk about sequential messages.

What are sequential messages

First, what are sequential messages?

Sequential messages are messages that are consumed in the same order in which they were sent. RocketMQ can guarantee strict ordering in one of two forms: local order or global order.

Local order

What is local order?

Messages inside each MessageQueue keep their relative order. However, there is no ordering guarantee across different MessageQueues.

Example:

Order 1: create -> place -> pay -> complete
Order 2: create -> place -> pay

Order 1 and Order 2 sit on different MessageQueues, so it is enough that the messages inside each MessageQueue stay in order.

A MessageQueue can only be consumed by one consumer in a consumer group, and only by a single thread at a time. The consumer can still use multiple threads to consume several MessageQueues simultaneously.
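To make the threading rule concrete, here is a minimal sketch in plain Java. It is not RocketMQ code: the class PerQueueConsumerSketch and its consume method are hypothetical names, and each "queue" is just an in-memory list. It assumes one single-threaded worker per queue, which is one simple way to get "one thread per queue, many queues in parallel".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation, not part of RocketMQ: one single-threaded worker per queue.
class PerQueueConsumerSketch {

    // Consumes every queue on its own thread and returns what each worker saw, in order.
    static List<List<String>> consume(List<List<String>> queues) {
        List<List<String>> consumed = new ArrayList<>();
        List<ExecutorService> workers = new ArrayList<>();
        for (List<String> queue : queues) {
            List<String> seen = Collections.synchronizedList(new ArrayList<>());
            consumed.add(seen);
            // A single-thread executor runs its tasks strictly in submission order.
            ExecutorService worker = Executors.newSingleThreadExecutor();
            workers.add(worker);
            for (String msg : queue) {
                worker.execute(() -> seen.add(msg));
            }
        }
        // Wait until every worker has drained its queue.
        for (ExecutorService worker : workers) {
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<List<String>> queues = Arrays.asList(
                Arrays.asList("create", "place", "pay", "complete"), // order 1
                Arrays.asList("create", "place", "pay"));            // order 2
        System.out.println(consume(queues));
    }
}
```

The two workers run in parallel, so messages of order 1 and order 2 may interleave in time, but each list comes back in exactly the order it went in. That is the local-order guarantee: ordered within a queue, unordered across queues.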

Global order

Now that you know local order, global order is even easier.

The topic has only one MessageQueue, so every message is sent to that queue. This ensures that all messages are in strict order.

A MessageQueue can only be consumed by one consumer, and only by a single thread.

Producers send messages in sequence

Next, we use code to demonstrate local order:

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// OrderEntity is a simple class with an id and a name; its definition is not shown in this article.
public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer.
        producer.start();
        List<OrderEntity> list = buildOrderList();
        for (int i = 0; i < list.size(); i++) {
            int orderId = list.get(i).getId();
            // Create a message instance, specifying topic, tag, key and message body.
            Message msg = new Message("orderTopic", "TagA", "KEY" + i,
                    list.get(i).toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // Messages with the same order id always land on the same queue.
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.println("order id: " + orderId + ", send result: " + sendResult);
        }
        // Shut down the producer.
        producer.shutdown();
    }

    private static List<OrderEntity> buildOrderList() {
        List<OrderEntity> res = new ArrayList<>();
        res.add(new OrderEntity(147, "add to cart"));
        res.add(new OrderEntity(147, "order"));
        res.add(new OrderEntity(147, "payment"));
        res.add(new OrderEntity(147, "complete"));
        res.add(new OrderEntity(258, "add to cart"));
        res.add(new OrderEntity(258, "order"));
        res.add(new OrderEntity(369, "add to cart"));
        res.add(new OrderEntity(369, "order"));
        res.add(new OrderEntity(369, "payment"));
        return res;
    }
}

Run result:

order id: 147, send result: SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B1F80000, offsetMsgId=0AFCA6FA00002A9F000000000009FBA7, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=3], queueOffset=44]
order id: 147, send result: SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B1FD0001, offsetMsgId=0AFCA6FA00002A9F000000000009FC96, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=3], queueOffset=45]
order id: 147, send result: SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B1FF0002, offsetMsgId=0AFCA6FA00002A9F000000000009FD7C, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=3], queueOffset=46]
order id: 147, send result: SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2010003, offsetMsgId=0AFCA6FA00002A9F000000000009FE62, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=3], queueOffset=47]
order id: 258, send result: SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2020004, offsetMsgId=0AFCA6FA00002A9F000000000009FF48, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=2], queueOffset=42]
order id: 258, send result: SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2040005, offsetMsgId=0AFCA6FA00002A9F00000000000A0037, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=2], queueOffset=43]
order id: 369, send result: SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2050006, offsetMsgId=0AFCA6FA00002A9F00000000000A011D, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=1], queueOffset=63]
order id: 369, send result: SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2060007, offsetMsgId=0AFCA6FA00002A9F00000000000A020C, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=1], queueOffset=64]
order id: 369, send result: SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2070008, offsetMsgId=0AFCA6FA00002A9F00000000000A02F2, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=1], queueOffset=65]

Summary: the queue is chosen by taking the order id modulo the number of queues. Messages of different orders can therefore go to different MessageQueues, while all messages of the same order go to the same MessageQueue.

As the run result shows, all messages with order id=147 are sent to queue 3;

all messages with order id=258 are sent to queue 2;

all messages with order id=369 are sent to queue 1.
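The modulo routing can be checked without a broker. The sketch below is plain Java, not RocketMQ code: OrderRoutingSketch, queueIndexFor and route are hypothetical names, and the queue count of 4 is an assumption that happens to be consistent with the queue ids in the run result (147 % 4 = 3, 258 % 4 = 2, 369 % 4 = 1).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of RocketMQ: simulates the selector's routing rule.
class OrderRoutingSketch {

    // Same rule as the selector: order id modulo queue count.
    // Math.floorMod keeps the index non-negative even if an id is negative.
    static int queueIndexFor(int orderId, int queueCount) {
        return Math.floorMod(orderId, queueCount);
    }

    // Groups order ids by queue index, preserving send order within each queue.
    static Map<Integer, List<Integer>> route(int[] orderIds, int queueCount) {
        Map<Integer, List<Integer>> queues = new LinkedHashMap<>();
        for (int orderId : orderIds) {
            queues.computeIfAbsent(queueIndexFor(orderId, queueCount), k -> new ArrayList<>())
                  .add(orderId);
        }
        return queues;
    }

    public static void main(String[] args) {
        // Same send sequence as the producer example; 4 queues is an assumption.
        int[] sent = {147, 147, 147, 147, 258, 258, 369, 369, 369};
        System.out.println(route(sent, 4));
        // prints {3=[147, 147, 147, 147], 2=[258, 258], 1=[369, 369, 369]}
    }
}
```

One design note: the selector in the producer uses the plain % operator, which is fine for positive order ids. With a queue count of 1, every id maps to index 0, which is the global-order case.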

Consumers consume messages sequentially

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer");
        consumer.setNamesrvAddr("localhost:9876");
        // Set where to start consuming from.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("orderTopic", "TagA");
        // MessageListenerOrderly makes sure a queue is consumed by only one thread at a time.
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("[" + Thread.currentThread().getName() + "] " + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer started");
    }
}

Run result:

[ConsumeMessageThread_1] OrderEntity{id=147, name='add to cart'}
[ConsumeMessageThread_2] OrderEntity{id=258, name='add to cart'}
[ConsumeMessageThread_3] OrderEntity{id=369, name='add to cart'}
[ConsumeMessageThread_1] OrderEntity{id=147, name='order'}
[ConsumeMessageThread_3] OrderEntity{id=369, name='order'}
[ConsumeMessageThread_1] OrderEntity{id=147, name='payment'}
[ConsumeMessageThread_2] OrderEntity{id=258, name='order'}
[ConsumeMessageThread_3] OrderEntity{id=369, name='payment'}
[ConsumeMessageThread_1] OrderEntity{id=147, name='complete'}

Conclusion: in this run, every message with id=147 is consumed by thread 1, which shows that queue 3 is consumed by only one thread at a time.

Comparison and analysis

Look at the consumer side: to consume messages in order, we register a MessageListenerOrderly. It tells the consumer to consume messages sequentially, with only one thread consuming a given queue at a time.

For ordinary messages, the consumer registers a MessageListenerConcurrently instead, which allows messages to be consumed concurrently.

Applicable scenario

Theory only becomes useful once it is put into practice.

Application scenario: business flows in which the order of events must be preserved. Examples include database binlog messages, and the create, place, and pay messages of an order.

Okay, that's about it for this lecture.

If you have any questions, please leave a comment.

Question of the day

Finally, for global order you just need to set the number of MessageQueues to 1. So how do you set the number of MessageQueues to 1?

Feel free to leave your answer in the comments.

Subsequent articles

  • RocketMQ - Getting started (updated)
  • RocketMQ - Architecture and roles (updated)
  • RocketMQ - Message sending (updated)
  • RocketMQ - Message consumption
  • RocketMQ - Broadcast mode and cluster mode for consumers (updated)
  • RocketMQ - Sequential messages (updated)
  • RocketMQ - Delayed messages
  • RocketMQ - Batch messages
  • RocketMQ - Message filtering
  • RocketMQ - Transaction messages
  • RocketMQ - Message store
  • RocketMQ - High availability
  • RocketMQ - High performance
  • RocketMQ - Primary/secondary replication
  • RocketMQ - Disk flushing mechanism
  • RocketMQ - Idempotence
  • RocketMQ - Message retry
  • RocketMQ - Dead letter queue
