This is the 23rd day of my participation in the August More Text Challenge.

🌈 Past review

Thank you for reading; I hope this post helps. If you spot a flaw in it, please leave a message in the comment area or send me a private message from my home page. Thanks to everyone for your generous advice. I'm XiaoLin, a boy who can both write bugs and sing rap.

  • 💖10 minutes to RocketMQ! You can't even get into Ali? 1️⃣💖
  • 💖5-minute introduction to custom images and installing software💖 Docker series
  • 💖5-minute introduction to Docker core components💖 Docker series introductory tutorial

3. Getting started

3.1. Import dependencies

    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.4.0</version>
    </dependency>

3.2. Send messages

Steps for sending a message:

  1. Create a message producer and specify the producer group it belongs to.
  2. Specify the NameServer address.
  3. Start the producer.
  4. Create a message object, specifying the topic, tag, and message body.
  5. Send the message.
  6. Shut down the producer.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // Create a producer and specify its producer group
        DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer");
        // Specify the NameServer address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Start the producer
        producer.start();
        // Create a message from the topic, tag, and message body
        Message message = new Message("hello_demo1", "tag1", "Hello, this is News one.".getBytes("utf-8"));
        // Send the message
        producer.send(message);
        // Release resources
        producer.shutdown();
    }
}

3.3. Consume messages (receive messages)

Steps for consuming messages:

  1. Create a message consumer and specify the consumer group it belongs to.
  2. Specify the NameServer address.
  3. Specify the topic and tags the consumer subscribes to.
  4. Register a callback and write the method that handles messages.
  5. Start the message consumer.

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerDemo {

    public static void main(String[] args) throws MQClientException {
        // Create a push consumer and specify its consumer group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-demo");
        // Specify the NameServer address
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // Subscribe to a topic. The second parameter is a filter expression
        // that selects which tags to receive; "*" means all tags
        consumer.subscribe("hello_demo1", "*");
        // Register a message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // A single callback may carry multiple messages
                for (MessageExt msg : msgs) {
                    // Print the message body
                    System.out.println(new String(msg.getBody()));
                }
                // Report that the messages were consumed successfully
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
    }
}

4. Message types

4.1. Ordinary messages

RocketMQ provides three ways to send normal messages: reliable synchronous, reliable asynchronous, and one-way.

4.1.1. Reliable synchronous transmission

In synchronous sending, the sender sends a message and waits for the receiver's response before sending the next one. This mode is used in a wide range of scenarios, such as important notification emails, registration SMS notifications, and marketing SMS systems.

// fastjson; add it as a dependency if it is not already on the classpath
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducer {

    public static void main(String[] args) throws Exception {
        // Instantiate the message producer
        DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer");
        // Set the NameServer address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Start the producer instance
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message and specify the topic, tag, and message body
            Message msg = new Message("04-producer-type" /* Topic */, "TagA" /* Tag */,
                    ("This is a synchronous message." + i).getBytes("utf-8") /* Message body */);
            // Send the message synchronously; send() blocks until the broker responds
            SendResult sendResult = producer.send(msg);
            // sendResult tells us whether the message was delivered successfully
            System.out.println(JSON.toJSONString(sendResult));
        }
        // Shut down the producer once no more messages will be sent
        producer.shutdown();
    }
}

4.1.2 Asynchronous messaging

In asynchronous sending, the sender sends a message and then sends the next one without waiting for the receiver's response. The sender receives the server's response through a callback interface and handles the result there.

Asynchronous sending generally suits business scenarios with long call chains that are sensitive to response time (RT). For example, after a user uploads a video, the transcoding service is notified, and the transcoding result is pushed once it is ready.

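import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// fastjson; add it as a dependency if it is not already on the classpath
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ASyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate the message producer
        DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer");
        // Set the NameServer address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Start the producer instance
        producer.start();
        int messageCount = 100;
        // Counts down once per callback so that main can wait for all results
        final CountDownLatch latch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            // Create a message and specify the topic, tag, and message body
            Message msg = new Message("04-producer-type" /* Topic */, "TagA" /* Tag */,
                    ("I'm an asynchronous message" + i).getBytes("utf-8") /* Message body */);
            // Send the message asynchronously; the result arrives in the callback
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("Message sent successfully");
                    System.out.println(JSON.toJSONString(sendResult));
                    latch.countDown();
                }
                @Override
                public void onException(Throwable e) {
                    System.out.println("Message sending failed" + e.getMessage());
                    System.out.println("Processing failed message");
                    latch.countDown();
                }
            });
        }
        // Keep the main thread alive until the callbacks have run;
        // shutting down earlier causes the pending sends to fail
        latch.await(30, TimeUnit.SECONDS);
        // Shut down the producer once no more messages will be sent
        producer.shutdown();
    }
}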

4.1.3 One-way messages

In one-way sending, the sender only sends the message: it does not wait for the server's response, and no callback is triggered. This mode suits scenarios that need very low sending latency but do not have high reliability requirements, for example log collection.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class OneWayProducer {

    public static void main(String[] args) throws Exception {
        // Instantiate the message producer
        DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer");
        // Set the NameServer address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Start the producer instance
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message and specify the topic, tag, and message body
            Message msg = new Message("04-producer-type" /* Topic */, "TagA" /* Tag */,
                    ("I'm a one-way message" + i).getBytes("utf-8") /* Message body */
            );
            // Send the message one-way: no result is returned and no callback runs
            producer.sendOneway(msg);
        }
        // Shut down the producer once no more messages will be sent
        producer.shutdown();
    }
}

4.1.4 Comparison of the three sending modes

Sending mode    Send latency    Feedback on send result    Message loss
Synchronous     Fast            Yes                        No
Asynchronous    Fast            Yes                        No
One-way         Fastest         No                         Possible

4.2. Sequential messages

RocketMQ's underlying data structure is a queue, so it might seem to support ordered messages by nature. That is true when there is only one queue. However, a Broker holds multiple queues, and consumers consume messages with multiple threads, so there is no guarantee that messages are consumed in the order they were sent. The solution is to send all messages that must stay in order to the same queue.

For example, an order goes through these steps: create, pay, push, complete. All of these messages carry the same order number, so the order number can be used to route them to the same queue.

A sequential message is a type of message, provided by the message queue, that is published and consumed in strict order.
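The routing idea can be tried without a broker: as long as the queue index is derived only from the order number, every step of one order lands in the same queue, while different orders may spread across queues. The following is a minimal plain-Java sketch; the class `QueueRoutingSketch`, the helpers `selectQueue` and `route`, and the queue count of 4 are assumptions made for illustration and are not part of the RocketMQ API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class QueueRoutingSketch {

    // Hypothetical helper: map an order ID to a queue index in [0, queueCount).
    // floorMod keeps the index non-negative even for negative IDs.
    static int selectQueue(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    // Group the steps by the queue they would be routed to, preserving send order.
    static Map<Integer, List<String>> route(long[] orderIds, String[] steps, int queueCount) {
        Map<Integer, List<String>> queues = new LinkedHashMap<>();
        for (int i = 0; i < orderIds.length; i++) {
            int index = selectQueue(orderIds[i], queueCount);
            queues.computeIfAbsent(index, k -> new ArrayList<>()).add(orderIds[i] + ":" + steps[i]);
        }
        return queues;
    }

    public static void main(String[] args) {
        long[] orderIds = {1039L, 1065L, 1039L, 1065L};
        String[] steps = {"Create", "Create", "Payment", "Payment"};
        // Assumed queue count; with a real topic it comes from the broker
        Map<Integer, List<String>> queues = route(orderIds, steps, 4);
        queues.forEach((index, messages) -> System.out.println("queue " + index + " -> " + messages));
    }
}
```

Within each queue the steps of one order keep their send order, which is all that ordered consumption needs; ordering between different orders is not guaranteed and usually does not matter.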

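import java.util.ArrayList;
import java.util.List;

/** Order step: one stage in the life cycle of an order, plus a builder for sample data. */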
public class OrderStep {
    private long orderId;
    private String desc;

    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
    }

    public static List<OrderStep> buildOrders() {
        // 1039L: Create, Payment, Push, Complete
        // 1065L: Create, Payment, Complete
        // 7235L: Create, Payment, Complete
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("Create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("Create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("Create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("Complete");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("Push");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("Complete");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("Complete");
        orderList.add(orderDemo);
        return orderList;
    }
}

import java.util.List;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1. Create the message producer and specify the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2. Specify the NameServer address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 3. Start the producer
        producer.start();
        // Build the list of order steps
        List<OrderStep> orderSteps = OrderStep.buildOrders();
        // Send the messages
        for (int i = 0; i < orderSteps.size(); i++) {
            String body = orderSteps.get(i) + "";
            Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());
            // send() takes the message, a queue selector, and the business ID (order ID) used to pick the queue
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                // mqs: candidate queues; msg: the message; arg: the business ID passed to send()
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    long orderId = (long) arg;
                    long index = orderId % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderSteps.get(i).getOrderId());

            System.out.println("Send result:" + sendResult);
        }
        producer.shutdown();
    }
}

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1. Create the consumer and specify the consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2. Specify the NameServer address (the same one the producer uses;
        //    multiple addresses are separated by semicolons, without spaces)
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 3. Subscribe to the topic and tags
        consumer.subscribe("OrderTopic", "*");

        // 4. Register an orderly listener so that each queue is consumed in order
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Thread name: [" + Thread.currentThread().getName() + "]: " + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 5. Start the consumer
        consumer.start();

        System.out.println("Consumer start");
    }
}

4.3 Transaction messages

RocketMQ provides transaction messages, which can be used to achieve eventual consistency in distributed transactions. The interaction flow of a transaction message is described below.

Basic concepts of transaction messages:

  • Half message (semi-transactional message): a message that temporarily cannot be delivered. The sender has successfully sent the message to the RocketMQ server, but the server has not yet received the producer's second confirmation for it, so the message is marked as "temporarily undeliverable".
  • Message check-back: if the RocketMQ server finds that a message has stayed in the half-message state for a long time, it proactively asks the message producer for the final status of the message (Commit or Rollback). This query process is called message check-back.

Transaction message sending steps:

  1. The sender sends a half message to the RocketMQ server.
  2. After persisting the message, the RocketMQ server returns an acknowledgement to the sender that the message was sent successfully. At this point the message is a half message.
  3. The sender starts executing the local transaction logic.
  4. The sender submits a second confirmation (Commit or Rollback) to the server based on the result of the local transaction. If the server receives Commit, it marks the half message as deliverable and subscribers eventually receive it. If the server receives Rollback, it deletes the half message and subscribers never receive it.

Transaction message check-back steps:

  1. If the network is disconnected or the application restarts, the second confirmation submitted in step 4 may never reach the server. After a fixed period of time, the server initiates a check-back for the message.
  2. After receiving the check-back, the sender checks the final result of the local transaction that corresponds to the message.
  3. The sender submits the second confirmation again based on the final status of the local transaction, and the server handles the half message as described in step 4.
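The sending and check-back steps above can be walked through with a small in-memory simulation. This is only a sketch of the protocol, not RocketMQ code: `HalfMessageSketch`, `ToyBroker`, `TxState`, and the transaction IDs are hypothetical names invented for the example. In the real Java client the corresponding pieces are `TransactionMQProducer` and a `TransactionListener` whose `executeLocalTransaction` and `checkLocalTransaction` methods return the transaction state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class HalfMessageSketch {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    // Toy stand-in for the server side: it stores half messages and only
    // exposes them to subscribers once the sender confirms COMMIT.
    static class ToyBroker {
        private final Map<String, String> halfMessages = new HashMap<>();
        private final List<String> deliverable = new ArrayList<>();

        // Sending steps 1-2: persist the half message; subscribers cannot see it yet
        void sendHalf(String txId, String body) {
            halfMessages.put(txId, body);
        }

        // Sending step 4: second confirmation from the sender
        void confirm(String txId, TxState state) {
            if (state == null || state == TxState.UNKNOWN || !halfMessages.containsKey(txId)) {
                return; // nothing decided yet: keep the half message for a later check-back
            }
            String body = halfMessages.remove(txId);
            if (state == TxState.COMMIT) {
                deliverable.add(body); // COMMIT makes the message visible
            }
            // ROLLBACK: the half message is simply dropped
        }

        // Check-back: ask the sender for the final state of every pending half message
        void checkBack(Function<String, TxState> localTransactionChecker) {
            for (String txId : new ArrayList<>(halfMessages.keySet())) {
                confirm(txId, localTransactionChecker.apply(txId));
            }
        }

        List<String> deliverable() { return deliverable; }

        int pendingHalfMessages() { return halfMessages.size(); }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        // Results of the local transactions, as the sender would look them up
        Map<String, TxState> localResults = new HashMap<>();

        broker.sendHalf("tx-1", "order 1039 paid");
        localResults.put("tx-1", TxState.COMMIT);   // local transaction succeeded
        broker.confirm("tx-1", TxState.COMMIT);

        broker.sendHalf("tx-2", "order 1065 paid");
        localResults.put("tx-2", TxState.ROLLBACK); // local transaction failed
        broker.confirm("tx-2", TxState.ROLLBACK);

        broker.sendHalf("tx-3", "order 7235 paid");
        localResults.put("tx-3", TxState.COMMIT);   // succeeded, but the confirmation was lost

        System.out.println("before check-back: " + broker.deliverable());
        broker.checkBack(localResults::get);
        System.out.println("after check-back: " + broker.deliverable());
    }
}
```

The third transaction shows why the check-back exists: without it, a lost confirmation would leave the half message pending forever even though the local transaction had already committed.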

4.4 Delayed messages

For example, in e-commerce, when an order is submitted you can send a delayed message, check the order status one hour later, and, if the order is still unpaid, cancel it and release the inventory. Delayed messages are a good fit for this.

Limitations of delayed messages: RocketMQ currently does not support arbitrary delays. You must choose one of several fixed delay levels, ranging from 1s to 2h and corresponding to levels 1 to 18.

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

If a message fails to be consumed, it is put into the delay queue to be retried. When it is redelivered depends on the configured delay levels and on how many times it has already been retried.
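To see which delay each level stands for, the level string shown above can be parsed into a lookup table. This is a small plain-Java sketch for illustration; the class `DelayLevelSketch` and the helper `parseLevels` are hypothetical names and not part of the RocketMQ client, and the sketch assumes every entry is a number followed by `s`, `m`, or `h`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DelayLevelSketch {

    // The fixed level string quoted in the text; level numbers start at 1
    static final String MESSAGE_DELAY_LEVEL =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parse the level string into a table of level -> delay in milliseconds
    static Map<Integer, Long> parseLevels(String levelString) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levelString.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown unit in " + part);
            }
            table.put(i + 1, amount * unitMillis);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parseLevels(MESSAGE_DELAY_LEVEL);
        System.out.println("levels: " + table.size());
        System.out.println("level 3 -> " + table.get(3) + " ms");
        System.out.println("level 18 -> " + table.get(18) + " ms");
    }
}
```

With this table, level 3 maps to 10 seconds and level 18 to 2 hours, which matches the 1s to 2h range described above.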

4.4.1 Producers

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer that will send a delayed message
        DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Start the producer
        producer.start();
        Message message = new Message("06-delay", ("delay message").getBytes());
        // Delay level 3: the message is delivered to consumers after 10 seconds
        // (only the fixed levels are supported; see messageDelayLevel)
        message.setDelayTimeLevel(3);
        // Send the message
        producer.send(message);
        // Shut down the producer
        producer.shutdown();
    }
}

4.4.2 Consumers

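import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wolfcode_consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // Subscribe to the topic
        consumer.subscribe("06-delay", "*");
        // Register the message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "]");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
    }
}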

4.5 Message filtering

When consuming messages, we can specify which messages to consume by using message filtering. There are two ways to filter:

  1. Filter by tag.
  2. Filter by SQL statement.

4.5.1 Filter by tag

4.5.1.1 Producers

import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1. Create the message producer and specify the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2. Specify the NameServer address
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 3. Start the producer
        producer.start();
        for (int i = 0; i < 3; i++) {
            // 4. Create a message object and specify the topic, tag, and message body
            // Parameter 1: topic; parameter 2: tag; parameter 3: message content
            Message msg = new Message("FilterTagTopic", "Tag2", ("Filtering of messages" + i).getBytes());
            // 5. Send the message
            SendResult result = producer.send(msg);
            // Send status
            SendStatus status = result.getSendStatus();
            System.out.println("Send result :" + result);
            // Sleep for 1 second between messages
            TimeUnit.SECONDS.sleep(1);
        }
        // 6. Shut down the producer
        producer.shutdown();
    }
}

4.5.1.2 Consumers

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1. Create the consumer and specify the consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2. Specify the NameServer address
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 3. Subscribe to the topic; only messages tagged Tag1 or Tag2 are received
        consumer.subscribe("FilterTagTopic", "Tag1 || Tag2");

        // 4. Register the callback that processes messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            // Receive the message content
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5. Start the consumer
        consumer.start();
        System.out.println("Consumer start");
    }
}
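The subscription expression used by the consumer, `Tag1 || Tag2`, simply lists the accepted tags, and `*` accepts every tag. The sketch below illustrates that matching rule in plain Java. It is an assumption-laden illustration of the semantics, not the broker's actual implementation; `TagFilterSketch`, `parseTags`, and `matches` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

public class TagFilterSketch {

    // Split a subscription expression such as "Tag1 || Tag2" into its tags
    static Set<String> parseTags(String expression) {
        Set<String> tags = new HashSet<>();
        for (String tag : expression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                tags.add(trimmed);
            }
        }
        return tags;
    }

    // A message is delivered if the expression is "*" or lists the message's tag
    static boolean matches(String expression, String messageTag) {
        Set<String> tags = parseTags(expression);
        return tags.contains("*") || tags.contains(messageTag);
    }

    public static void main(String[] args) {
        String subscription = "Tag1 || Tag2";
        for (String tag : new String[] {"Tag1", "Tag2", "Tag3"}) {
            System.out.println(tag + " delivered: " + matches(subscription, tag));
        }
    }
}
```

In the example above, the producer tags its messages with `Tag2`, so the consumer receives them; a message tagged `Tag3` would be filtered out.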

4.5.2. Filtering through SQL statements

RocketMQ defines only some basic syntax to support this feature. You can also easily extend it.

  • Numerical comparison, for example: >, >=, <, <=, BETWEEN, =
  • Character comparisons, such as: =, <>, IN
  • IS NULL or IS NOT NULL
  • Logical symbols AND, OR, NOT

The supported constant types are:

  • Numeric values, such as 123 or 3.1415
  • Strings, such as 'abc', which must be enclosed in single quotes
  • NULL, a special constant
  • Boolean value, TRUE or FALSE

4.5.2.1 Message producers

When you send a message, you can set the properties of the message with putUserProperty.

// Fragment: tag and i are defined by the surrounding code (for example, a send loop)
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
   tag,
   ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set a user property that consumers can filter on
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);

producer.shutdown();

4.5.2.2 Message consumers

Use MessageSelector.bySql to filter messages with an SQL expression:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// Only receive messages that carry the property a with a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
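To make the effect of `a between 0 and 3` concrete, here is a plain-Java sketch that applies the same condition to a message's user properties. It only illustrates the selection rule described above; `SqlFilterSketch` and `between` are hypothetical names, and treating a non-numeric value as "no match" is an assumption of this sketch rather than documented broker behavior.

```java
import java.util.HashMap;
import java.util.Map;

public class SqlFilterSketch {

    // Evaluate "<key> between <low> and <high>" (both bounds inclusive)
    // against the user properties of one message
    static boolean between(Map<String, String> userProperties, String key, long low, long high) {
        String raw = userProperties.get(key);
        if (raw == null) {
            return false; // messages without the property are not selected
        }
        try {
            long value = Long.parseLong(raw);
            return value >= low && value <= high;
        } catch (NumberFormatException e) {
            return false; // assumption of this sketch: non-numeric values never match
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 6; i++) {
            // putUserProperty stores values as strings, so the sketch does too
            Map<String, String> userProperties = new HashMap<>();
            userProperties.put("a", String.valueOf(i));
            System.out.println("a=" + i + " selected: " + between(userProperties, "a", 0, 3));
        }
    }
}
```

In RocketMQ itself the SQL expression is evaluated on the broker, which needs `enablePropertyFilter=true` in its configuration; the sketch does not model that part.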