
RocketMQ batch messages

Batch message

Sending messages in batches can significantly improve the performance of delivering small messages. The limitations are that these batch messages should have the same topic, the same waitStoreMsgOK, and cannot be delayed messages. Also, the total size of this batch of messages should not exceed 4MB.
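
The constraints above can be checked before a batch is sent. The following is a minimal sketch, not RocketMQ code: SimpleMessage and BatchRules are hypothetical stand-ins invented for illustration, and the size check counts body bytes only, which is a simplifying assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a message; this is not the RocketMQ Message class. */
final class SimpleMessage {
    final String topic;
    final boolean waitStoreMsgOK;
    final int delayLevel; // 0 means the message is not delayed
    final byte[] body;

    SimpleMessage(String topic, boolean waitStoreMsgOK, int delayLevel, String body) {
        this.topic = topic;
        this.waitStoreMsgOK = waitStoreMsgOK;
        this.delayLevel = delayLevel;
        this.body = body.getBytes(StandardCharsets.UTF_8);
    }
}

final class BatchRules {
    static final long MAX_BATCH_BYTES = 4L * 1024 * 1024; // the 4MB limit stated above

    /** Returns null when the batch obeys every rule, otherwise the first violation found. */
    static String firstViolation(List<SimpleMessage> batch) {
        if (batch.isEmpty()) {
            return null; // nothing to check
        }
        SimpleMessage first = batch.get(0);
        long totalBytes = 0;
        for (SimpleMessage m : batch) {
            if (!m.topic.equals(first.topic)) {
                return "mixed topics";
            }
            if (m.waitStoreMsgOK != first.waitStoreMsgOK) {
                return "mixed waitStoreMsgOK";
            }
            if (m.delayLevel > 0) {
                return "delayed message in batch";
            }
            // Assumption: only body bytes are counted; a real estimate
            // would also include the topic and the properties.
            totalBytes += m.body.length;
        }
        return totalBytes > MAX_BATCH_BYTES ? "batch exceeds 4MB" : null;
    }

    public static void main(String[] args) {
        List<SimpleMessage> batch = Arrays.asList(
                new SimpleMessage("BatchTest", true, 0, "Hello world 0"),
                new SimpleMessage("BatchTest", true, 0, "Hello world 1"));
        System.out.println("violation: " + firstViolation(batch));
    }
}
```

Returning a description of the first violation, rather than a boolean, makes it easier to log why a batch was rejected.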

Sending batch messages

If each batch you send stays within 4MB, batching is easy to use, as shown in the following example:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   // handle the error
}

Message list splitting

Things only get more complicated when you send a large list and cannot be sure whether it exceeds the size limit (4MB). In that case, it is best to split the message list:

package io.mvvm.batch;

import org.apache.rocketmq.common.message.Message;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Splits a message list into batches that stay within the size limit. */
public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int startIndex = getStartIndex();
        int nextIndex = startIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(startIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    private int getStartIndex() {
        // Skip any message that exceeds the limit on its own, without running past the end of the list
        while (currIndex < messages.size()
                && calcMessageSize(messages.get(currIndex)) > SIZE_LIMIT) {
            currIndex += 1;
        }
        return currIndex;
    }

    private int calcMessageSize(Message message) {
        int tmpSize = message.getTopic().length() + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
        }
        tmpSize = tmpSize + 20; // Add 20 bytes of log overhead per message
        return tmpSize;
    }
}

Send segmented batch messages

package io.mvvm.batch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;

/**
 * @program: spring-cloud-alibaba
 * @description: batch message producer
 * @author: Pan
 * @create: 2020-12-27 22:32
 */
public class Producer {

    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.start();

        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
        // Split the large message list into several smaller batches
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            try {
                List<Message> listItem = splitter.next();
                SendResult send = producer.send(listItem);
                System.out.println(send);
            } catch (Exception e) {
                e.printStackTrace();
                // handle the error
            }
        }

        producer.shutdown();
    }
}

Bulk message consumption

package io.mvvm.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
/**
 * @program: spring-cloud-alibaba
 * @description: batch message consumer
 * @author: Pan
 * @create: 2020-12-27 22:33
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // Set whether the consumer starts from the head or the tail of the queue on its first start
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("BatchTest", "TagA");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // We can see that each queue has a unique consume thread, and orders are ordered for each queue(partition)
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
        System.in.read();
    }
}

RocketMQ sequential messages

Sequential messages

Message ordering means that a class of messages can be consumed in the order in which they were sent. For example, an order produces three messages: order creation, order payment, and order completion. These must be consumed in that order to make sense, while messages belonging to different orders can still be consumed in parallel. RocketMQ can strictly guarantee message ordering.

Sequential messages are divided into globally ordered messages and partition-ordered messages. Global ordering means that all messages under a Topic must be kept in order; partitioned ordering only needs to ensure that each group of messages is consumed in order.

  • Global ordering: For a given Topic, all messages are published and consumed in strict first-in, first-out (FIFO) order. Application scenario: cases with low performance requirements in which all messages must be published and consumed strictly according to FIFO.
  • Partitioned ordering: For a given Topic, all messages are partitioned by a sharding key. Messages within the same partition are published and consumed in strict FIFO order. The sharding key is the field used to distinguish partitions in sequential messages; it is a completely different concept from the key of an ordinary message. Application scenario: cases with high performance requirements, where the sharding key serves as the partitioning field and messages within the same partition are published and consumed strictly according to FIFO.
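
Partitioned ordering can be illustrated without a broker. The sketch below assumes the order ID is the sharding key and that the queue is chosen by a simple modulo; ShardingSketch and its methods are hypothetical names, not part of RocketMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ShardingSketch {
    /** Maps a sharding key (here an order ID) to a queue index in [0, queueCount). */
    static int queueIndexFor(long shardingKey, int queueCount) {
        return (int) Math.floorMod(shardingKey, (long) queueCount);
    }

    /** Groups "orderId:step" entries by queue, keeping the send order within each queue. */
    static Map<Integer, List<String>> route(long[] orderIds, String[] steps, int queueCount) {
        Map<Integer, List<String>> queues = new TreeMap<>();
        for (int i = 0; i < orderIds.length; i++) {
            int queue = queueIndexFor(orderIds[i], queueCount);
            queues.computeIfAbsent(queue, q -> new ArrayList<>())
                    .add(orderIds[i] + ":" + steps[i]);
        }
        return queues;
    }

    public static void main(String[] args) {
        long[] orderIds = {15103111039L, 15103111065L, 15103111039L, 15103111065L};
        String[] steps = {"create", "create", "payment", "payment"};
        // Steps of the same order always land in the same queue, in send order.
        System.out.println(route(orderIds, steps, 4));
    }
}
```

Because every step of one order maps to the same queue, FIFO within a queue is enough to keep that order's steps in sequence, while different orders can spread across queues.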

Sequential message production

package io.mvvm.list;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * @program: spring-cloud-alibaba
 * @description: producer that sends sequential messages
 * @author: Pan
 * @create: 2020-12-27 22:01
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.start();

        String[] tags = new String[]{"TagA", "TagC", "TagD"};

        // Order list
        List<OrderStep> orderList = new Producer().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < 10; i++) {
            // Add a time prefix
            String body = dateStr + " Hello RocketMQ " + orderList.get(i);
            Message msg = new Message("ListTopic", tags[i % tags.length], "KEY" + i, body.getBytes());

            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  // Select send queue according to order ID
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId()); // order ID

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }

        producer.shutdown();
    }

    /** A single step of an order */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /** Builds the simulated order data */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("finished");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("push");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("finished");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("finished");
        orderList.add(orderDemo);

        return orderList;
    }
}

Sequential message consumption

package io.mvvm.list;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @program: spring-cloud-alibaba
 * @description: sequential message consumption with transaction mode (the application can control when the offset is committed)
 * @author: Pan
 * @create: 2020-12-27 22:06
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // Set whether the consumer starts from the head or the tail of the queue on its first start
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("ListTopic", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // We can see that each queue has a unique consume thread, and orders are ordered for each queue(partition)
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }

                try {
                    // The business logic processing...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
        System.in.read();
    }
}

RocketMQ timed messages

Timed messages

Timed messages (delay queue) are messages that are sent to the broker but are not delivered for consumption immediately; they are delivered to the real topic only after a specific delay. The default messageDelayLevel setting is "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h", 18 levels in total. You can configure a custom messageDelayLevel. Note that messageDelayLevel is a property of the broker, not of a topic. When sending a message, set the delay level with msg.setDelayTimeLevel(level). There are three cases for level:

  • If level == 0, the message is not delayed.
  • If 1 <= level <= maxLevel, the message is delayed for the corresponding time; for example, level == 1 means a delay of 1s.
  • If level > maxLevel, level is treated as maxLevel; for example, level == 20 means a delay of 2h.
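
The three cases can be sketched as a small lookup. This is an illustration under assumptions, not broker code: DelayLevels is a hypothetical class, the level string is assumed to be the 18-level default, and negative levels are treated like level 0.

```java
final class DelayLevels {
    // Assumed default messageDelayLevel setting (18 levels).
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Returns the delay in milliseconds for a level, following the three cases above. */
    static long delayMillis(int level) {
        String[] levels = DEFAULT_LEVELS.split(" ");
        if (level <= 0) {
            return 0L; // not delayed (negative levels handled the same way: an assumption)
        }
        int effective = Math.min(level, levels.length); // level > maxLevel is clamped
        return parseMillis(levels[effective - 1]);
    }

    /** Parses a token such as "10s", "5m" or "2h" into milliseconds. */
    static long parseMillis(String token) {
        long value = Long.parseLong(token.substring(0, token.length() - 1));
        char unit = token.charAt(token.length() - 1);
        switch (unit) {
            case 's':
                return value * 1_000L;
            case 'm':
                return value * 60_000L;
            case 'h':
                return value * 3_600_000L;
            default:
                throw new IllegalArgumentException("unknown unit in " + token);
        }
    }

    public static void main(String[] args) {
        for (int level : new int[] {0, 1, 3, 18, 20}) {
            System.out.println("level " + level + " -> " + delayMillis(level) + " ms");
        }
    }
}
```

With this default table, a one-hour delay corresponds to level 17.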

Timed messages are temporarily stored in a topic named SCHEDULE_TOPIC_XXXX, in a queue chosen by delayTimeLevel: queueId = delayTimeLevel - 1. Each queue therefore stores only messages with the same delay, which ensures that messages with the same delay are consumed in order. The broker consumes SCHEDULE_TOPIC_XXXX on a schedule and writes each message to its real topic.

Note that a timed message is counted twice: once on the first write and again on the scheduled write to the real topic, so the reported number of sent messages and the TPS will both be higher.

Usage scenarios

For example, in e-commerce, if you submit an order, you can send a delay message, check the status of the order one hour later, and cancel the order to release the inventory if the payment is still not made.

Restrictions on the use of delayed messages

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

Currently, RocketMQ does not support arbitrary delay times. Only a fixed set of delay levels is available, from 1s to 2h, corresponding to levels 1 to 18. A message that fails to be consumed is also placed in the delayed message queue; see SendMessageProcessor.java.

Start the consumer and wait for subscribed messages

package io.mvvm.scheduled;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @program: spring-cloud-alibaba
 * @description: timed message consumer
 * @author: Pan
 * @create: 2020-12-27
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        // Instantiate the consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // Subscribe to the topic
        consumer.subscribe("ScheduledTopic", "*");
        // Register message listeners
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "]" + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.in.read();
    }
}

Sending delayed Messages

package io.mvvm.scheduled;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * @program: spring-cloud-alibaba
 * @description: timed message producer
 * @author: Pan
 * @create: 2020-12-27
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to generate a delayed message
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // Start the producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
            // Set the delay level to 3, so the message is delivered after 10 seconds (only a fixed set of delays is supported, see delayTimeLevel)
            message.setDelayTimeLevel(3);
            // Send a message
            producer.send(message);
        }
        // Close the producer
        producer.shutdown();
    }
}

RocketMQ transaction messages

Transaction message

A RocketMQ transactional message lets an application combine executing a local transaction and sending a message into one global transaction that either succeeds or fails as a whole. RocketMQ's transaction messages provide distributed transaction functionality similar to X/Open XA, through which eventual consistency of distributed transactions can be achieved.

Transaction messages have three states: commit, rollback, and intermediate. (The Java examples in this article express them as LocalTransactionState.COMMIT_MESSAGE, ROLLBACK_MESSAGE, and UNKNOW.)

  • TransactionStatus.CommitTransaction: commits the transaction, which allows consumers to consume this message.
  • TransactionStatus.RollbackTransaction: rolls back the transaction, which means the message will be deleted and cannot be consumed.
  • TransactionStatus.Unknown: intermediate state, which means the message queue needs to check back to determine the final state.
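
As a rough illustration of the list above, each state can be mapped to what happens to the pending message. The enums and the actionFor method below are hypothetical names made up for this sketch; they are not RocketMQ types.

```java
final class TransactionStateSketch {
    /** Hypothetical mirror of the three states listed above. */
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** Hypothetical description of what happens to the pending message. */
    enum Action { MAKE_VISIBLE_TO_CONSUMERS, DELETE, CHECK_BACK_LATER }

    static Action actionFor(State state) {
        switch (state) {
            case COMMIT:
                return Action.MAKE_VISIBLE_TO_CONSUMERS;
            case ROLLBACK:
                return Action.DELETE;
            default:
                // UNKNOWN: the state has to be determined by a later check
                return Action.CHECK_BACK_LATER;
        }
    }

    public static void main(String[] args) {
        for (State state : State.values()) {
            System.out.println(state + " -> " + actionFor(state));
        }
    }
}
```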

Sending transaction messages

Create the producer with the TransactionMQProducer class and specify a unique ProducerGroup. You can set a custom thread pool to handle check requests. After the local transaction executes, reply to the message queue according to the execution result; the transaction states that can be returned are described in the previous section.

package io.mvvm.transaction;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;

/**
 * @program: spring-cloud-alibaba
 * @description: sends transaction messages
 * @author: Pan
 * @create: 2020-12-31
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        // Keep the process alive so that check requests from the broker can still be answered
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

Implement a transaction listening interface

When the half-message is successfully sent, we use the executeLocalTransaction method to execute the local transaction. It returns one of the three transaction states mentioned in the previous section. The checkLocalTransaction method is used to check the status of local transactions and respond to check requests from message queues. It also returns one of the three transaction states mentioned in the previous section.

package io.mvvm.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TransactionListenerImpl implements TransactionListener {
  private AtomicInteger transactionIndex = new AtomicInteger(0);
  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  @Override
  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
      int value = transactionIndex.getAndIncrement();
      int status = value % 3;
      localTrans.put(msg.getTransactionId(), status);
      return LocalTransactionState.UNKNOW;
  }
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt msg) {
      Integer status = localTrans.get(msg.getTransactionId());
      if (null != status) {
          switch (status) {
              case 0:
                  return LocalTransactionState.UNKNOW;
              case 1:
                  return LocalTransactionState.COMMIT_MESSAGE;
              case 2:
                  return LocalTransactionState.ROLLBACK_MESSAGE;
          }
      }
      return LocalTransactionState.COMMIT_MESSAGE;
  }
}

Limitations on transaction message usage

  1. Transaction messages do not support delayed or batch messages.
  2. To avoid half-message accumulation caused by checking a single message too many times, the number of checks for a single message is limited to 15 by default. Users can change this limit through the transactionCheckMax parameter in the Broker configuration file. If a message has been checked more than N times (N = transactionCheckMax), the Broker discards the message and, by default, prints an error log. Users can change this behavior by overriding the AbstractTransactionalMessageCheckListener class.
  3. A transaction message is checked after a period of time determined by the transactionTimeout parameter in the Broker configuration file. Users can also change this limit by setting the user property CHECK_IMMUNITY_TIME_IN_SECONDS when sending a transaction message; this property takes precedence over the transactionTimeout parameter.
  4. Transaction messages may be checked or consumed more than once.
  5. Writing a committed message to the user's target topic may fail; currently this case relies on log records. High availability is ensured by RocketMQ's own high availability mechanism. If you want to ensure that transaction messages are not lost and transaction integrity is guaranteed, a synchronous double-write mechanism is recommended.
  6. The producer ID of a transaction message cannot be shared with the producer ID of other types of messages. Unlike other types of messages, transaction messages allow reverse lookup: the MQ server can query the producer clients by their producer IDs.
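
The check limit in item 2 can be sketched as a simple loop. This is a simplified simulation under assumptions, not the Broker's implementation: CheckLimitSketch, its enums, and the resolve method are hypothetical, and the checker function stands in for the producer answering a check request.

```java
import java.util.function.IntFunction;

final class CheckLimitSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }
    enum Result { COMMITTED, ROLLED_BACK, DISCARDED }

    static final int DEFAULT_CHECK_MAX = 15; // default limit quoted in item 2

    /**
     * Asks the checker up to checkMax times (attempt numbers start at 1).
     * The message is discarded if every answer is UNKNOWN.
     */
    static Result resolve(IntFunction<State> checker, int checkMax) {
        for (int attempt = 1; attempt <= checkMax; attempt++) {
            State state = checker.apply(attempt);
            if (state == State.COMMIT) {
                return Result.COMMITTED;
            }
            if (state == State.ROLLBACK) {
                return Result.ROLLED_BACK;
            }
        }
        return Result.DISCARDED;
    }

    public static void main(String[] args) {
        // The local transaction becomes known on the third check.
        System.out.println(resolve(
                attempt -> attempt < 3 ? State.UNKNOWN : State.COMMIT, DEFAULT_CHECK_MAX));
        // The local transaction never resolves, so the message is dropped.
        System.out.println(resolve(attempt -> State.UNKNOWN, DEFAULT_CHECK_MAX));
    }
}
```

The sketch also shows why a listener should eventually return a definite state: a message that stays unknown for every check is lost.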