Previously: RabbitMQ management interface view posture

I. Fast setup/basic information delivery and consumption

1. Introduce dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2, application. Yml

Spring: rabbitmq: host: ipXXX port: 5672 username: account XXX password: password XXX virtual-host: /wen #

Take the Direct pattern as an example

1、配置文件

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class RabbitConfig {
    //队列 起名:TestDirectQueue
    @Bean
    public Queue emailQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue smsQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue weixinQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("weixin.fanout.queue", true);
    }
    @Bean
    public Queue TTLQueue() {
        Map<String, Object> map = new HashMap<>(16);
        map.put("x-message-ttl", 30000); // 队列中的消息未被消费则30秒后过期
        return new Queue("TTL_QUEUE", true, false, false, map);
    }

    @Bean
    public DirectExchange TTLExchange() {
        return new DirectExchange("TTL_EXCHANGE", true, false);
    }


    //Direct交换机 起名:TestDirectExchange
    @Bean
    public DirectExchange fanoutOrderExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("fanout_exchange", true, false);
    }
    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");
    }

    @Bean
    public Binding bindingDirect1() {
        return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect2() {
        return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect3() {
        return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");
    }
}


2、生产者
package com.pit.barberShop.common.MQ.Rabbit.fanout;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author :wenye
 * @date :Created in 2021/6/15 21:41
 * @description:广播模式
 * @version: $
 */
@RestController
@RequestMapping("/rabbitmq")
public class ProducerFanout {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 1: 定义交换机
    private String exchangeName = "fanout_exchange";
    // 2: 路由key
    private String routeKey = "";

    @RequestMapping("/fanout")
    public void markerFanout() {
        String message ="shua";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, routeKey, message);
    }

    @RequestMapping("/ttl")
    public String testTTL() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("20000"); // 设置过期时间,单位:毫秒
        byte[] msgBytes = "测试消息自动过期".getBytes();
        Message message = new Message(msgBytes, messageProperties);
        rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message);
        return "ok";
    }
}

3、消费者

package com.pit.barberShop.common.MQ.Rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
 * @author :wenye
 * @date :Created in 2021/6/15 22:07
 * @description:fanout消费者
 * @version: $
 */
@Component
public class ConsumerFanout {

