This article has participated in the activity of "New person creation Ceremony", and started the road of digging gold creation together.

🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣 source address 🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣 🍣

1. Release confirmation

1.1. Release confirmation

The persistence of a message requires the following steps:

  1. The setting requires queue persistence.
  2. The setting requires that messages in the queue be persistent.
  3. Release confirmation
  • Without publication confirmation, messages can be lost before being persisted on disk, defeating the purpose of message persistence.

1.2. Publish confirmation policies

1.2.1. Enable the release confirmation method

Channel channel = RabbitmqUtil.getChannel();
// Enable publish confirmation
channel.confirmSelect();
Copy the code
  • Publish confirmation is not enabled by default. If it needs to be enabled, it needs to be calledconfirmSelectThis method is called whenever publication validation needs to be used.

1.2.2. Single release confirmation

  • A single confirmation publication is a simple confirmation mode. It is a synchronous confirmation publication mode. After a message is published, subsequent messages can be published only after it is confirmed to be published.
  • The confirmation mode is mainly throughwaitForConfirmsMethod, which returns only when the message has been acknowledged and throws an exception if the message has not been acknowledged within the specified time range.
  • The biggest drawback of this validation approach is that it is extremely slow to publish.
public static void ConfirmMessageIndividually(a) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false.false.false.null);

        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // Make a single publication confirmation
            boolean flag = channel.waitForConfirms();
            if(flag){
                System.out.println("Message sent successfully"); }}long end = System.currentTimeMillis();

        System.out.println("Single confirmation send" + MESSAGE_COUNT + "How long does it take to send a message?" + (end - begin) + "ms");
    }
Copy the code

1.2.3 Batch release confirmation

  • Post a batch of messages and confirm them together.
  • Cons: When a glitch causes a problem with the release, we don’t know which message is wrong, and we have to keep the entire batch in memory to record important messages and then republish them.
    public static void ConfirmMessageBatch(a) throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false.false.false.null);

        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        // The number of messages processed in batches
        int batchSize = 100;

        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // Confirm the batch release
            if(i % batchSize == 0){
                channel.waitForConfirms();
                System.out.println("Batch message processing succeeded"); }}long end = System.currentTimeMillis();

        System.out.println("Batch confirmation send" + MESSAGE_COUNT + "How long does it take to send a message?" + (end - begin) + "ms");

    }
Copy the code

1.2.4 asynchronously confirm publication

The principle of

  • Asynchronous confirmation publicationIs the use ofThe callback functionTo achieve the reliability of message delivery, this middleware is also through the function callback to ensure the success of delivery.

code

    public static void ConfirmMessageAsync(a) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false.false.false.null);

        channel.confirmSelect();

        long begin = System.currentTimeMillis();


        ConfirmCallback ackCallback = (var1,var2)->{
            System.out.println("Confirmed information." + var1);
        };


        ConfirmCallback nackCallback = (var1,var2)->{
            System.out.println("Unconfirmed message." + var1);
        };


        /** * 1. Callback function that acknowledges the received message * 2. Callback function that does not acknowledge the received message */

        channel.addConfirmListener(ackCallback,nackCallback);

        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        }

        long end = System.currentTimeMillis();

        System.out.println("Asynchronous acknowledgement send" + MESSAGE_COUNT + "How long does it take to send a message?" + (end - begin) + "ms");
    }
Copy the code

1.2.5. How to handle asynchronous unacknowledged Messages

  • The best solution is to put unacknowledged messages on a memory-based queue that can be accessed by publishing threads, for exampleConcurrentSkipListMap
    public static void ConfirmMessageAsync(a) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false.false.false.null);

        channel.confirmSelect();

        ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();



        /** * 1. Message id * 2
        ConfirmCallback ackCallback = (var1,var2)->{
            if(var2){
                ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(var1);
                longStringConcurrentNavigableMap.clear();
            }else{
                map.remove(var1);
            }
            String message = map.get(var1);
            System.out.println("The confirmed information is:" + message + "Confirmed message tag:" + var1);
        };


        ConfirmCallback nackCallback = (var1,var2)->{
            // Unconfirmed message
            String s = map.get(var1);
            System.out.println(s);
            System.out.println("Unconfirmed message." + var1);
        };


        /** * 1. Callback function that acknowledges the received message * 2. Callback function that does not acknowledge the received message */

        channel.addConfirmListener(ackCallback,nackCallback);

        long begin = System.currentTimeMillis();

        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // 1. Save the message to a thread-safe queue
            map.put(channel.getNextPublishSeqNo(),message);
        }

        long end = System.currentTimeMillis();

        System.out.println("Asynchronous acknowledgement send" + MESSAGE_COUNT + "How long does it take to send a message?" + (end - begin) + "ms");
    }
