The HD mind map is synced to Git: https://github.com/SoWhat1412/xmindfile. Follow the public account sowhat1412 for a large collection of resources.


1. What problem does message queuing solve

Message middleware is widely used, and RabbitMQ holds a share of that market. It is mainly used for asynchronous processing, application decoupling, traffic peak shaving, log processing, and so on.

1. Asynchronous processing

Suppose a user registers on a website, and the system then sends an SMS and an email to confirm that registration succeeded. There are generally three ways to handle this.

  1. Serial execution: the steps run one after another. The problem is that the user could start using the site right after registering; there is no need to wait for the SMS and the email.
  2. Parallel execution: after registration succeeds, the email and the SMS are sent in parallel. The problem is that both are non-critical tasks, so should registration really wait for them to finish?
  3. Asynchronous processing with MQ: after the user registers successfully, a message is sent asynchronously to MQ, and the mail system and the SMS system pull the data themselves.
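The third option can be sketched with an in-memory queue standing in for MQ. This is only a toy model under stated assumptions: the class `AsyncRegistrationSketch`, its methods, and the event format are hypothetical, and a `BlockingQueue` replaces the real broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncRegistrationSketch {
    // Stand-in for the MQ (assumption: a real broker would sit here).
    static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();

    // Registration only enqueues an event and returns; it does not wait for email or SMS.
    static String register(String user) {
        QUEUE.add("registered:" + user);
        return "OK";
    }

    // A notification worker pulls events off the queue on its own schedule.
    static List<String> drainNotifications() {
        List<String> sent = new ArrayList<>();
        String event;
        while ((event = QUEUE.poll()) != null) {
            sent.add("notify " + event);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(register("alice"));
        System.out.println(drainNotifications());
    }
}
```

The point of the design is that `register` returns as soon as the event is queued, so slow or failing notification systems no longer delay registration.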

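2. Application decoupling

For example, we have an order system and an inventory system. When a user places an order, the order system calls the inventory system. What happens if the inventory system fails while it is being called directly? With MQ in between, the order system only writes a message to the queue, and the inventory system consumes it on its own.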

3. Traffic peak shaving

How should a flash-sale (seckill) event be designed? The service layer cannot absorb the instantaneous burst of traffic directly, so at least one MQ should be placed in front of it.
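As a rough illustration, a bounded queue in front of the service caps how many requests are accepted during a burst. This is a minimal sketch: `PeakShavingSketch` and `acceptBurst` are hypothetical names, and rejecting requests once the queue is full is an assumed policy, not something the text above specifies.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PeakShavingSketch {
    // Returns how many of `burst` simultaneous requests fit into a queue of the given capacity.
    static int acceptBurst(int burst, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < burst; i++) {
            // offer() returns false instead of blocking when the queue is full
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptBurst(1000, 100));
    }
}
```

The service layer would then drain the queue at its own pace instead of facing all requests at once.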

4. Log processing

How does the backend accept and process requests sent by users through the WebUI?

2. Install and configure RabbitMQ

Website: www.rabbitmq.com/download.ht…
Development language: www.erlang.org/
The Erlang and RabbitMQ versions must be compatible with each other. To keep things simple I use Docker and pull the image directly. Once the container is running, enable the management page. Default account: guest. Default password: guest. When starting the container you can specify the account, the password, and the exposed ports:

docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management 

After startup, add a user. Virtual hosts are comparable to databases in MySQL. Create a virtual host, whose name usually starts with a slash, then click /vhost_mmr to authorize the user for it. The rest of the WebUI is worth exploring on your own.

3. Hands-on practice

The messaging patterns supported by RabbitMQ are listed on the official website: www.rabbitmq.com/getstarted…. Create a Maven project and import the necessary dependencies:

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
    </dependencies>

0. Obtain the MQ connection

    package com.sowhat.mq.util;

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class ConnectionUtils {
        /**
         * Creates a connection to the broker.
         * @return the connection
         * @throws IOException
         * @throws TimeoutException
         */
        public static Connection getConnection() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setVirtualHost("/vhost_mmr");
            factory.setUsername("user_mmr");
            factory.setPassword("sowhat");
            Connection connection = factory.newConnection();
            return connection;
        }
    }

1. Simple queues

P: Producer, the message producer. Middle: Queue, the message queue. C: Consumer, the message consumer.

    package com.sowhat.mq.simple;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {
        public static final String QUEUE_NAME = "test_simple_queue";

        public static void main(String[] args) throws IOException, TimeoutException {
            // Get a connection
            Connection connection = ConnectionUtils.getConnection();
            // Get a channel from the connection
            Channel channel = connection.createChannel();
            // Declare the queue
            AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String msg = "hello Simple";
            // Arguments: exchange, routing key (the queue name here), properties, message body bytes
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            System.out.println("--send msg:" + msg);

            channel.close();
            connection.close();
        }
    }

    ---

    package com.sowhat.mq.simple;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * The consumer receives messages.
     */
    public class Recv {
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            newApi();
            oldApi();
        }

        private static void newApi() throws IOException, TimeoutException {
            // Create a connection
            Connection connection = ConnectionUtils.getConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Arguments: queue name, durable, exclusive, auto-delete, extra arguments
            channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
            // Define the consumer
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override // Event model: triggered when a message arrives
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "utf-8");
                    System.out.println("---new api recv:" + s);
                }
            };
            // Listen to the queue
            channel.basicConsume(Send.QUEUE_NAME, true, defaultConsumer);
        }

        // Old API: consumers on client versions below 3.4 use this approach
        private static void oldApi() throws IOException, TimeoutException, InterruptedException {
            // Create a connection
            Connection connection = ConnectionUtils.getConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            // Define the queue consumer
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Listen to the queue
            channel.basicConsume(Send.QUEUE_NAME, true, consumer);
            while (true) {
                // Block until the next delivery arrives
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                byte[] body = delivery.getBody();
                String s = new String(body);
                System.out.println("---Recv:" + s);
            }
        }
    }

The page refresh frequency can be set in the upper right corner of the WebUI, and messages can also be consumed manually in the UI. Drawback of the simple queue: the coupling is too high. One producer maps to one consumer, so multiple consumers cannot share the messages in the queue.

2. Work queues

In the simple queue, production and consumption are strictly one-to-one. In real development, sending a message is cheap for the producer, while the consumer has to run business logic and needs time to process each message it receives, so messages may pile up in the queue. Running multiple consumers speeds up consumption.

1. Round-robin dispatch

The code below implements one producer and two consumers:

    package com.sowhat.mq.work;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {
        public static final String QUEUE_NAME = "test_work_queue";

        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            // Get the connection
            Connection connection = ConnectionUtils.getConnection();
            // Get the channel
            Channel channel = connection.createChannel();
            // Declare a queue
            AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 50; i++) {
                String msg = "hello-" + i;
                System.out.println("WQ send " + msg);
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                Thread.sleep(i * 20);
            }
            channel.close();
            connection.close();
        }
    }

    ---

    package com.sowhat.mq.work;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            // Get the connection
            Connection connection = ConnectionUtils.getConnection();
            // Get the channel
            Channel channel = connection.createChannel();
            // Declare a queue
            channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
            // Define the consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override // Event trigger mechanism
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "utf-8");
                    System.out.println("[1] : " + s);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                    }
                }
            };
            boolean autoAck = true;
            channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
        }
    }

    ---

    package com.sowhat.mq.work;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv2 {
        public static void main(String[] args) throws IOException, TimeoutException {
            // Get the connection
            Connection connection = ConnectionUtils.getConnection();
            // Get the channel
            Channel channel = connection.createChannel();
            // Declare a queue
            channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
            // Define the consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override // Event trigger mechanism
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "utf-8");
                    System.out.println("[2] : " + s);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                    }
                }
            };
            boolean autoAck = true;
            channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
        }
    }

Result: consumer 1 and consumer 2 process exactly the same number of messages. Consumer 1 handles the even-numbered messages and consumer 2 the odd-numbered ones. This is round-robin dispatch: no matter which of the two consumers is busier, messages are always handed out alternately, one for you and one for me. MQ does not know how fast each consumer is when it sends data, so by default it distributes evenly. Here autoAck = true.
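The alternating hand-out can be modelled in a few lines. This is a toy model with hypothetical names (`RoundRobinSketch`, `dispatch`); it does not talk to RabbitMQ and only reproduces the assignment rule described above.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinSketch {
    // Assigns message i to consumer (i % consumers), ignoring how busy each consumer is.
    static List<List<Integer>> dispatch(int messages, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < messages; i++) {
            result.get(i % consumers).add(i);
        }
        return result;
    }

    public static void main(String[] args) {
        // Index 0 plays the role of consumer 1, index 1 the role of consumer 2.
        System.out.println(dispatch(6, 2));
    }
}
```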

2. Fair dispatch

To achieve fair dispatch, MQ needs to know when a consumer has finished processing a message, and only then send it the next one. Automatic acknowledgement must therefore be turned off.

    package com.sowhat.mq.work;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {
        public static final String QUEUE_NAME = "test_work_queue";

        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            // Get the connection
            Connection connection = ConnectionUtils.getConnection();
            // Get the channel
            Channel channel = connection.createChannel();
            // Declare a queue
            AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Until a consumer sends its acknowledgement, the queue does not send it the next message;
            // each consumer handles only one message at a time.
            // Note: the prefetch limit applies to consumers on a channel, so the setting that matters
            // is the basicQos(1) call on the consumer side.
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);

            for (int i = 0; i < 50; i++) {
                String msg = "hello-" + i;
                System.out.println("WQ send " + msg);
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                Thread.sleep(i * 20);
            }
            channel.close();
            connection.close();
        }
    }

    ---

    package com.sowhat.mq.work;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            // Get the connection
            Connection connection = ConnectionUtils.getConnection();
            // Get the channel
            final Channel channel = connection.createChannel();
            // Declare a queue
            channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
            // Ensure that only one message is dispatched at a time
            channel.basicQos(1);
            // Define the consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override // Event trigger mechanism
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "utf-8");
                    System.out.println("[1] : " + s);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                        // Manual acknowledgement
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean autoAck = false;
            channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
        }
    }

    ---

    package com.sowhat.mq.work;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv2 {
        public static void main(String[] args) throws IOException, TimeoutException {
            // Get the connection
            Connection connection = ConnectionUtils.getConnection();
            // Get the channel
            final Channel channel = connection.createChannel();
            // Declare a queue
            channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
            // Ensure that only one message is dispatched at a time
            channel.basicQos(1);
            // Define the consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override // Event trigger mechanism
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "utf-8");
                    System.out.println("[2] : " + s);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                        // Manual acknowledgement
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean autoAck = false;
            channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);
        }
    }

Result: Fair distribution is achieved, with consumer 2 consuming twice as much as consumer 1.
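The two-to-one ratio can be reproduced with a small simulation in virtual time. This is a sketch under assumptions: the queue always has a backlog (the real producer above pauses between sends, so actual counts may differ), each consumer holds at most one unacknowledged message, and `FairDispatchSketch` and `simulate` are hypothetical names.

```java
import java.util.Arrays;

public class FairDispatchSketch {
    // Returns how many messages each consumer handles when the next message always goes
    // to the consumer that finishes (acknowledges) its current message first.
    static int[] simulate(int messages, long[] processingMillis) {
        int n = processingMillis.length;
        long[] freeAt = new long[n];   // virtual time at which each consumer becomes free
        int[] handled = new int[n];
        for (int m = 0; m < messages; m++) {
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;          // earliest free consumer gets the message
                }
            }
            freeAt[next] += processingMillis[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Consumer 1 needs 2000 ms per message, consumer 2 needs 1000 ms.
        System.out.println(Arrays.toString(simulate(30, new long[] {2000, 1000})));
    }
}
```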

3. Publish/subscribe

This works like subscribing to a public account: messages are published to all subscribers, and no routingKey is required.

Explanation:

  1. One producer, many consumers.
  2. Each consumer has its own queue.
  3. The producer does not send the message directly to a queue; it sends the message to an exchange.
  4. Each queue is bound to the exchange.
  5. Messages sent by the producer pass through the exchange to the queues, so one message can be consumed by multiple consumers.
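The five points above can be sketched as a tiny in-memory model. It is only an illustration with hypothetical names (`FanoutSketch`, `bind`, `publish`, `messages`) and does not use the RabbitMQ client; it also shows that a message published before any queue is bound reaches nobody.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FanoutSketch {
    // queue name -> messages stored in that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    public void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copies the message into every bound queue and returns the number of queues reached.
    public int publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    public List<String> messages(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        System.out.println(exchange.publish("too early"));   // no queue bound yet
        exchange.bind("test_queue_fanout_email");
        exchange.bind("test_queue_fanout_sms");
        System.out.println(exchange.publish("hello ps"));
    }
}
```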

Producer:

    package com.sowhat.mq.ps;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {
        public static final String EXCHANGE_NAME = "test_exchange_fanout";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();

            // Declare an exchange of type fanout (broadcast)
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            // Send a message
            String msg = "hello ps ";

            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
            System.out.println("Send:" + msg);

            channel.close();
            connection.close();
        }
    }

Where did the message go? It was lost. In RabbitMQ only queues can store messages, and the message is lost because no queue has been bound to the exchange yet. Consumers:

    package com.sowhat.mq.ps;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv1 {
        public static final String QUEUE_NAME = "test_queue_fanout_email";
        public static final String EXCHANGE_NAME = "test_exchange_fanout";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Bind the queue to the exchange
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            // Ensure that only one message is dispatched at a time
            channel.basicQos(1);
            // Define the consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override // Event trigger mechanism
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "utf-8");
                    System.out.println("[1] : " + s);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                        // Manual acknowledgement
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }

    ---

    package com.sowhat.mq.ps;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv2 {
        public static final String QUEUE_NAME = "test_queue_fanout_sms";
        public static final String EXCHANGE_NAME = "test_exchange_fanout";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Bind the queue to the exchange
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            // Ensure that only one message is dispatched at a time
            channel.basicQos(1);
            // Define the consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override // Event trigger mechanism
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "utf-8");
                    System.out.println("[2] : " + s);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                        // Manual acknowledgement
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }

You can also add a queue manually in the WebUI and bind it to the exchange.

4. Routing mode

Exchange: receives messages from producers on one side and pushes them to queues on the other. The anonymous exchange is denoted by "", as used earlier for the simple queue and the work queue.

fanout: does not process routing keys. No routingKey needs to be specified; we only bind the queue to the exchange, and a message is sent to all bound queues.

direct: processes routing keys. A routingKey must be specified: the producer sets a key when sending, each queue is bound with a key, and a message is delivered to a queue only when the two keys match exactly.

    package com.sowhat.mq.routing;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {
        public static final String EXCHANGE_NAME = "test_exchange_direct";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            // Declare a direct exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            String msg = "hello info!";

            // The routing key selects which bindings receive the message
            String routingKey = "info";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
            System.out.println("Send : " + msg);
            channel.close();
            connection.close();
        }
    }

    ---

    package com.sowhat.mq.routing;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv1 {
        public static final String EXCHANGE_NAME = "test_exchange_direct";
        public static final String QUEUE_NAME = "test_queue_direct_1";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);

            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

            // Define the consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override // Event trigger mechanism
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "utf-8");
                    System.out.println("[1] : " + s);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                        // Manual acknowledgement
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }

    ---

    package com.sowhat.mq.routing;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv2 {
        public static final String EXCHANGE_NAME = "test_exchange_direct";
        public static final String QUEUE_NAME = "test_queue_direct_2";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);

            // Bind the queue once per routing key it should receive
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

            // Define the consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override // Event trigger mechanism
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "utf-8");
                    System.out.println("[2] : " + s);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                        // Manual acknowledgement
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }

Drawback: the routing key must be known exactly; fuzzy matching is not possible.
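The exact-match rule of the direct type can be sketched as a lookup table from binding key to queues. This is a toy model, not the broker's implementation; `DirectExchangeSketch`, `bind`, and `route` are hypothetical names, while the queue names and keys are the ones used in the example above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    public void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new TreeSet<>()).add(queue);
    }

    // Only queues whose binding key equals the routing key exactly receive the message.
    public Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("test_queue_direct_1", "error");
        exchange.bind("test_queue_direct_2", "error");
        exchange.bind("test_queue_direct_2", "info");
        exchange.bind("test_queue_direct_2", "warning");
        System.out.println(exchange.route("info"));
        System.out.println(exchange.route("error"));
    }
}
```

A key such as "inf" matches nothing in this model, which is exactly the limitation that pattern-based matching removes.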

5. Topic mode

A topic exchange matches the routing key against a pattern. Routing keys consist of words separated by dots; # matches zero or more words, and * matches exactly one word. The producer sends with a concrete routingKey, while the consumer's queue is bound with a pattern. Example: goods, with the operations publish, delete, modify, and query.
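To make the wildcard rules concrete, here is a small matcher that compares a binding pattern with a routing key word by word. It is a sketch of the matching rule only (`*` for exactly one word, `#` for zero or more words); `TopicMatchSketch` and `matches` are hypothetical names, and this is not how the broker itself is implemented.

```java
public class TopicMatchSketch {
    // Returns true if the dot-separated routing key matches the binding pattern.
    public static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;              // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more of the remaining words
            for (int skip = j; skip <= k.length; skip++) {
                if (match(p, i + 1, k, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                      // pattern still expects a word, key has none
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);  // '*' or a literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("goods.#", "goods.find"));
        System.out.println(matches("goods.add", "goods.find"));
    }
}
```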

    package com.sowhat.mq.topic;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {
        public static final String EXCHANGE_NAME = "test_exchange_topic";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            // Declare a topic exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String msg = "Goods!";

            // The producer uses a concrete routing key
            String routingKey = "goods.find";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
            System.out.println("Send : " + msg);
            channel.close();
            connection.close();
        }
    }

    ---

    package com.sowhat.mq.topic;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv1 {
        public static final String EXCHANGE_NAME = "test_exchange_topic";
        public static final String QUEUE_NAME = "test_queue_topic_1";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);

            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

            // Define the consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override // Event trigger mechanism
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "utf-8");
                    System.out.println("[1] : " + s);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                        // Manual acknowledgement
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }

    ---

    package com.sowhat.mq.topic;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv2 {
        public static final String EXCHANGE_NAME = "test_exchange_topic";
        public static final String QUEUE_NAME = "test_queue_topic_2";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
            // This is the key point: bind with a pattern instead of a fixed key
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

            // Define the consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override // Event trigger mechanism
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "utf-8");
                    System.out.println("[2] : " + s);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                        // Manual acknowledgement
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }

6. Persistent and non-persistent MQ

Messages are held in memory and are lost if MQ goes down, so persistence should be considered. Whether a queue is persistent is set when the queue is declared:

    // Declare a queue
    channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);

    /**
     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
     * @param arguments other properties (construction arguments) for the queue
     * @return a declaration-confirm method to indicate the queue was successfully declared
     * @throws java.io.IOException if an error is encountered
     */
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

durable: whether the queue is persistent. Simply changing durable from false to true in the code does not work: test_work_queue has already been declared as non-durable, so declaring it again with durable = true fails. Conclusion: MQ does not allow the parameters of an existing queue to be changed by redeclaring it.
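The redeclaration rule can be modelled as a registry that remembers the parameters of each queue. This is a minimal sketch with hypothetical names (`QueueRedeclareSketch`, `declare`); it tracks only the durable flag and throws a plain Java exception, whereas the real broker reports the mismatch as an error on the channel.

```java
import java.util.HashMap;
import java.util.Map;

public class QueueRedeclareSketch {
    // queue name -> durable flag recorded at first declaration
    private final Map<String, Boolean> durableByQueue = new HashMap<>();

    // Declaring is idempotent only when the parameters match the existing queue.
    public void declare(String queue, boolean durable) {
        Boolean existing = durableByQueue.putIfAbsent(queue, durable);
        if (existing != null && existing != durable) {
            throw new IllegalStateException(
                    "queue '" + queue + "' already exists with durable=" + existing);
        }
    }

    public static void main(String[] args) {
        QueueRedeclareSketch broker = new QueueRedeclareSketch();
        broker.declare("test_work_queue", false);
        try {
            broker.declare("test_work_queue", true);
        } catch (IllegalStateException e) {
            System.out.println("redeclare failed: " + e.getMessage());
        }
    }
}
```

In practice the usual way out is to delete the old queue or declare the durable queue under a new name.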

7. Manual and automatic message acknowledgement on the consumer side

        // Automatic acknowledgement switch (false = manual acknowledgement)
        boolean autoAck = false;
        channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);

When MQ sends a message to a consumer, the consumer replies to MQ to acknowledge that the message was received.

If autoAck = true, automatic acknowledgement mode is in effect: MQ removes the message from memory as soon as it has been dispatched to a consumer. If the consumer has received the message but has not finished processing it, the message is already deleted in MQ, so a message that is being processed can be lost.

If autoAck = false, manual acknowledgement mode is in effect: if a consumer goes down, MQ can hand the message to another consumer, because it never received the acknowledgement.

MQ supports message acknowledgement: the consumer sends an acknowledgement telling MQ that the message has been consumed, and only then does MQ remove it from memory. autoAck defaults to false.
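The difference between the two modes can be sketched with a toy broker that tracks delivered but unacknowledged messages. All names here (`ManualAckSketch`, `deliver`, `consumerDied`, and so on) are hypothetical, and the model ignores details such as redelivery ordering.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

public class ManualAckSketch {
    private final Deque<String> ready = new ArrayDeque<>();            // waiting for delivery
    private final Map<Long, String> unacked = new LinkedHashMap<>();   // delivered, not yet acknowledged
    private long nextTag = 1;

    public void publish(String msg) {
        ready.addLast(msg);
    }

    // Delivers the next message; with autoAck the broker forgets it immediately.
    public long deliver(boolean autoAck) {
        String msg = ready.removeFirst();
        long tag = nextTag++;
        if (!autoAck) {
            unacked.put(tag, msg);
        }
        return tag;
    }

    public void ack(long tag) {
        unacked.remove(tag);
    }

    // A consumer went down: everything it had not acknowledged goes back to the queue.
    public void consumerDied() {
        for (String msg : unacked.values()) {
            ready.addFirst(msg);
        }
        unacked.clear();
    }

    public int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        ManualAckSketch broker = new ManualAckSketch();
        broker.publish("hello-0");
        broker.deliver(false);      // manual mode, consumer never acks
        broker.consumerDied();
        System.out.println(broker.readyCount());
    }
}
```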

8. Producer confirmation mechanism (transaction + confirm)

In RabbitMQ we can solve the problem of data loss caused by MQ server exceptions through persistence, but how do producers ensure that data is sent to MQ? The producer is also unaware by default. How to solve it?

1. Transaction mechanism

The first approach: AMQP implements a transaction mechanism similar to MySQL's. txSelect: sets the current channel to transaction mode. txCommit: commits the transaction. txRollback: rolls back the transaction.

All of these are operations performed on the producer side.

    package com.sowhat.mq.tx;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class TxSend {
        public static final String QUEUE_NAME = "test_queue_tx";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String msg = "hello tx message";

            try {
                // Enable transaction mode
                channel.txSelect();
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                // Simulate a failure before the commit
                int x = 1 / 0;

                // Commit the transaction
                channel.txCommit();
            } catch (Exception e) {
                // Roll back; catching Exception is needed because 1 / 0 throws an
                // ArithmeticException, which a catch for IOException would miss
                channel.txRollback();
                System.out.println("send message rollback");
            } finally {
                channel.close();
                connection.close();
            }
        }
    }

    ---

    package com.sowhat.mq.tx;

    import com.rabbitmq.client.*;
    import com.sowhat.mq.util.ConnectionUtils;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class TxRecv {
        public static final String QUEUE_NAME = "test_queue_tx";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("recv[tx] msg:" + new String(body, "utf-8"));
                }
            });
            // The channel and connection stay open so that the consumer keeps receiving
        }
    }

The disadvantage is that every transaction costs extra round trips to the broker, and a large number of attempts that fail and are then rolled back reduces MQ throughput.
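The effect of txSelect, txCommit and txRollback described above can be pictured with a toy in-memory model. This is only a sketch under loud assumptions: `ToyTxChannel` is a hypothetical class invented for illustration, it is not the RabbitMQ client API, and it ignores networking, persistence and concurrency entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a transactional channel; NOT the RabbitMQ client API.
final class ToyTxChannel {
    private final List<String> queue = new ArrayList<>();   // messages visible to consumers
    private final List<String> pending = new ArrayList<>(); // published but not yet committed
    private boolean inTx = false;

    void txSelect() {
        inTx = true;
    }

    void basicPublish(String msg) {
        if (inTx) {
            pending.add(msg); // held back until txCommit
        } else {
            queue.add(msg);   // no transaction: visible immediately
        }
    }

    void txCommit() {
        queue.addAll(pending);
        pending.clear();
    }

    void txRollback() {
        pending.clear();
    }

    List<String> delivered() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        ToyTxChannel channel = new ToyTxChannel();
        channel.txSelect();
        try {
            channel.basicPublish("hello tx message");
            int x = 1 / 0; // simulated failure before the commit
            channel.txCommit();
        } catch (ArithmeticException e) {
            channel.txRollback();
        }
        System.out.println(channel.delivered()); // prints []
    }
}
```

In this model the message never becomes visible because the failure happens before the commit, which mirrors what the TxSend example is meant to demonstrate.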

2. Confirm mode.

How producer-side confirm works: the producer puts the channel into confirm mode. Once the channel is in confirm mode, every message published on it is assigned a unique ID (starting from 1). Once the message has been delivered to all matching queues, the broker sends an acknowledgement (containing the message's unique ID) back to the producer, so the producer knows the message has reached its destination queue. If the message and queue are persistent, the acknowledgement is sent after the message has been written to disk. The delivery-tag field of the acknowledgement carries the sequence number of the confirmed message. The broker can also set the multiple field in basic.ack to indicate that all messages up to and including this sequence number have been handled.

The greatest benefit of confirm mode is that it is asynchronous: the producer can send the next message without waiting for the reply to the previous one.

Enable confirm mode: channel.confirmSelect()

1. Plain confirm: call waitForConfirms() after sending each message
package com.sowhat.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send1 {
    public static final String QUEUE_NAME = "test_queue_confirm1";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Put the channel into confirm mode. A channel that is already in
        // transaction mode cannot be switched to confirm mode.
        channel.confirmSelect();

        String msg = "hello confirm message";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        // Block until the broker has confirmed the message
        if (!channel.waitForConfirms()) {
            System.out.println("Message sending failed");
        } else {
            System.out.println("Message sending OK");
        }
        channel.close();
        connection.close();
    }
}
---
package com.sowhat.confirm;

import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv {
    public static final String QUEUE_NAME = "test_queue_confirm1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // autoAck = true: messages are acknowledged as soon as they are delivered
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv[confirm] msg:" + new String(body, "utf-8"));
            }
        });
    }
}
2. Batch confirm: send a batch of messages, then call waitForConfirms()
package com.sowhat.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send2 {
    public static final String QUEUE_NAME = "test_queue_confirm1";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Put the channel into confirm mode. A channel that is already in
        // transaction mode cannot be switched to confirm mode.
        channel.confirmSelect();

        String msg = "hello confirm message";
        // Batch send
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        // Wait for the broker to confirm the whole batch
        if (!channel.waitForConfirms()) {
            System.out.println("Message sending failed");
        } else {
            System.out.println("Message sending OK");
        }
        channel.close();
        connection.close();
    }
}
---
The receiver is the same as above.
3. Asynchronous confirm mode, which provides a callback method.

The ConfirmListener callback registered on the Channel object only carries the deliveryTag (the sequence number of the message being confirmed), so we need to maintain a set of unconfirmed sequence numbers for each Channel. Each time a message is published, its sequence number is added to the set; each time handleAck is called back, one record (multiple=false) or several records (multiple=true) are removed from it. For efficiency, the unconfirmed set is best stored in an ordered structure such as SortedSet.

package com.sowhat.mq.confirm;

import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

public class Send3 {
    public static final String QUEUE_NAME = "test_queue_confirm3";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Put the channel into confirm mode
        channel.confirmSelect();

        // Sequence numbers of messages that have not been confirmed yet
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // Add a confirm listener to the channel
        channel.addConfirmListener(new ConfirmListener() {
            // The broker confirmed the message(s)
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("--handleAck---multiple");
                    // headSet excludes its bound, hence deliveryTag + 1
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    System.out.println("--handleAck--multiple false");
                    confirmSet.remove(deliveryTag);
                }
            }

            // The broker could not handle the message(s); a real producer would resend them here
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("--handleNack---multiple");
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    System.out.println("--handleNack--multiple false");
                    confirmSet.remove(deliveryTag);
                }
            }
        });

        // Called when a message published with the mandatory flag cannot be routed to any queue.
        // The basicPublish call below does not set that flag, so this listener is only illustrative.
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("---- handle return----");
                System.out.println("replyCode:" + replyCode);
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        });

        String msgStr = "sssss";
        while (true) {
            long nextPublishSeqNo = channel.getNextPublishSeqNo();
            // Record the number before publishing so that an early ack cannot miss it
            confirmSet.add(nextPublishSeqNo);
            channel.basicPublish("", QUEUE_NAME, null, msgStr.getBytes());
            Thread.sleep(1000);
        }
    }
}
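The bookkeeping that handleAck performs on the unconfirmed set can be tried out without a broker. The following is a minimal sketch: `UnconfirmedTracker` and its method names are hypothetical helpers made up for this illustration, and the acknowledgements are fed in by hand instead of arriving from RabbitMQ.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Broker-free sketch of the unconfirmed-set bookkeeping; UnconfirmedTracker is a hypothetical helper.
final class UnconfirmedTracker {
    private final SortedSet<Long> unconfirmed = new TreeSet<>();
    private long nextSeqNo = 1; // confirm sequence numbers start at 1

    // Record a publish and return the sequence number assigned to it.
    synchronized long recordPublish() {
        long seqNo = nextSeqNo++;
        unconfirmed.add(seqNo);
        return seqNo;
    }

    // Same shape as ConfirmListener.handleAck(deliveryTag, multiple).
    synchronized void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // headSet excludes its bound, hence deliveryTag + 1
            unconfirmed.headSet(deliveryTag + 1).clear();
        } else {
            unconfirmed.remove(deliveryTag);
        }
    }

    // Snapshot of the sequence numbers still waiting for a confirm.
    synchronized SortedSet<Long> pending() {
        return new TreeSet<>(unconfirmed);
    }

    public static void main(String[] args) {
        UnconfirmedTracker tracker = new UnconfirmedTracker();
        for (int i = 0; i < 5; i++) {
            tracker.recordPublish();           // sequence numbers 1..5
        }
        tracker.handleAck(3, true);            // confirms 1, 2 and 3 at once
        tracker.handleAck(5, false);           // confirms only 5
        System.out.println(tracker.pending()); // prints [4]
    }
}
```

The sorted structure is what makes the multiple=true case cheap: everything up to the acknowledged number is dropped with a single headSet call instead of a scan.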

Conclusion: Confirm mode performs better than the AMQP transaction mode, so confirm mode is recommended.

9. RabbitMQ delay queue

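Typical use cases are time-limited operations such as Taobao order payment and verification codes. A message can be given a time to live through the expiration property (in milliseconds):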

        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("my1", "111");
        headers.put("my2", "222");
        AMQP.BasicProperties build = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)          // persistent message
                .contentEncoding("utf-8")
                .expiration("10000")      // time to live in milliseconds
                .headers(headers)
                .build();
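The snippet above only sets a time to live; it does not show what happens to a message once it expires. One common approach, assumed here and not taken from the article's code, is to let expired messages be dead-lettered to another exchange whose queue is consumed by the service that handles the timeout. The sketch below only builds the optional queue arguments for such a setup. `DelayQueueArgs`, `order.dlx` and `order.timeout` are hypothetical names; the three argument keys are RabbitMQ's optional queue arguments.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: builds the optional arguments for a TTL + dead-letter "delay" queue.
final class DelayQueueArgs {
    // The exchange and routing-key names passed in are hypothetical.
    static Map<String, Object> build(int ttlMillis, String deadLetterExchange, String deadLetterRoutingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis);                        // queue-wide TTL in milliseconds
        args.put("x-dead-letter-exchange", deadLetterExchange);      // where expired messages are republished
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey); // routing key used when republishing
        return args;
    }

    public static void main(String[] a) {
        Map<String, Object> args = build(10000, "order.dlx", "order.timeout");
        // With the RabbitMQ Java client this map would be passed as the last argument of
        // channel.queueDeclare(queueName, durable, exclusive, autoDelete, args).
        System.out.println(args.get("x-message-ttl")); // prints 10000
    }
}
```

No consumer listens on the queue declared with these arguments; consumers listen on the queue bound to the dead-letter exchange, so they see each message only after its TTL has passed.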


10. SpringBoot Topic Demo

Requirements: add the following dependency to a new SpringBoot project:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
1. Producer

application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin
    password: admin

Test cases:

package com.sowhat.mqpublisher;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class MqpublisherApplicationTests {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    void userInfo() {
        // Arguments: exchange, routingKey, message
        this.amqpTemplate.convertAndSend("log.topic", "user.log.error", "Users...");
    }
}
2. Consumer

application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin
    password: admin

# Custom configuration
mq:
  config:
    exchange_name: log.topic
    # Configure the queue names
    queue_name:
      info: log.info
      error: log.error
      logs: log.logs

Three different consumers:

package com.sowhat.mqconsumer.service;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @QueueBinding value property: the queue to bind.
 * @Queue value property: the queue name; the queue is created if it does not exist and reused if it does.
 * type = ExchangeTypes.TOPIC specifies the exchange type; the default is a direct exchange.
 */
@Service
public class ErrorReceiverService {

    /**
     * Listener method bound to the queue; the message body is passed in as msg.
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${mq.config.queue_name.error}"),
            exchange = @Exchange(value = "${mq.config.exchange_name}", type = ExchangeTypes.TOPIC),
            key = "*.log.error"
    ))
    public void process(String msg) {
        System.out.println(msg + " Error...........");
    }
}
---
package com.sowhat.mqconsumer.service;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @QueueBinding value property: the queue to bind.
 * @Queue value property: the queue name; the queue is created if it does not exist and reused if it does.
 */
@Service
public class InfoReceiverService {

    /**
     * Listener method that processes messages from the info queue.
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${mq.config.queue_name.info}"),
            exchange = @Exchange(value = "${mq.config.exchange_name}", type = ExchangeTypes.TOPIC),
            key = "*.log.info"
    ))
    public void process(String msg) {
        System.out.println(msg + " Info...........");
    }
}
---
package com.sowhat.mqconsumer.service;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @QueueBinding value property: the queue to bind.
 * @Queue value property: the queue name; the queue is created if it does not exist and reused if it does.
 */
@Service
public class LogsReceiverService {

    /**
     * Listener method that processes every message matching *.log.*
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${mq.config.queue_name.logs}"),
            exchange = @Exchange(value = "${mq.config.exchange_name}", type = ExchangeTypes.TOPIC),
            key = "*.log.*"
    ))
    public void process(String msg) {
        System.out.println(msg + " Logs...........");
    }
}
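Which of the three consumers receives the test message depends on how a topic exchange compares the routing key with each binding key. The following broker-free sketch illustrates that comparison; `TopicMatcher` is a hypothetical helper written for this explanation and is not part of Spring AMQP or the RabbitMQ client. It assumes the usual topic rules: `*` stands for exactly one dot-separated word and `#` for zero or more words.

```java
// Broker-free sketch of topic-exchange matching; TopicMatcher is a hypothetical helper.
final class TopicMatcher {
    // '*' matches exactly one dot-separated word, '#' matches zero or more words.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length; // pattern used up: the words must be used up too
        }
        if (p[i].equals("#")) {
            // let '#' absorb 0..n of the remaining words
            for (int k = j; k <= w.length; k++) {
                if (match(p, i + 1, w, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == w.length) {
            return false; // words used up but the pattern still expects one
        }
        return (p[i].equals("*") || p[i].equals(w[j])) && match(p, i + 1, w, j + 1);
    }

    public static void main(String[] args) {
        String routingKey = "user.log.error";
        System.out.println(matches("*.log.error", routingKey)); // prints true
        System.out.println(matches("*.log.info", routingKey));  // prints false
        System.out.println(matches("*.log.*", routingKey));     // prints true
    }
}
```

Applied to the demo, a message sent with routing key user.log.error reaches the log.error queue and the log.logs queue, but not the log.info queue.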

For detailed installation steps and the full code, see the download:

Conclusion

If a mode has to be specified, it is generally set on the consumer side, which makes it easier to adjust.

| Mode | Producer queue | Producer exchange | Producer routingKey | Consumer exchange | Consumer queue | Consumer routingKey |
| --- | --- | --- | --- | --- | --- | --- |
| Simple (rarely used) | Specified | Not specified | Not specified | Not specified | Specified | Not specified |
| WorkQueue (multiple consumers, rarely used) | Specified | Not specified | Not specified | Not specified | Specified | Not specified |
| Fanout (publish/subscribe mode) | Not specified | Specified | Not specified | Specified | Specified | Not specified |
| Direct (routing mode) | Not specified | Specified | Specified | Specified | Specified | One or more exact routing keys |
| Topic (fuzzy matching) | Not specified | Specified | Specified | Specified | Specified | Routing keys with fuzzy (wildcard) matching |
