Introduction

First published on the WeChat public account "JAVA thief ship", which shares Java learning resources, practical experience, and technical articles. Original writing takes effort, so readers who like it are welcome to follow.

Preface

Original article

RabbitMQ: Spring Boot edition

Official reference documentation

Core concepts

Server: Also known as the Broker. It accepts client connections and implements the AMQP entity services.

Connection: The network (TCP) connection between an application and the Broker.

Channel: The channel through which messages are read and written; almost all operations are performed on a Channel. A client can open multiple Channels, each representing one session. Opening a new Connection for every access to RabbitMQ is expensive and inefficient under heavy message traffic, because each Connection is a TCP connection. A Channel is a logical connection established inside a Connection. In a multithreaded application, each thread usually creates its own Channel. AMQP methods carry a channel ID so that the client and the Broker can tell Channels apart, which keeps them completely isolated from one another. As a lightweight connection, a Channel greatly reduces the operating-system overhead of establishing TCP connections.

Message: The data passed between the server and the application, consisting of Properties and a Body. Properties describe the message and enable advanced features such as priority and delay. The Body is the message content.

Virtual Host: A virtual address used for logical isolation; the top layer of message routing. A Virtual Host can contain multiple Exchanges and Queues, but within one Virtual Host no two Exchanges and no two Queues may share a name.

Exchange: Receives messages and forwards them to the bound queues according to the routing key. It does not store messages.

Binding: The virtual link between an Exchange and a Queue. A Binding can carry a routing key.

Routing key: A routing rule that the Virtual Host uses to decide how to route a particular message.

Queue: Also known as the Message Queue. It stores messages and forwards them to consumers.

Adding the dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
    </dependency>
</dependencies>

Configuration

spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin123
    password: 123456
    virtual-host: /test

Message model code example

How the methods are used and what their parameters mean is explained in the code comments.

Simple queue model

figure


P (producer/publisher): the producer, like someone sending a parcel

C (consumer): the consumer, like someone receiving a parcel

Red area: the queue, like a parcel pickup point where parcels wait to be collected

In one sentence

The producer sends messages to the queue and the consumer takes messages from it; the queue is the buffer that stores the messages.
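To make the "queue as a buffer" idea concrete before the Spring code, here is a minimal in-memory sketch. It does not use RabbitMQ at all: the class SimpleQueueSketch and its send/receive methods are hypothetical names, and a JDK BlockingQueue stands in for the broker's queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for the simple queue model; no RabbitMQ involved.
final class SimpleQueueSketch {
    // The queue is the buffer that sits between the producer and the consumer.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Producer side: put a message into the buffer.
    void send(String message) {
        queue.add(message);
    }

    // Consumer side: take the oldest buffered message, or null if none is waiting.
    String receive() {
        return queue.poll();
    }

    public static void main(String[] args) {
        SimpleQueueSketch mq = new SimpleQueueSketch();
        mq.send("SF Express");
        System.out.println("I received: " + mq.receive());
    }
}
```

The producer and the consumer never call each other directly; they only share the queue, which is why the consumer can start later and still receive the message.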

Initializing the queue

package com.ao.springbootamqp.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RabbitMqConfig {

    /* Queue name */
    public static final String TEST_QUEUE = "simple-amqp_queue";

    /**
     * Declare a queue.
     *
     * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
     *     this(name, durable, exclusive, autoDelete, (Map) null);
     * }
     *
     * String name: the queue name
     * boolean durable: whether the queue is durable, i.e. it does not need to be re-created when RabbitMQ restarts. Default: true
     * boolean exclusive: whether the queue can only be used by the current connection. Default: false
     * boolean autoDelete: whether the queue is deleted automatically when it is no longer in use. Default: false
     */
    @Bean(TEST_QUEUE)
    public Queue testQueue() {
        return new Queue(TEST_QUEUE, true);
    }
}

Message sending class

package com.ao.springbootamqp.service;

import com.ao.springbootamqp.config.RabbitMqConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Component
@Slf4j
public class RabbitMqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /* Send a message to the queue (default exchange "", routing key = queue name) */
    public String sendQueue(Object payload) {
        return baseSend("", RabbitMqConfig.TEST_QUEUE, payload, null, null);
    }

    /**
     * Common MQ send method.
     *
     * @param exchange              the exchange
     * @param routingKey            the routing key (the queue name when the default exchange is used)
     * @param payload               the message body
     * @param messageId             the message ID (unique)
     * @param messageExpirationTime the message expiration time (TTL) in milliseconds
     * @return the message ID
     */
    public String baseSend(String exchange, String routingKey, Object payload,
                           String messageId, Long messageExpirationTime) {
        /* Generate an ID automatically if none was supplied */
        if (messageId == null) {
            messageId = UUID.randomUUID().toString();
        }
        String finalMessageId = messageId;

        /* Set the message properties */
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                /* Write the message ID into the message properties */
                message.getMessageProperties().setMessageId(finalMessageId);
                /* Set the message expiration time, if one was given */
                if (messageExpirationTime != null) {
                    message.getMessageProperties().setExpiration(messageExpirationTime.toString());
                }
                /* Make the message persistent */
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }
        };

        /* Build the message body, converting the payload to JSON */
        Message message = null;
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            String json = objectMapper.writeValueAsString(payload);
            MessageProperties messageProperties = new MessageProperties();
            /* "application/json" is a content type, so it is set as the content type here */
            messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
            message = new Message(json.getBytes(StandardCharsets.UTF_8), messageProperties);
        } catch (JsonProcessingException e) {
            log.error("Failed to convert the payload to JSON", e);
        }

        /* Identifies the current message uniquely */
        CorrelationData correlationData = new CorrelationData(finalMessageId);

        /*
         * public void convertAndSend(String exchange, String routingKey, Object message,
         *     MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException
         *
         * exchange: the exchange
         * routingKey: the routing key
         * message: the message body
         * messagePostProcessor: the message property processor
         * correlationData: identifies the current message uniquely
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);

        return finalMessageId;
    }
}

Test

Send a message

@SpringBootTest
class RabbitMqTest {
    @Autowired
    private RabbitMqService rabbitMqService;

    @Test
    public void tt() {
        String s = "SF Express";
        rabbitMqService.sendQueue(s);
    }
}

Viewing the Management Page


As you can see, the message has reached the server and its properties are exactly what we set. Because the message is already on the server, the consumer will consume it as soon as it starts.

Consumer

@Component
public class RecService {
    /* Queue name */
    public static final String TEST_QUEUE = "simple-amqp_queue";

    @RabbitListener(queues = TEST_QUEUE)
    public void t2(Message message) {
        try {
            String msg = new String(message.getBody());
            // new String(...) is never null, so check for an empty body instead
            if (msg.isEmpty()) {
                System.out.println("Message empty");
            }
            System.out.println("I received =-=" + msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Start and view



Work message model

figure


P (producer/publisher): the producer, like someone sending a parcel

C1, C2 (consumer): the consumers, like people receiving parcels

Red area: the queue, like a parcel pickup point where parcels wait to be collected

Send 10 messages in a loop

@SpringBootTest
class RabbitMqTest {
    @Autowired
    private RabbitMqService rabbitMqService;

    @Test
    public void tt() {
        for (int i = 0; i < 10; i++) {
            String s = "News" + i;
            rabbitMqService.sendQueue(s);
        }
    }
}

Consumers 1 and 2

@Component
public class RecService1 {
    /* Queue name */
    public static final String TEST_QUEUE = "work-amqp-queue";

    @RabbitListener(queues = TEST_QUEUE)
    public void t2(Message message) {
        try {
            String msg = new String(message.getBody());
            // new String(...) is never null, so check for an empty body instead
            if (msg.isEmpty()) {
                System.out.println("Message empty");
            }
            System.out.println("Consumer 1 receives =-=" + msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

View console



As you can see, both consumers handle the same number of messages. To let the more capable consumer do more of the work, add the following configuration:

    listener:
      simple:
        # How many unacknowledged messages a consumer may hold at one time.
        # For the test, consumer 1 uses a value of 3 and consumer 2 a value of 1.
        prefetch: 1

Or call channel.basicQos(1) in the consumer. This tells RabbitMQ not to keep pushing messages to that consumer, but to wait until it has acknowledged the previous message.

@Component
public class RecService1 {
    /* Queue name */
    public static final String TEST_QUEUE = "work-amqp-queue";

    @RabbitListener(queues = TEST_QUEUE)
    public void t2(Message message, Channel channel) {
        try {
            String msg = new String(message.getBody());
            // new String(...) is never null, so check for an empty body instead
            if (msg.isEmpty()) {
                System.out.println("Message empty");
            }
            System.out.println("Consumer 1 receives =-=" + msg);
            // Accept at most one unacknowledged message at a time
            channel.basicQos(1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Restart the two consumers and send 10 messages in a loop again. The console output is as follows:



You can see that consumer 1 handles more messages.
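Why a prefetch limit shifts work toward the faster consumer can be shown with a small simulation. This is a simplified model, not RabbitMQ's actual scheduler: the class DispatchSketch, its method names, and the per-message processing costs are all assumptions made for illustration.

```java
import java.util.Arrays;

// Simplified, hypothetical model of message dispatch; it does not talk to RabbitMQ.
final class DispatchSketch {

    // Round-robin: message i goes to consumer i % n, however busy that consumer is.
    static int[] roundRobin(int messages, int consumers) {
        int[] handled = new int[consumers];
        for (int i = 0; i < messages; i++) {
            handled[i % consumers]++;
        }
        return handled;
    }

    // prefetch = 1: a consumer gets its next message only after finishing the
    // previous one, so each message goes to whichever consumer is free first.
    // costPerMessage[c] is the assumed time consumer c needs for one message.
    static int[] prefetchOne(int messages, int[] costPerMessage) {
        int n = costPerMessage.length;
        int[] handled = new int[n];
        long[] freeAt = new long[n];
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c; // earliest-free consumer wins; ties go to the lower index
                }
            }
            freeAt[next] += costPerMessage[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(roundRobin(10, 2)));                  // [5, 5]
        System.out.println(Arrays.toString(prefetchOne(10, new int[] {1, 3}))); // [7, 3]
    }
}
```

With ten messages and a second consumer that is three times slower, round-robin still gives each consumer five messages, while the prefetch-limited model gives the faster consumer seven.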


Subscription model: Fanout (broadcast mode)

In this subscription model, the producer publishes a message and every consumer receives it.

figure


P: the producer, like someone sending a parcel

X: the exchange, like the courier company

Red areas: the queues, like parcel pickup points where parcels wait to be collected

C1, C2: the consumers, like people receiving parcels

Change RabbitMqConfig as follows to declare queues 1 and 2 and bind them to the exchange

    /* Exchange name */
    public static final String TEST_EXCHANGE = "fanout_amqp_exchange";

    /* Declare a fanout exchange */
    @Bean(TEST_EXCHANGE)
    public Exchange testExchange() {
        // durable(true): persistent, the exchange still exists after MQ restarts
        return ExchangeBuilder.fanoutExchange(TEST_EXCHANGE).durable(true).build();
    }

    /* Queue 1 name */
    public static final String TEST_QUEUE_1 = "fanout_amqp_queue_1";
    /* Queue 2 name */
    public static final String TEST_QUEUE_2 = "fanout_amqp_queue_2";

    /**
     * Declare queue 1.
     *
     * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
     *
     * String name: the queue name
     * boolean durable: whether the queue is durable. Default: true
     * boolean exclusive: whether the queue can only be used by the current connection. Default: false
     * boolean autoDelete: whether the queue is deleted automatically when it is no longer in use. Default: false
     */
    @Bean(TEST_QUEUE_1)
    public Queue testQueue1() {
        return new Queue(TEST_QUEUE_1, true);
    }

    /* Declare queue 2 */
    @Bean(TEST_QUEUE_2)
    public Queue testQueue2() {
        return new Queue(TEST_QUEUE_2, true);
    }

    /* Bind queue 1 to the exchange */
    @Bean
    Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
                         @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("")
                .noargs();
    }

    /* Bind queue 2 to the exchange */
    @Bean
    Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
                         @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("")
                .noargs();
    }

Add a method to RabbitMqService that sends to the exchange

    /* Send to the exchange */
    public String sendExchange(Object payload, String routingKey) {
        return baseSend(RabbitMqConfig.TEST_EXCHANGE, routingKey, payload, null, null);
    }

Send a message

    @Test
    public void t1() {
        String s = "Broadcast Express";
        rabbitMqService.sendExchange(s, "");
    }

Check the exchange bindings

Check whether the message was sent successfully

You can see that the message was sent to the server successfully

Starting the consumers

Change the queue name accordingly before starting

    @RabbitListener(queues = TEST_QUEUE)


Subscription model: Direct (routing mode)

In this subscription model, the producer publishes messages and consumers receive them selectively. A queue can no longer be bound to the exchange arbitrarily; the binding must specify a routing key. The sender must also specify a routing key when sending a message to the exchange.

figure

P: the producer, like someone sending a parcel

X: the exchange, like the courier company

Red areas: the queues, like parcel pickup points where parcels wait to be collected

C1, C2: the consumers, like people receiving parcels

error, info, and so on in the figure are the routing keys we are talking about
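The selection rule of a direct exchange is an exact match between the message's routing key and the binding's key. The sketch below models only that rule in memory; DirectExchangeSketch and its methods are hypothetical names and not part of RabbitMQ or Spring AMQP.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange; not the RabbitMQ implementation.
final class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queue, String bindingKey) {
        queues.putIfAbsent(queue, new ArrayList<>());
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Deliver to every queue whose binding key equals the routing key exactly.
    // Returns false when no queue matched, i.e. the message was unroutable.
    boolean publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return !targets.isEmpty();
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("direct_amqp_queue_1", "SF");
        exchange.bind("direct_amqp_queue_2", "JD");
        exchange.publish("SF", "SF Express");
        exchange.publish("JD", "JD Express");
        System.out.println(exchange.messagesIn("direct_amqp_queue_1")); // [SF Express]
        System.out.println(exchange.messagesIn("direct_amqp_queue_2")); // [JD Express]
    }
}
```

A routing key that matches no binding reaches no queue, which is the situation the return callback in the reliability section is meant to report.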

Change the RabbitMqConfig configuration so that a routing key is specified when the two queues are bound to the exchange: queue 1 receives only SF Express and queue 2 receives only JD Express

    /* Exchange name */
    public static final String TEST_EXCHANGE = "direct_amqp_exchange";

    /* Declare a direct exchange */
    @Bean(TEST_EXCHANGE)
    public Exchange testExchange() {
        // durable(true): persistent, the exchange still exists after MQ restarts
        return ExchangeBuilder.directExchange(TEST_EXCHANGE).durable(true).build();
    }

    /* Queue 1 name */
    public static final String TEST_QUEUE_1 = "direct_amqp_queue_1";
    /* Queue 2 name */
    public static final String TEST_QUEUE_2 = "direct_amqp_queue_2";

    /**
     * Declare the queues.
     *
     * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
     *
     * String name: the queue name
     * boolean durable: whether the queue is durable. Default: true
     * boolean exclusive: whether the queue can only be used by the current connection. Default: false
     * boolean autoDelete: whether the queue is deleted automatically when it is no longer in use. Default: false
     */
    @Bean(TEST_QUEUE_1)
    public Queue testQueue1() {
        return new Queue(TEST_QUEUE_1, true);
    }

    @Bean(TEST_QUEUE_2)
    public Queue testQueue2() {
        return new Queue(TEST_QUEUE_2, true);
    }

    /* Bind queue 1 with routing key "SF" */
    @Bean
    Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
                         @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("SF")
                .noargs();
    }

    /* Bind queue 2 with routing key "JD" */
    @Bean
    Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
                         @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("JD")
                .noargs();
    }

Send a message

    @Test
    public void t2() {
        String s = "JD Express";
        String s1 = "SF Express";
        rabbitMqService.sendExchange(s, "JD");
        rabbitMqService.sendExchange(s1, "SF");
    }

Check the exchange bindings

Check whether the message was sent successfully

Starting the consumers

Change the queue name accordingly before starting. Logically, consumer 1 should receive SF Express and consumer 2 should receive JD Express. The results are as follows:

The results are as expected.

Subscription model: Topic (wildcard mode)

figure


Like a Direct exchange, a Topic exchange routes messages to different queues according to the routing key. The difference is that a Topic exchange lets queues use wildcards in the routing key they bind with.

A routing key is typically made up of one or more words, separated by "."

Wildcard rules:

# : matches zero or more words

* : matches exactly one word
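The two wildcard rules can be checked with a small matcher. This is an illustrative re-implementation under the rules as commonly documented (words separated by ".", "*" for exactly one word, "#" for zero or more words); TopicMatcher is a hypothetical class, not RabbitMQ's own matching code.

```java
// Hypothetical matcher that applies the topic wildcard rules to dot-separated keys.
final class TopicMatcher {

    // Returns true if routingKey is matched by bindingKey.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern used up: it is a match only if every word was consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' either absorbs no word here, or absorbs one word and stays active.
            if (match(pattern, p + 1, words, w)) {
                return true;
            }
            return w < words.length && match(pattern, p, words, w + 1);
        }
        if (w == words.length) {
            return false; // pattern still expects a word, but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("SF.kd", "SF.kd"));  // true
        System.out.println(matches("#.kd", "EMS.kd"));  // true
        System.out.println(matches("*.kd", "a.b.kd"));  // false
    }
}
```

With the bindings used in this section, "SF.kd" matches only the SF message, while "#.kd" matches every routing key that ends in the word kd.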

RabbitMqConfig is basically the same as for Direct. Change the queue and exchange names, and change the routing keys so that queue 1 receives SF Express and queue 2 receives any express delivery

    /* Declare a topic exchange */
    @Bean(TEST_EXCHANGE)
    public Exchange testExchange() {
        // durable(true): persistent, the exchange still exists after MQ restarts
        return ExchangeBuilder.topicExchange(TEST_EXCHANGE).durable(true).build();
    }

    /* Bind queue 1 with routing key "SF.kd" */
    @Bean
    Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
                         @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("SF.kd")
                .noargs();
    }

    /* Bind queue 2 with the wildcard routing key "#.kd" */
    @Bean
    Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
                         @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("#.kd")
                .noargs();
    }

Send a message

    @Test
    public void t2() {
        String s = "EMS express delivery";
        String s1 = "SF Express";
        String s2 = "JD Express";
        rabbitMqService.sendExchange(s, "EMS.kd");
        rabbitMqService.sendExchange(s1, "SF.kd");
        rabbitMqService.sendExchange(s2, "JD.kd");
    }

Check the exchange bindings

Check whether the message was sent successfully

Starting the consumers

The results are as expected!




Advanced

Tip: The following code examples are demonstrated in routing mode.

Reliable message delivery

Delivering RabbitMQ messages reliably requires three things:

  • RabbitMQ message confirmation mechanism: RabbitMQ has two kinds of confirmation, confirmation of sending and confirmation of consumption. Sending confirmation verifies that a message is delivered reliably as the producer sends it to the Exchange and the Exchange dispatches it to the Queue. The first step confirms that the message reached the Exchange; the second confirms that it reached the Queue.
  • Exchange, queue, and message persistence: prevents a message from being lost if the broker goes down after receiving it but before a consumer has consumed it.
  • Consumer confirmation: there are three modes, none (no acknowledgement is sent), auto (automatic acknowledgement), and manual (manual acknowledgement). To ensure reliability we use manual acknowledgement. Why? If a message is acknowledged as soon as the consumer receives it, the Broker marks it as done and deletes it from the Queue whether or not processing has finished. If the consumer then throws an exception and fails to process the message, the message is lost. In manual mode the consumer can call basicAck, basicNack, or basicReject, and an ack is sent only when the message has been processed correctly.

RabbitMQ message confirmation mechanism

Modify the configuration
spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin123
    password: 123456
    virtual-host: /test
    # Enables the ConfirmCallback, which is triggered after a message is sent to the Exchange
    publisher-confirms: true
    # Enables the ReturnCallback, which is triggered when a message fails to get from the Exchange to a queue
    publisher-returns: true
    listener:
      simple:
        # Consumption acknowledgement mode; manual means the code acknowledges explicitly
        acknowledge-mode: manual
Modify RabbitMqService

Add code that implements the ConfirmCallback and ReturnCallback interfaces

    // Callback triggered once a message has been sent to the Exchange
    private final RabbitTemplate.ConfirmCallback confirmCallback =
            new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if (ack) {
                        // Business logic on success
                        log.info("Message delivered to the exchange successfully!");
                    } else {
                        // Business logic on failure
                        log.info("Message delivery to the exchange failed!");
                    }
                }
            };

    // Callback triggered when a message fails to get from the Exchange to a queue
    private final RabbitTemplate.ReturnCallback returnCallback =
            new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText,
                                            String exchange, String routingKey) {
                    // Business logic on failure
                    log.info("message=" + message.toString());
                    log.info("replyCode=" + replyCode);
                    log.info("replyText=" + replyText);
                    log.info("exchange=" + exchange);
                    log.info("routingKey=" + routingKey);
                }
            };

Add the following code before the call to rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData):

    rabbitTemplate.setConfirmCallback(this.confirmCallback);
    rabbitTemplate.setReturnCallback(this.returnCallback);
Testing whether the message reached the exchange

For convenience, messages are sent through a controller here. If the message does not reach a suitable Exchange, the ack passed to the confirm callback is false, and you can run compensating business logic such as a retry.

@RestController
public class TestController {

    @Autowired
    private RabbitMqService sender;

    @PostMapping("/tt")
    public String sendMsg(String msg) {
        sender.sendExchange(msg, "");
        return "ok";
    }
}

View console

Testing whether the message reached the queue

Here a routing key that does not exist is specified when sending, to simulate the failure callback

    sender.sendExchange(msg, "XXX");

View console

Exchange, queue, and message persistence

This was already covered in the code above, so it is skipped here.

Consumer confirmation mechanism

basicAck, basicNack, basicReject

basicAck

When multiple is false, only the current message is acknowledged. When multiple is true, all unacknowledged messages with a deliveryTag up to and including the current one are acknowledged in a batch. The deliveryTag identifies a delivery within a Channel. RabbitMQ guarantees that within each Channel the deliveryTag starts at 1 and increases.

    public void basicAck(long deliveryTag, boolean multiple) throws IOException {
        this.transmit(new Ack(deliveryTag, multiple));
        this.metricsCollector.basicAck(this, deliveryTag, multiple);
    }

basicNack

Use this when an exception occurs while a consumer is processing a message. When requeue is true, the failed message is put back into the queue; once a maximum number of retries has been reached, discard the message or send it to a dead-letter queue or a retry queue. When requeue is false, the message is discarded.

    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
        this.transmit(new Nack(deliveryTag, multiple, requeue));
        this.metricsCollector.basicNack(this, deliveryTag);
    }

basicReject

Same as basicNack, except that it rejects a single message only (it has no multiple parameter).
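How deliveryTag, multiple, and requeue interact is easier to see in a toy model of one channel. The class AckSketch below is hypothetical and only mirrors the parameter names of the three calls; where a requeued message is placed in the queue is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of one channel's ready and unacknowledged messages.
final class AckSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1; // delivery tags start at 1 and increase per channel

    void publish(String message) {
        ready.addLast(message);
    }

    // Deliver the next ready message; returns its delivery tag, or -1 if none is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        unacked.put(nextTag, message);
        return nextTag++;
    }

    // multiple = true acknowledges every unacked tag up to and including deliveryTag.
    void basicAck(long deliveryTag, boolean multiple) {
        select(deliveryTag, multiple).clear();
    }

    // requeue = true puts the rejected messages back at the head of the queue
    // (an assumption of this sketch); requeue = false drops them.
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        NavigableMap<Long, String> rejected = select(deliveryTag, multiple);
        if (requeue) {
            for (String message : rejected.descendingMap().values()) {
                ready.addFirst(message); // descending order keeps the original order
            }
        }
        rejected.clear();
    }

    // Like basicNack, but always for a single message.
    void basicReject(long deliveryTag, boolean requeue) {
        basicNack(deliveryTag, false, requeue);
    }

    private NavigableMap<Long, String> select(long deliveryTag, boolean multiple) {
        return multiple
                ? unacked.headMap(deliveryTag, true)
                : unacked.subMap(deliveryTag, true, deliveryTag, true);
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }
}
```

For example, after three deliveries, basicAck(2, true) leaves only tag 3 unacknowledged, and basicNack(3, false, true) moves that message back to the ready state.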

Test

Here the manual acknowledgement is commented out

    @RabbitListener(queues = TEST_QUEUE)
    public void t2(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("Consumer 1 receives =-=" + msg);
        // long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // channel.basicAck(deliveryTag, false);
    }

Viewing the Management Page

The message becomes unacked

Stop the consumer application and the message becomes ready again. Although manual ack is configured, the code never acknowledges the message, so it has not really been consumed. When the consumer shuts down, the state of the message goes back to Ready.



Consumer retry mechanism

configuration

Add the following configuration. The retry parameters under listener apply to the consuming side, and those under template apply to the sending side.

    listener:
      simple:
        # Consumption acknowledgement mode; manual means the code acknowledges explicitly
        acknowledge-mode: manual
        retry:
          # Whether to enable consumer retry. When false, a message whose consumer code throws an exception is received again and again.
          enabled: true
          # Initial retry interval: 1s
          initial-interval: 1000
          # Maximum number of attempts
          max-attempts: 3
          # Maximum retry interval: 1s
          max-interval: 1000
          # Multiplier applied to the interval after each retry
          multiplier: 1.0
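How initial-interval, multiplier, max-interval, and max-attempts combine can be worked out by hand. The sketch below assumes the usual exponential backoff rule (each wait is the previous wait times the multiplier, capped at the maximum, with one wait between consecutive attempts); RetryIntervalSketch is a hypothetical helper, not the framework's own code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that lists the waits implied by the retry settings.
final class RetryIntervalSketch {

    // Waits in milliseconds between attempts: maxAttempts attempts means maxAttempts - 1 waits.
    static List<Long> waits(long initialInterval, double multiplier,
                            long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double interval = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add((long) Math.min(interval, maxInterval)); // cap at max-interval
            interval *= multiplier;                              // grow for the next retry
        }
        return result;
    }

    public static void main(String[] args) {
        // The settings used in this section: 1s initial, multiplier 1.0, 1s max, 3 attempts.
        System.out.println(waits(1000, 1.0, 1000, 3)); // [1000, 1000]
        // A growing example: 1s initial, doubling, capped at 5s, 5 attempts.
        System.out.println(waits(1000, 2.0, 5000, 5)); // [1000, 2000, 4000, 5000]
    }
}
```

With the configuration above the consumer is therefore attempted three times in total, one second apart.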

Test

Throw an exception in the consumer: int i = 1 / 0;

View console

As you can see, the consumer was attempted three times

What if the retries are used up

What if the listener retry attempts run out and the consumer still throws an exception? You can configure a MessageRecoverer to handle such messages. There are two implementations by default:

  • RepublishMessageRecoverer: republishes the message to a specified exchange and queue; it must be configured manually. Let's test it:

Add the following to RabbitMqConfig: declare a retry exchange (RETRY_EXCHANGE) and a retry queue (RETRY_QUEUE) and bind them with the routing key retry

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, "retry");
    }

Add a consumer as follows:

    @RabbitListener(queues = RETRY_QUEUE)
    public void t3(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("Retry consumer received =-=" + msg);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }

Once the retries are used up (the maximum number of attempts was set to 3), the test result is as follows:


  • RejectAndDontRequeueRecoverer: used by default when no MessageRecoverer is configured manually. It simply logs the exception and rejects the message without requeueing it. The source is as follows:

public class RejectAndDontRequeueRecoverer implements MessageRecoverer {

    protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class);

    @Override
    public void recover(Message message, Throwable cause) {
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Retries exhausted for message " + message, cause);
        }
        throw new ListenerExecutionFailedException("Retry Policy Exhausted",
                new AmqpRejectAndDontRequeueException(cause), message);
    }
}

Repeated message consumption

Retries and delays can cause the same message to be consumed more than once, which is a problem for operations such as payments, SMS pushes, and emails.

Solution: a globally unique ID

  • When sending, the sender adds a unique ID to the message header, such as a UUID, order number, timestamp, or traceId. The message wrapping code above already does this.

  • After receiving the message, the receiver first reads the unique ID from the message header and checks whether redis already contains it. If it does, the message is skipped. If it does not, the message is processed and the unique ID is cached once processing succeeds.
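The receiver-side check described above can be sketched as follows. To stay self-contained, an in-memory concurrent set stands in for redis, and IdempotentConsumerSketch with its onMessage method is a hypothetical name rather than code from this project.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a concurrent set stands in for redis; names are hypothetical.
final class IdempotentConsumerSketch {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger handled = new AtomicInteger();

    // Returns true if the message was processed, false if it was a duplicate and skipped.
    boolean onMessage(String messageId, String body) {
        if (processedIds.contains(messageId)) {
            return false; // this ID was already processed successfully
        }
        handle(body);
        processedIds.add(messageId); // cache the ID only after processing succeeded
        return true;
    }

    // Placeholder for the real business logic (payment, SMS, email, ...).
    private void handle(String body) {
        handled.incrementAndGet();
    }

    int handledCount() {
        return handled.get();
    }
}
```

Note that the check and the later add are two separate steps, so two threads receiving the same ID at the same moment could both pass the check; with a real redis, an atomic set-if-absent operation is the usual way to close that gap.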


Dead-letter queue

What is a dead-letter queue

A dead letter, as the name implies, is a message that cannot be consumed. If a message is not consumed because of some failure in the consumer, it is republished to another Exchange (the Dead Letter Exchange), which then routes it to another queue according to the routing key. The message is reprocessed on that queue.

Common sources of dead letters

  • The message is rejected (basic.reject or basic.nack) with requeue=false.
  • The message TTL has expired.
  • The queue has reached its maximum length (the queue is full and no more data can be added to MQ).

When a message is dead-lettered

  • The message is rejected with basic.reject or basic.nack and requeue=false, or the upper limit on requeue attempts has been reached.
  • The TTL (Time To Live) of the message has expired.
  • The queue length limit is exceeded (queue full, "x-max-length" parameter).

This example uses the first (the code below rejects the message with basicNack and requeue=false).

Code sample

Initialize the dead-letter queue, exchange, and binding

Declare a dead-letter exchange (DL_EXCHANGE) and a dead-letter queue (DL_QUEUE) and bind them. The x-dead-letter-exchange and x-dead-letter-routing-key arguments are added to TEST_QUEUE_1.

    /* Business exchange name */
    public static final String TEST_EXCHANGE = "test_amqp_exchange";

    /* Declare the business exchange */
    @Bean(TEST_EXCHANGE)
    public Exchange testExchange() {
        // durable(true): persistent, the exchange still exists after MQ restarts
        return ExchangeBuilder.directExchange(TEST_EXCHANGE).durable(true).build();
    }

    /* Queue 1 name */
    public static final String TEST_QUEUE_1 = "test_amqp_queue_1";

    @Bean(TEST_QUEUE_1)
    public Queue testQueue1() {
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange: the dead-letter exchange for this queue
        args.put("x-dead-letter-exchange", DL_EXCHANGE);
        // x-dead-letter-routing-key: the routing key used when dead-lettering
        args.put("x-dead-letter-routing-key", "dlk");
        return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
    }

    /* Bind queue 1 with routing key "SF" */
    @Bean
    Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
                         @Qualifier(TEST_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("SF")
                .noargs();
    }

    /* Dead-letter exchange name */
    public static final String DL_EXCHANGE = "deadLetterExchange";

    /* Declare the dead-letter exchange */
    @Bean(DL_EXCHANGE)
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange(DL_EXCHANGE).durable(true).build();
    }

    /* Dead-letter queue name */
    public static final String DL_QUEUE = "deadLetterQueue";

    /* Declare the dead-letter queue */
    @Bean(DL_QUEUE)
    public Queue deadLetterQueue() {
        return new Queue(DL_QUEUE, true);
    }

    /* Bind the dead-letter queue to the dead-letter exchange */
    @Bean
    Binding bindingDead(@Qualifier(DL_QUEUE) Queue queue,
                        @Qualifier(DL_EXCHANGE) Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("dlk")
                .noargs();
    }

Consumers

    // Business consumer
    @RabbitListener(queues = TEST_QUEUE)
    public void t2(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        try {
            int i = 1 / 0; // simulate a processing failure
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            System.out.println("Consumer 1 hit an error.");
            // requeue = false, so the message is dead-lettered
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }

    // Dead-letter consumer
    @RabbitListener(queues = DL_QUEUE)
    public void t3(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("Dead letter queue received =-=" + msg);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }

View console


About the process

The message is first delivered to the business consumer. When the business consumer fails, the catch block calls basicNack with requeue set to false. RabbitMQ receives the nack and republishes the message to the dead-letter exchange configured on the business queue through x-dead-letter-exchange. That exchange uses the dead-letter routing key configured through x-dead-letter-routing-key to route the message to the dead-letter queue, where the dead-letter consumer finally consumes it.
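The flow described above can be traced with a toy in-memory model. This is only a sketch: `DeadLetterFlowSketch`, `SimQueue`, and `nackWithoutRequeue` are hypothetical names invented for illustration and are not RabbitMQ or Spring AMQP classes, the exchange name "testExchange" is a placeholder, and the model assumes the queue always declares a dead-letter routing key.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of dead-letter routing; none of these types are RabbitMQ classes. */
class DeadLetterFlowSketch {

    /** A queue with optional dead-letter settings, mirroring the x-dead-letter-* arguments. */
    static final class SimQueue {
        final String name;
        final String deadLetterExchange;   // models x-dead-letter-exchange, may be null
        final String deadLetterRoutingKey; // models x-dead-letter-routing-key, may be null
        final Queue<String> messages = new ArrayDeque<>();

        SimQueue(String name, String deadLetterExchange, String deadLetterRoutingKey) {
            this.name = name;
            this.deadLetterExchange = deadLetterExchange;
            this.deadLetterRoutingKey = deadLetterRoutingKey;
        }
    }

    /** Direct exchanges only: exchange name -> (routing key -> bound queue). */
    private final Map<String, Map<String, SimQueue>> bindings = new HashMap<>();

    void bind(String exchange, String routingKey, SimQueue queue) {
        bindings.computeIfAbsent(exchange, k -> new HashMap<>()).put(routingKey, queue);
    }

    /** Routes by exact routing-key match; returns false if no queue is bound. */
    boolean publish(String exchange, String routingKey, String body) {
        SimQueue target = bindings
                .getOrDefault(exchange, Collections.<String, SimQueue>emptyMap())
                .get(routingKey);
        if (target == null) {
            return false;
        }
        target.messages.add(body);
        return true;
    }

    /** Models basicNack(tag, false, false): the head message leaves the queue and is dead-lettered. */
    boolean nackWithoutRequeue(SimQueue queue) {
        String body = queue.messages.poll();
        if (body == null || queue.deadLetterExchange == null) {
            return false; // nothing to reject, or no dead-letter exchange: the message is dropped
        }
        return publish(queue.deadLetterExchange, queue.deadLetterRoutingKey, body);
    }

    public static void main(String[] args) {
        DeadLetterFlowSketch broker = new DeadLetterFlowSketch();
        SimQueue business = new SimQueue("test_amqp_queue_1", "deadLetterExchange", "dlk");
        SimQueue deadLetter = new SimQueue("deadLetterQueue", null, null);
        broker.bind("testExchange", "SF", business);
        broker.bind("deadLetterExchange", "dlk", deadLetter);

        broker.publish("testExchange", "SF", "order-1");
        broker.nackWithoutRequeue(business);
        System.out.println("Dead-letter queue holds: " + deadLetter.messages);
    }
}
```

The model deliberately ignores acknowledgement bookkeeping and persistence. It only shows that the dead-letter exchange and routing key are properties of the business queue, not of the consumer.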


Delay queue

What is a delay queue

As the name implies, messages placed in a delay queue are not meant to be consumed immediately. Instead, they wait for a specified period before being taken out for consumption.

Application scenario of delay queue

  • Orders that are not paid within 10 minutes are automatically cancelled.
  • If a user requests a refund and the seller does not process it within three days, the seller or the platform is notified by SMS.

Ways to implement a delay queue

Mode 1: TTL (Time To Live) + Dead Letter Exchanges (DLX)

Dead-letter exchanges (DLX) were covered above, but what is TTL? RabbitMQ can limit how long a message lives, either through the x-message-ttl argument on a queue or through setExpiration on an individual message. If both are set, the shorter of the two applies. A message that expires becomes a dead letter.

There are two ways to set the TTL:
  • Set it as a queue argument, so that all messages in the queue have the same expiration time.

    Disadvantages: if the required delays vary widely (for example 1, 2, 5, and 12 minutes), a separate exchange and queue must be created for each delay value in order to route the messages.

    @Bean(TEST_QUEUE_1)
    public Queue testQueue1() {
        Map<String, Object> args = new HashMap<>(2);
        // Declare an expiration time of 5 seconds
        args.put("x-message-ttl", 5000);
        // x-dead-letter-exchange declares the dead-letter exchange
        args.put("x-dead-letter-exchange", DL_EXCHANGE);
        // x-dead-letter-routing-key specifies the dead-letter routing key
        args.put("x-dead-letter-routing-key", "dlk");
        return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
    }
  • Set it on each message individually, so that every message can have a different TTL.

    Disadvantages: per-message TTL can leave messages stuck in the queue. The queue is first in, first out, and RabbitMQ only checks whether the message at the head of the queue has expired. An expired head message is moved to the dead-letter queue, but until the head leaves the queue the messages behind it cannot be dead-lettered, so a message may not "die" on time. For example, if the first message has a long delay and the second has a short one, the second message is still not processed before the first.
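The head-of-queue behavior behind this disadvantage can be illustrated with a small simulation. This is a sketch under stated assumptions: `HeadOfQueueTtlSketch` and its methods are hypothetical names, time is passed in as plain millisecond values instead of using a real clock, and the queue is assumed to have no consumer, as in the TTL + DLX setup.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of per-message TTL in a FIFO queue with no consumer; not RabbitMQ code. */
class HeadOfQueueTtlSketch {

    static final class Msg {
        final String body;
        final long expiresAtMs;

        Msg(String body, long publishedAtMs, long ttlMs) {
            this.body = body;
            this.expiresAtMs = publishedAtMs + ttlMs;
        }
    }

    private final Deque<Msg> queue = new ArrayDeque<>();

    void publish(String body, long nowMs, long ttlMs) {
        queue.addLast(new Msg(body, nowMs, ttlMs));
    }

    /**
     * Only the head is checked for expiry, so an unexpired head message
     * shields every message behind it, even ones that expired long ago.
     */
    List<String> deadLetterExpired(long nowMs) {
        List<String> deadLettered = new ArrayList<>();
        while (!queue.isEmpty() && queue.peekFirst().expiresAtMs <= nowMs) {
            deadLettered.add(queue.pollFirst().body);
        }
        return deadLettered;
    }

    public static void main(String[] args) {
        HeadOfQueueTtlSketch q = new HeadOfQueueTtlSketch();
        q.publish("slow", 0, 10_000); // 10-second TTL, published first
        q.publish("fast", 0, 2_000);  // 2-second TTL, published second

        System.out.println("t=2s:  " + q.deadLetterExpired(2_000));
        System.out.println("t=10s: " + q.deadLetterExpired(10_000));
    }
}
```

In this model the message with the 2-second TTL is only dead-lettered at the 10-second mark, together with the message in front of it, which is why per-message TTL is a poor fit when delays are not published in increasing order.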


Mode 2: rabbitmq-delayed-message-exchange plugin

Download it from the official website.

The plugin solves the problem that per-message TTL has: a message with a shorter delay is delivered first, even if it was published after a message with a longer delay.
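To make the difference from a plain FIFO queue concrete, here is a toy model of delivery ordered by due time. It is a sketch only: `DelayedExchangeSketch` and its methods are hypothetical names, and the code does not talk to RabbitMQ or reproduce how the plugin stores messages internally. With the real plugin, the exchange is declared with the type "x-delayed-message" and each message carries its delay in the "x-delay" header.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of delivery ordered by due time rather than by publish order; not plugin code. */
class DelayedExchangeSketch {

    static final class Delayed {
        final String body;
        final long dueAtMs;
        final long seq; // publish order, used to break ties between equal due times

        Delayed(String body, long dueAtMs, long seq) {
            this.body = body;
            this.dueAtMs = dueAtMs;
            this.seq = seq;
        }
    }

    private final PriorityQueue<Delayed> pending = new PriorityQueue<>(
            Comparator.comparingLong((Delayed d) -> d.dueAtMs)
                    .thenComparingLong(d -> d.seq));
    private long nextSeq = 0;

    /** delayMs plays the role of the per-message delay value. */
    void publish(String body, long nowMs, long delayMs) {
        pending.add(new Delayed(body, nowMs + delayMs, nextSeq++));
    }

    /** Releases every message whose delay has elapsed, earliest due time first. */
    List<String> releaseDue(long nowMs) {
        List<String> released = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().dueAtMs <= nowMs) {
            released.add(pending.poll().body);
        }
        return released;
    }

    public static void main(String[] args) {
        DelayedExchangeSketch exchange = new DelayedExchangeSketch();
        exchange.publish("slow", 0, 10_000); // published first, due later
        exchange.publish("fast", 0, 2_000);  // published second, due sooner

        System.out.println("t=2s:  " + exchange.releaseDue(2_000));
        System.out.println("t=10s: " + exchange.releaseDue(10_000));
    }
}
```

Because messages are ordered by due time, the message with the short delay is released at the 2-second mark no matter what was published before it.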

Code sample

The following demonstrates the dead letter + TTL approach, reusing the dead-letter queue code shown above.

Setting the queue expiration time

Add the x-message-ttl argument to the business queue and set it to one second. Remove the business consumer (to simulate messages that are never consumed and therefore expire) and keep only the dead-letter consumer. Everything else stays the same.

    @Bean(TEST_QUEUE_1)
    public Queue testQueue1() {
        Map<String, Object> args = new HashMap<>(2);
        // Declare an expiration time of 1 second
        args.put("x-message-ttl", 1000);
        // x-dead-letter-exchange declares the dead-letter exchange
        args.put("x-dead-letter-exchange", DL_EXCHANGE);
        // x-dead-letter-routing-key specifies the dead-letter routing key
        args.put("x-dead-letter-routing-key", "dlk");
        return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
    }
View console

As you can see, the message is consumed by the dead-letter consumer after 1 second.


Setting the message expiration time

Comment out the queue expiration time and modify the send method as follows:

    /* Send to the exchange */
    public String sendExchange(Object payload, String routingKey, Long messageExpirationTime) {
        return baseSend(RabbitMqConfig.TEST_EXCHANGE, routingKey, payload, null, messageExpirationTime);
    }

The controller is as follows, and the consumer remains the same

    @Autowired
    private RabbitMqService sender;

    @PostMapping("/tt")
    public String sendMsg(String msg) {
        sender.sendExchange(msg, "SF", 5000L);
        System.out.println("[5-second expiration time test] Send time is:" + LocalDateTime.now());
        return "ok";
    }
View console

You can see that the message is consumed by the dead letter consumer 5 seconds later