Copy the code

1.2.6 Speed comparison of the above three types of release confirmation

  • Single release confirmation: Synchronous wait for confirmation, simple, but very limited throughput.
  • Batch confirmation publishing: Batch synchronization waiting for confirmation, simple, reasonable throughput, once a problem occurs, it is difficult to infer which message is wrong.
  • Asynchronous confirmation publishing: Best performance and resource usage, well controlled in case of errors, but slightly harder to implement.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.xiao.utils.RabbitmqUtil;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

public class ConfirmMessage {

    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        // Single publication confirmation
        ConfirmMessageIndividually(); // It takes 680ms for a single acknowledgment to send 1000 messages

        // Batch release confirmation
        ConfirmMessageBatch(); // It takes 112ms to batch acknowledge 1000 messages

        // Asynchronous publish confirmation
        ConfirmMessageAsync(); // it takes 41ms to send 1000 messages asynchronously
                                // it takes 33ms to send 1000 messages asynchronously
    }

    public static void ConfirmMessageIndividually(a) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false.false.false.null);

        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // Make a single publication confirmation
            boolean flag = channel.waitForConfirms();
            if(flag){
                System.out.println("Message sent successfully"); }}long end = System.currentTimeMillis();

        System.out.println("Single confirmation send" + MESSAGE_COUNT + "How long does it take to send a message?" + (end - begin) + "ms");
    }

    public static void ConfirmMessageBatch(a) throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false.false.false.null);

        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        // The number of messages processed in batches
        int batchSize = 100;

        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // Confirm the batch release
            if(i % batchSize == 0){
                channel.waitForConfirms();
                System.out.println("Batch message processing succeeded"); }}long end = System.currentTimeMillis();

        System.out.println("Batch confirmation send" + MESSAGE_COUNT + "How long does it take to send a message?" + (end - begin) + "ms");

    }

    public static void ConfirmMessageAsync(a) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();
        String QUEUE_NAME = UUID.randomUUID().toString();
        channel.queueDeclare(QUEUE_NAME,false.false.false.null);

        channel.confirmSelect();

        ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();



        /** * 1. Message id * 2
        ConfirmCallback ackCallback = (var1,var2)->{
            if(var2){
                ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(var1);
                longStringConcurrentNavigableMap.clear();
            }else{
                map.remove(var1);
            }
            String message = map.get(var1);
            System.out.println("The confirmed information is:" + message + "Confirmed message tag:" + var1);
        };


        ConfirmCallback nackCallback = (var1,var2)->{
            // Unconfirmed message
            String s = map.get(var1);
            System.out.println(s);
            System.out.println("Unconfirmed message." + var1);
        };


        /** * 1. Callback function that acknowledges the received message * 2. Callback function that does not acknowledge the received message */

        channel.addConfirmListener(ackCallback,nackCallback);

        long begin = System.currentTimeMillis();

        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            // 1. Save the message to a thread-safe queue
            map.put(channel.getNextPublishSeqNo(),message);
        }

        long end = System.currentTimeMillis();

        System.out.println("Asynchronous acknowledgement send" + MESSAGE_COUNT + "How long does it take to send a message?" + (end - begin) + "ms"); }}Copy the code

2. Switch

  • In this section, we’re going to do something completely different — we’re going to deliver the message to multiple consumers. This model is known as the publish/subscribe model. You need to use switches here.