    @RabbitListener(bindings =@QueueBinding(
            // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
            value = @Queue(value = "sms.fanout.queue",autoDelete = "false"),
            // order.fanout 交换机的名字 必须和生产者保持一致
            exchange = @Exchange(value = "fanout_exchange",
                    // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                    type = ExchangeTypes.DIRECT)
    ))
    public void messagerevice(String message){
        // 此处省略发邮件的逻辑
        System.out.println("sms-two111------------->" + message);
    }


    @RabbitListener(bindings =@QueueBinding(
            // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
            value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"),
            // order.fanout 交换机的名字 必须和生产者保持一致
            exchange = @Exchange(value = "fanout_exchange",
                    // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                    type = ExchangeTypes.DIRECT)
    ))
    public void messageWXrevice(String message){
        // 此处省略发邮件的逻辑
        System.out.println("weixin----two---------->" + message);
    }
}

Two, expiration time

Public String testTTL() {MessageProperties MessageProperties = new MessageProperties(); messageProperties.setExpiration("20000"); Byte [] msgBytes = "test message automatically expires ".getBytes(); Message message = new Message(msgBytes, messageProperties); rabbitTemplate.convertAndSend("TTL_EXCHANGE", "", message); return "ok"; @Bean public Queue ttlQueue () {Map<String, Object> Map = new HashMap<>(); map.put("x-message-ttl", 30000); Return new Queue("TTL_QUEUE", true, false, false, map); return new Queue("TTL_QUEUE", true, false, map); } @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 30000); Return new Queue("TTL_QUEUE", true, false, false, map); return new Queue("TTL_QUEUE", true, false, map); } @Bean public DirectExchange TTLExchange() { return new DirectExchange("TTL_EXCHANGE", true, false); } @Bean public Binding bindingDirect() { return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL"); }

Third, the message confirmation mechanism configuration

There are two mechanisms to ensure message consistency: a message acknowledgement mechanism and a transaction mechanism

  • Transaction mechanism, reference: [https://spring.hhui.top/sprin]… (http://)
@Service public class TransactionPublisher implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate transactionRabbitTemplate; @ PostConstruct public void init () {/ / set the channel for the transaction mode transactionRabbitTemplate setChannelTransacted (true); transactionRabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(" Transaction "+ message +" Send failed "); } / * * * general usage, push the message * call this method sends the message is: want to open channel configuration confirmation and abnormal ReturnCallback consumers * transactionRabbitTemplate setChannelTransacted (true); * transactionRabbitTemplate.setReturnCallback(this); * @param ans * @return */ @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager") public String publish(String ans) { String msg = "transaction msg = " +  ans; System.out.println("publish: " + msg); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); transactionRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, correlationData); return msg; }}
  • Message confirmation mechanism for reference: https://blog.csdn.net/qq33098… Default is auto reply

    Spring: rabbitmq: # Confirms the guarantees made before sending

Currently there are two callbacks: confirmCallback and returnCallback. Here’s the difference


ConfirmCallback if the message does not reach an exchange, ack=false,


ConfirmmCallback callback,ack=true, if the message arrives in an exchange

ReturnCallback is not called if exchange to queue succeeds

The flow of messages sent by the RabbitMQ message producer

import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @ Slf4j @ Component public class ConfirmCallbackService implements RabbitTemplate. ConfirmCallback {/ * * * correlationData: There is only one id attribute inside the object, which is used to indicate the uniqueness of the current message. * ack: Message delivered to the state of the broker. True means success. * cause: The delivery failed. **/ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause){ if (! Ack) {log.error(" Message sending exception!" ); } else {log.info(" sender dad has received acknowledgement, correlationData={},ack={}, cause={}", correlationdata.getId (), ack, cause); } } } import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @ Slf4j @ Component public class ReturnCallbackService implements RabbitTemplate. ReturnCallback {/ / rewrite returnedMessage () Method, The method takes five arguments: message (message body), replyCode (response code), replyText (response content), exchange (exchange), routingKey (queue) @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); @Bean public RabbitTemplate RabbitTemplate (ConnectionFactory connectionFactory) {@Bean public RabbitTemplate RabbitTemplate (ConnectionFactory connectionFactory) {@Bean public RabbitTemplate RabbitTemplate (ConnectionFactory) {@Bean public RabbitTemplate RabbitTemplate (ConnectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); / / here is also set to MANUAL ack factory. SetAcknowledgeMode (AcknowledgeMode. MANUAL); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; @Bean public Queue chongFuQueue () {// Durable: Durable. The default is false. Durable: Durable Queue is stored on disk and will exist when the message broker is restarted. // exclusive: Also false by default, can only be used by the currently created connection, and the queue is deleted when the connection is closed. Durable // AutoDelete: Delete the queue automatically, and it will be deleted when no producer or consumer uses it. // return new Queue("TestDirectQueue",true,true,false); False return new Queue("chongfu.fanout.queue", true); } //Direct switch name: TestDirectExchange @Bean public DirectExchange chongfuExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("chongfu_exchange", true, false); } @Bean public Binding bindingDirect4() { return BindingBuilder.bind(chongfuQueue()).to(chongfuExchange()).with(""); } Producer public void markerchongfu() {/** * Enables that a message can be sent back to the queue if it fails. * Note: YML needs to configure Publisher-Returns: true */ rabbitTemplate.setMandatory(true); / * * * consumer confirm after receiving the message, the manual ack receipt callback processing. * / rabbitTemplate setConfirmCallback (confirmCallbackService); / * * * message delivery to the queue failed callback processing. * / rabbitTemplate setReturnCallback (returnCallbackService); /** * Send message */ String s = uuid.randomuuid ().toString(); RabbitTemplate. ConvertAndSend (" chongfu_exchange routeKey, "handsome boy", message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(s)); } consumer @rabbitListener (bindings = @queueBinding (// email.fanout.queue is the name of the queue, which you can define as you like. value = @Queue(value = "chongfu.fanout.queue",autoDelete = "false"), Exchange = @exchange (value = "chongfu_exchange"); // RabbitMQ mode is determined as follows: Type = exchangePes.direct)) public void procesHandler (String MSG, Channel, Channel) Throws IOException {try {log.info(" {}", MSG); / / log. The info (" serial number: {} ", message. GetMessageProperties () getDeliveryTag ()); // System.out.println(msg); / / / / TODO specific business messages are received basicAck () channel. BasicAck (message. GetMessageProperties () getDeliveryTag (), false); {if} the catch (Exception e) (message. GetMessageProperties () getRedelivered ()) {log. The error (" the news has been repeated failure, refuse to receive again..." ); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else {log.error(" Message is about to return to queue processing again...") ); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); }}}

There are three return receipt methods for consuming messages

1, basicAck

Basicack: Indicates a successful acknowledgement. After using this return receipt method, the message will be deleted by the RabbitMQ broker.

void basicAck(long deliveryTag, boolean multiple) 

  • DeliveryTag: Indicates the delivery number of a message. The deliveryTag will be increased each time a message is consumed or redelivered. In manual message acknowledgement mode, we can ack, nack, reject a message with a specified deliveryTag.
  • Multiple: Whether to batch confirm or not. A value of true will ack all messages smaller than the current message deliveryTag at once.

As an example, let’s say I send three messages — 5, 6, 7 — but none of them are confirmed. When I send the fourth message, the deliveryTag is 8 and multiple is set to true, which will confirm all the messages on 5, 6, 7, and 8.

2, basicNack

BasicNack: Represents failed acknowledgement. This method is typically used when consuming a message business exception to requeue the message.

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • DeliveryTag: Indicates the message delivery number.
  • Multiple: Batch confirmation.
  • Requeue: The message will be requeued if the value is true.

3, basicReject

BasicReject: Reject a message, unlike basicNack in that it cannot be processed in bulk, otherwise similar.

void basicReject(long deliveryTag, boolean requeue)

  • DeliveryTag: Indicates the message delivery number.
  • Requeue: The message will be requeued if the value is true.

Four, dead letter queue

Deadletter queues are not much different from regular queues. You need to create your own Queue, Exchange, and then bind to an Exchange via routingkeys, but the routingkeys and Exchange of a dead letter Queue must be used as parameters to bind to a normal Queue. An application scenario is normal queue the message inside was basicNack or reject, the message will be routed to the normal dead-letter queue queue bindings, and a common scenario is opened automatically sign for it, then the abnormal, when consumers’ message than retries, the message will enter the dead-letter queue, If configured,

example

// The topic exchange matches the wildcard. The topic exchange matches the wildcard. @Bean TopicExchange emailExchange() {return new TopicExchange(" DemotopicExchange "); } // @bean public Queue deadletterQueue () {return new Queue(" Demo.dead.letter "); } / / dead-letter switches @ Bean TopicExchange deadLetterExchange () {return new TopicExchange (" demoDeadLetterTopicExchange "); } // Binding DeadletQueue @Bean Binding BindingDeadletterQueue () {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("demo.dead.letter"); } producer @requestMapping ("/sixin") public void sendEmailMessage() {correlationData correlationData=new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("demoTopicExchange","demo.email","11",correlationData); The info (" -- -- -- send email messages - {} - messageId -- - {} ", "111", correlationData. GetId ()); } User /** * User * @Param Message * @Param Channel * @throws IOException */ @RabbitListener(bindings =@QueueBinding(//)) Email.fanout. queue is the name of the queue, which you can define as you like. value = @Queue(value = "demo.email",autoDelete = "false", arguments = { @Argument(name = "x-dead-letter-exchange", value = "demoDeadLetterTopicExchange"), @Argument(name = "x-dead-letter-routing-key",value = "demo.dead.letter"), @Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") }), key = "demo.email", Exchange = @exchange (value = "DemotopicExchange"); // RabbitMQ mode is defined as: Type = exchangeType = exchangeType = exchangeType = exchangeType = exchangeType = exchangeType = exchangeType = exchangeType = exchangeType Throws IOException {try {log.info("); Int m=1/0; / / manual to sign for the channel. BasicAck (message. GetMessageProperties () getDeliveryTag (), false); } catch (Exception E) {// Exception E) { True to the team, or false, enter the dead-letter queue channel. BasicNack (message. GetMessageProperties () getDeliveryTag (), false, false); }} /** * Dead letter consumer, automatic sign on state, more than the number of retry, or manual sign, Reject or Nack * @Param message */ @RabbitListener(Queue = "Demo.dead.letter ") public void handleDeadLetterMessage(Message message, Channel Channel, @ Headers Map < String, Object > Headers) throws IOException {/ / consider database records, every time come in to check the number, reach a certain number of warning, Info (" received dead-letter message :-- {}-- message ID-- {}", new String(message.getBody()),headers.get("spring_returned_message_correlation")); Reply / / ack channel. BasicAck (message. GetMessageProperties () getDeliveryTag (), false); }

Java class configuration is also available

@Bean public Queue emailQueue() { Map<String, Object> arguments = new HashMap<>(2); The arguments / / bind dead-letter switches. The put (" x - dead - letter - exchange ", "demoDeadLetterTopicExchange"); // put(" X-dead-letter-routing-key ", "demo.dead.letter"); // put(" X-dead-letter-routing-key ", "demo.dead.letter"); arguments.put("x-message-ttl", 3000); return new Queue(emailQueue,true,false,false,arguments); } @Bean TopicExchange emailExchange() { return new TopicExchange(topicExchange); } @Bean Binding bindingEmailQueue() { return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#"); }

Persistence mechanism and memory disk monitoring

RabbitMQ’s persistence queue is divided into:

1: Queue persistence 2: Message persistence 3: Switch persistence

Both persistent and non-persistent messages can be written to disk, but non-persistent messages are written to disk when memory runs out.

2, memory disk monitoring

6. Distributed transactions

How to ensure that producer messages can be consumed

Reliable production problem of distributed transactional messages based on MQ – timed resending

Use the RabbitMQ confirmation mechanism after the message into the exchange messages in the modify data at the state, regularly retransmission, and for some reason the message failed to send to the switch and the DB stored information, a certain time and then to send, if also failed many times this message there is a problem (message table field record number is more).

2. How to ensure that consumers must have consumed the message

Question: How can I ensure that an exception triggers a retry mechanism without causing an infinite loop

  • Control the number of retries + dead-letter queue retries only for auto-reply mode, which conflicts with manual mode
  • Try ————catch + manual ACk
  • Try__catch + manual ACK + dead letter queue

Seven, configuration details

rabbitmq: addresses: 127.0.0.1:6605127.00 0.1:6606127.00 0.1:6705 # specified client connect to the server address, multiple comma-delimited (priority addresses, and then take the host) # port: ## Addresses: IP :port, IP :port password: admin username: 123456 virtual-host: RabbitMQ vhost requested-heartbeat: # specify a heartbeat timeout, 0 is not specified. # default 60 s publisher - confirms: whether to enable release confirmation publisher - reurns: # issued return whether to enable the connection timeout: # connection timeout, unit of milliseconds, 0 means infinity, not timeout cache: Channel. size: # Number of Channels to hold in the cache Channel. checkout-timeout: # When the cache number is set, the timeout of a channel in milliseconds is retrieved from the cache; If it is 0, then always create a new channel connection.size: # Number of connections cached, only valid in connection mode connection.mode: # CONNECTION factory cache mode: CHANNEL and CONNECTION listener: simple.auto-startup: # CONNECTION factory cache mode: CHANNEL and CONNECTION listener: simple.auto-startup: # CONNECTION factory: simple.acknowledge-mode: # represents the message acknowledgement method, which is configured in three ways: NONE, MANUAL, and AUTO. Default auto simple.concurrency: # minimum number of consumers simple.max-concurrency: # maximum number of consumers simple.prefetch: # Specifies how many messages a request can handle. If there are any transactions, this must be greater than or equal to the number of transactions. # Specify the number of messages to be processed in a transaction. Prefetch should be less than or equal to the number of prefetch messages. Default is true (related to the adder-mode parameter) simple.idle-event-interval: # How long to post the idle container in milliseconds simple.retry. Enabled: # Listen.Retry. Max-Attempts: # Maximum Number of Retry Attempts: Simple.Retry. Initial-Interval: # interval between the first and second attempts to publish or deliver a message simple.retry. Multiplier: # multiplier applied to the previous retry interval simple.retry. Max-interval: # maximum retry interval simple.retry.stateless: # retry is stateful or stateless template: mandatory: # enable mandatory information; Default false receive-timeout: # receive() timeout: # sendAndReceive() timeout: # sendAndReceive() timeout: # Send retry if available Retry. Max-Attempts: # Maximum Number of Retry Attempts Retry. Initial-Interval: # The interval between the first and second attempts to publish or deliver messages Retry. Multiplier: # The multiplier applied to the previous retry interval: # Maximum retry interval