2.1, Exchanges

2.1.1, concepts,

  • The core idea of the RabbitMQ messaging model is that messages produced by producers are never sent directly to queues.
  • Instead, the producer can only send messages to the switch.

2.1.2, type,

  1. Direct (direct)
  2. Topic
  3. Headers
  4. Fanout

2.1.3 Anonymous Exchange

channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("UTF-8"));
Copy the code
  • When we send a message,""That meansThe default unnamed queue.
  • Messages can be routed to queues byroutingKeySpecified by binding key.

2.2. Temporary queues

  • Temporary queue: a queue with a random name. Once we disconnect, the queue will be deleted automatically.
String queueName = channel.queueDeclare().getQueue();
Copy the code

2.3 binding

  • The binding is really a bridge between the switch and the queue. It identifies which switch is bound to which queue.

2.4 Fanout (Publish and subscribe model)

Against 2.4.1, introduce

  • It broadcasts all received messages to all queues it knows.

  • The system has a default Fanout switch type

2.4.2, actual combat

  • A switch forwards a message that can be received by all queues to which it is bound.

1. Two consumers

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

public class ReceiveLogs01 {

    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // Declare a switch
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        // Declare a temporary queue
        String queueName = channel.queueDeclare().getQueue();

        /** * 1. Queue name * 2. switch name * 3. RoutingKey */
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("Waiting to receive message");


        DeliverCallback deliverCallback = (var1,var2)->{
            System.out.println("The message received by the ReceiveLogs01 console is: + new String(var2.getBody(),"UTF-8"));
        };



        channel.basicConsume(queueName,true,deliverCallback,var1->{}); }}Copy the code
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

public class ReceiveLogs02 {

    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // Declare a switch
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        // Declare a temporary queue
        String queueName = channel.queueDeclare().getQueue();

        /** * 1. Queue name * 2. switch name * 3. RoutingKey */
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("Waiting to receive message");


        DeliverCallback deliverCallback = (var1,var2)->{
            System.out.println("The message received by the ReceiveLogs02 console is:" + new String(var2.getBody(),"UTF-8"));
        };



        channel.basicConsume(queueName,true,deliverCallback,var1->{}); }}Copy the code

2. A producer

import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.util.Scanner;

public class EmitLog {
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();


        Scanner sc =  new Scanner(System.in);
        while(sc.hasNext()){
            String message = sc.next();
            channel.basicPublish(EXCHANGE_NAME,"".null,message.getBytes("UTF-8"));
            System.out.println("Message sent successfully:"+ message); }}}Copy the code

3. Test results

producers

consumers

  • So in thefanoutIn this mode, all queues can receive messages.

2.5. Direct (Routing Mode)

2.5.1 and introduce

  • Direct exchange sumfanoutThe difference with switches is thatRoutingKeyOf the multiple queues to which it is boundkeyIt’s generally different, but if it’s the same, then it behaves like alpha and betafanoutIt’s kind of similar.

2.5.2, actual combat

The binding between queues and switches

1. The producers

import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.util.Scanner;

public class DirectLogs {
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();


        Scanner sc =  new Scanner(System.in);
        while(sc.hasNext()){
            String message = sc.next();
            channel.basicPublish(EXCHANGE_NAME,"error".null,message.getBytes("UTF-8"));
            System.out.println("Message sent successfully:"+ message); }}}Copy the code

2. Two consumers

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

public class ReceiveLogsDirect01 {
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // Declare a switch
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        // Declare a queue
        channel.queueDeclare("console".false.false.false.null);
        // Bind
        channel.queueBind("console",EXCHANGE_NAME,"info");
        channel.queueBind("console",EXCHANGE_NAME,"warning");

        System.out.println("Waiting to receive message");


        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("ReceiveLogsDirect01 console received the following message:" + new String(var2.getBody(),"UTF-8"));
        };

        channel.basicConsume("console".true,deliverCallback,var1->{}); }}Copy the code
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

public class ReceiveLogsDirect02 {
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // Declare a switch
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        // Declare a queue
        channel.queueDeclare("disk".false.false.false.null);
        // Bind
        channel.queueBind("disk",EXCHANGE_NAME,"error");

        System.out.println("Waiting to receive message");


        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("The message received by the ReceiveLogsDirect02 console is:" + new String(var2.getBody(),"UTF-8"));
        };

        channel.basicConsume("disk".true,deliverCallback,var1->{}); }}Copy the code

3. Test results

  • When a producer sends a message, it takes precedenceRoutingKeyIs then sent to the corresponding queue.

2.6, the Topic

2.6.1, introduce

Previously we used Fanout to send all messages to all queues, direct to send messages to a queue. But let’s say we currently have three queues, and we want to send messages to only two of them, so that requires Topic.

2.6.2 Topic requirements

  • topictheRoutingKeyIt must be a list of words separated by dots.
  • * (asterisk)Can replace a word.
  • # (Hash number)Can replace zero or more words

2.6.3 Matching cases of Topic

  1. quick.orange.rabbit: Received by queues Q1 and Q2
  2. lazy.orange.elephant: Received by queues Q1 and Q2
  3. quick.orange.fox: Received by queue Q1
  4. lazy.brown.fox: is received by queue Q2
  5. lazy.pink.rabbit: satisfies two bindings but is received only once by queue Q2.
  6. quick.brown.fox: Any binding that does not match will not be received by any queue and will be discarded.
  7. quick.orange.male.rabbit: Is four words that do not match any binding will be discarded
  8. lazy.orange.male.rabbit: is four words when matching Q2
  • When a queue binding key is#, so this queue will receive all the data, kind of likefanout.
  • If not in the queue binding key#and*Occurs, then the queue binding type isdirect.

2.6.4, actual combat

1. Two consumers

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

public class ReceiveLogsTopic01 {
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String queueName = "Q1";
        channel.queueDeclare(queueName,false.false.false.null);

        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");

        System.out.println("Waiting to receive message.....");

        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("The message received by the ReceiveLogsTopic01 console is: + new String(var2.getBody(),"UTF-8"));
            System.out.println("Receive queue:" + queueName + "Accepted key:" + var2.getEnvelope().getRoutingKey());
        };

        channel.basicConsume(queueName,true,deliverCallback,var1->{}); }}Copy the code
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xiao.utils.RabbitmqUtil;

public class ReceiveLogsTopic02 {
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitmqUtil.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String queueName = "Q2";
        channel.queueDeclare(queueName,false.false.false.null);

        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");

        System.out.println("Waiting to receive message.....");

        DeliverCallback deliverCallback = (var1, var2)->{
            System.out.println("The message received by the ReceiveLogsTopic02 console is:" + new String(var2.getBody(),"UTF-8"));
            System.out.println("Receive queue:" + queueName + "Accepted key:" + var2.getEnvelope().getRoutingKey());
        };

        channel.basicConsume(queueName,true,deliverCallback,var1->{}); }}Copy the code

2. Producers

import com.rabbitmq.client.Channel;
import com.xiao.utils.RabbitmqUtil;

import java.util.HashMap;
import java.util.Map;

public class EmitLogTopic {
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        Map<String,String> map = new HashMap<>();

        map.put("quick.orange.rabbit"."Received by queue Q1 and Q2");
        map.put("lazy.orange.elephant"."Received by queue Q1 and Q2");
        map.put("quick.orange.fox"."Received by queue Q1");
        map.put("lazy.brown.fox"."Received by queue Q2.");
        map.put("lazy.pink.rabbit"."It satisfies two bindings but is received only once by queue Q2.");
        map.put("quick.brown.fox"."Failure to match any binding will not be received by any queue and will be discarded.");
        map.put("quick.orange.male.rabbit"."Yes four words do not match and any binding will be discarded.");
        map.put("lazy.orange.male.rabbit"."Is four words when matching Q2.");

        for(String key : map.keySet()){
            String message = map.get(key);
            channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes("UTF-8"));
            System.out.println("The message sent is:"+ message); }}}Copy the code

3. Test results

1. The producers

2. Consumers