This is the 10th day of my participation in Gwen Challenge

Relevant concepts

Management page after startup

Connections: A connection is a socket (TCP) connection to RabbitMQ that encapsulates part of the socket protocol logic.

Channels: Channels are virtual connections built inside a real TCP connection. AMQP commands are sent over channels, and each channel is assigned a unique ID. One TCP connection can carry many channels (in theory without limit; in practice the number is capped by the channel_max value negotiated for the connection). Sharing one TCP connection this way avoids the cost of repeatedly creating and destroying TCP connections.

Queues: RabbitMQ internal objects that store messages

Messages: Ready: messages waiting in the queue that have not yet been delivered to a consumer. Unacked: messages that have been delivered to a consumer but not yet acknowledged.

Total: ready + unacked
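The relationship between the three counters can be pictured with a small in-memory model. This is only a sketch of the bookkeeping, not how RabbitMQ is implemented; the class and method names (QueueCounters, publish, deliver, ack) are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustrative model only: tracks ready and unacked messages the way the
// management page reports them. This is not RabbitMQ code.
class QueueCounters {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new HashMap<>();
    private long nextDeliveryTag = 1;

    // A published message waits in the queue: it counts as "ready".
    void publish(String body) {
        ready.addLast(body);
    }

    // Delivering to a consumer moves the message from ready to unacked.
    // Returns the delivery tag, or -1 if nothing is ready.
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextDeliveryTag++;
        unacked.put(tag, body);
        return tag;
    }

    // Acknowledging removes the message from the queue entirely.
    boolean ack(long deliveryTag) {
        return unacked.remove(deliveryTag) != null;
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    int totalCount() {
        return readyCount() + unackedCount();
    }

    public static void main(String[] args) {
        QueueCounters q = new QueueCounters();
        q.publish("a");
        q.publish("b");
        q.publish("c");
        long tag = q.deliver();
        System.out.println(q.readyCount() + " ready, " + q.unackedCount() + " unacked, " + q.totalCount() + " total");
        q.ack(tag);
        System.out.println(q.readyCount() + " ready, " + q.unackedCount() + " unacked, " + q.totalCount() + " total");
    }
}
```

In this model a message only leaves the total once it is acknowledged, which is why a consumer that never acks keeps the unacked count growing.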

Publisher confirms, returned messages, and consumer acknowledgements

Confirm: reports whether a message reached the exchange on the broker. Return: reports whether the message was routed from the exchange to a queue. A return listener handles messages that cannot be routed, for example when the routing key given at send time matches no queue. If the exchange itself does not exist, the failure shows up in the confirm callback instead.
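To keep the two mechanisms apart, here is a rough decision model of what the publisher hears back in each case. It is a sketch under simplifying assumptions: routing is exact-match only (no topic patterns), and the names PublishOutcomeModel and Outcome are invented for this example. As far as I understand the broker's behaviour, an unroutable message is still confirmed with an ack, and with mandatory=true it is returned first.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative model only: which callback reports which situation.
// Routing is simplified to exact key matching.
class PublishOutcomeModel {
    enum Outcome {
        CONFIRM_NACK,        // exchange missing: confirm callback gets ack=false
        RETURNED_THEN_ACKED, // unroutable + mandatory: return callback fires, then ack
        ACKED_BUT_DROPPED,   // unroutable + not mandatory: ack, message silently discarded
        ACKED_AND_ROUTED     // routed to a queue: ack
    }

    // exchange name -> routing keys that have at least one bound queue
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void declareExchange(String exchange) {
        bindings.computeIfAbsent(exchange, k -> new HashSet<>());
    }

    void bind(String exchange, String routingKey) {
        bindings.computeIfAbsent(exchange, k -> new HashSet<>()).add(routingKey);
    }

    Outcome publish(String exchange, String routingKey, boolean mandatory) {
        Set<String> keys = bindings.get(exchange);
        if (keys == null) {
            return Outcome.CONFIRM_NACK;
        }
        if (keys.contains(routingKey)) {
            return Outcome.ACKED_AND_ROUTED;
        }
        return mandatory ? Outcome.RETURNED_THEN_ACKED : Outcome.ACKED_BUT_DROPPED;
    }

    public static void main(String[] args) {
        PublishOutcomeModel broker = new PublishOutcomeModel();
        broker.bind("exchange", "topic.message");
        System.out.println(broker.publish("exchange", "topic.message", true));
        System.out.println(broker.publish("exchange", "no.such.key", true));
        System.out.println(broker.publish("exchange", "no.such.key", false));
        System.out.println(broker.publish("missing-exchange", "topic.message", true));
    }
}
```

The practical consequence is that a positive confirm alone does not prove the message landed in a queue; the return callback (with mandatory enabled) covers that gap.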

Usage with Spring Boot:

Configuration in application.yml

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        # Enable confirm and return
        # (Spring Boot 2.2+ replaces publisher-confirms with publisher-confirm-type: correlated)
        publisher-confirms: true
        publisher-returns: true
        listener:
          simple:
            # Enable manual acknowledgement.
            # none: no acknowledgement is sent.
            # manual: the listener must acknowledge every message by calling channel.basicAck().
            # auto: the container acknowledges automatically unless the MessageListener
            #       throws an exception; this is the default.
            acknowledge-mode: manual
            # Whether a message rejected because the listener threw an exception
            # is put back on the queue. The default value is true.
            default-requeue-rejected: true
            # Enable the retry mechanism and set the maximum number of attempts
            retry:
              max-attempts: 3
              enabled: true

Sender.java implements the confirm and return callbacks

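    import java.util.Date;

    import org.springframework.amqp.core.Message;
    // Spring AMQP 2.1+; older versions use org.springframework.amqp.rabbit.support.CorrelationData
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class Sender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @PostMapping("/send1")
        public void send1() {
            String context = "hello " + new Date();
            System.out.println("Sender : " + context);
            // mandatory = true: an unroutable message is returned to the sender;
            // false: the broker discards it silently
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(this);
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.convertAndSend("exchange", "topic.message", context);
        }

        // @PostMapping("/send2")
        // public void send2() {
        //     String context = "world " + new Date();
        //     System.out.println("Sender : " + context);
        //     this.rabbitTemplate.convertAndSend("topic.messages", context);
        // }

        // Called when the broker confirms (ack) or refuses (nack) a published message
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                System.out.println("message sent successfully");
            } else {
                System.out.println("send message failed: " + cause);
            }
        }

        // Called when a mandatory message could not be routed to any queue
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText,
                                    String exchange, String routingKey) {
            System.out.println("returnMessage:" + message + replyCode + replyText + exchange + routingKey);
        }
    }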

The consumer, Receiver.java, acknowledges consumption

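    // The method name is illustrative; the parameters follow from the body below.
    // Channel is com.rabbitmq.client.Channel.
    @RabbitListener(queues = "topic.message")
    public void process(String str, Channel channel, Message message) throws IOException {
        // multiple = true acknowledges every message up to and including this delivery tag
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        System.out.println("message:" + str);
    }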

Handling returned messages (routing failed)

Add configuration for unroutable messages

    @Bean
    public Queue unRouteQueue() {
        return new Queue("queue-unroute");
    }

    @Bean
    public Exchange exchange() {
        Map<String, Object> arguments = new HashMap<>(4);
        // When a message sent to this exchange has a routing key that matches no
        // binding key, it is forwarded to the exchange named by alternate-exchange
        arguments.put("alternate-exchange", "exchange-unroute");
        return new DirectExchange("exchange", true, false, arguments);
    }

    @Bean
    public FanoutExchange unRouteExchange() {
        // The name of this exchange must match the alternate-exchange argument in exchange()
        return new FanoutExchange("exchange-unroute");
    }

    @Bean
    public Binding unRouteBinding() {
        return BindingBuilder.bind(unRouteQueue()).to(unRouteExchange());
    }
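The fallback path set up by this configuration can be sketched as a small routing model. It is a simplification: each binding key maps to a single queue, and the class name AlternateExchangeModel and the queue name "queue-main" are assumptions made for the example, not part of the configuration above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: a direct exchange that falls back to an
// alternate fanout exchange when no binding key matches.
class AlternateExchangeModel {
    // binding key -> queue name on the primary direct exchange (one queue per key here)
    private final Map<String, String> directBindings = new HashMap<>();
    // queues bound to the alternate fanout exchange
    private final List<String> fanoutQueues = new ArrayList<>();

    void bindDirect(String bindingKey, String queue) {
        directBindings.put(bindingKey, queue);
    }

    void bindAlternate(String queue) {
        fanoutQueues.add(queue);
    }

    // Returns the queues that receive a message published with this routing key.
    List<String> route(String routingKey) {
        String queue = directBindings.get(routingKey);
        if (queue != null) {
            return Collections.singletonList(queue);
        }
        // No match: the fanout exchange ignores the routing key and
        // copies the message to every queue bound to it.
        return new ArrayList<>(fanoutQueues);
    }

    public static void main(String[] args) {
        AlternateExchangeModel model = new AlternateExchangeModel();
        model.bindDirect("topic.message", "queue-main"); // hypothetical queue name
        model.bindAlternate("queue-unroute");
        System.out.println(model.route("topic.message"));
        System.out.println(model.route("no.such.key"));
    }
}
```

Because the alternate exchange does route the message to a queue, the message counts as routed, so I would not expect the return callback to fire for it.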

Dead-letter queue

A message becomes a dead letter when one of the following happens:

  1. The message is rejected and not put back on the queue (requeue=false).

  2. The queue length reaches its maximum.

  3. The TTL of the message expires.

The path a message takes into a dead-letter queue is: message -> queue (one of the conditions above is triggered) -> dead-letter exchange (DLX) -> dead-letter queue
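The three triggers and the flow above can be walked through with a toy model. This is a sketch, not broker code: time is passed in explicitly, the list deadLetters stands in for the dead-letter exchange plus its queue, and the names DeadLetterModel, rejectHead, and expire are invented for the example. It assumes the default overflow behaviour, in which a full queue pushes out its oldest message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative model only: one work queue whose dead letters are collected in a list.
class DeadLetterModel {
    private static final class Msg {
        final String body;
        final long expiresAtMillis;

        Msg(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final int maxLength;
    private final Deque<Msg> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    DeadLetterModel(int maxLength) {
        if (maxLength < 1) {
            throw new IllegalArgumentException("maxLength must be at least 1");
        }
        this.maxLength = maxLength;
    }

    // Trigger 2: when the queue is full, the oldest message is dead-lettered.
    void publish(String body, long nowMillis, long ttlMillis) {
        if (queue.size() >= maxLength) {
            deadLetters.add(queue.pollFirst().body);
        }
        queue.addLast(new Msg(body, nowMillis + ttlMillis));
    }

    // Trigger 1: the consumer rejects the head message; with requeue=false it is dead-lettered.
    void rejectHead(boolean requeue) {
        Msg m = queue.pollFirst();
        if (m == null) {
            return;
        }
        if (requeue) {
            queue.addFirst(m);
        } else {
            deadLetters.add(m.body);
        }
    }

    // Trigger 3: messages at the head whose TTL has passed are dead-lettered.
    void expire(long nowMillis) {
        while (!queue.isEmpty() && queue.peekFirst().expiresAtMillis <= nowMillis) {
            deadLetters.add(queue.pollFirst().body);
        }
    }

    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }

    int queueSize() {
        return queue.size();
    }
}
```

For example, with a maximum length of 2, publishing "a", "b", "c" pushes "a" out; rejecting the head without requeue dead-letters "b"; and calling expire once the TTL has passed dead-letters "c".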

Configuration

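    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitmqConfig {

        @Bean("deadLetterExchange")
        public Exchange deadLetterExchange() {
            return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
        }

        @Bean("deadLetterQueue")
        public Queue deadLetterQueue() {
            Map<String, Object> args = new HashMap<>(2);
            // x-dead-letter-exchange: the exchange that dead letters are forwarded to
            args.put("x-dead-letter-exchange", "DL_EXCHANGE");
            // x-dead-letter-routing-key: the routing key used when forwarding
            args.put("x-dead-letter-routing-key", "KEY_R");
            return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
        }

        @Bean("redirectQueue")
        public Queue redirectQueue() {
            return QueueBuilder.durable("REDIRECT_QUEUE").build();
        }

        /**
         * Binds DL_QUEUE to the dead-letter exchange with the DL_KEY binding key.
         *
         * @return the binding
         */
        @Bean
        public Binding deadLetterBinding() {
            return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
        }

        /**
         * Binds REDIRECT_QUEUE to the dead-letter exchange with the KEY_R binding key.
         * (The method name is illustrative.)
         *
         * @return the binding
         */
        @Bean
        public Binding redirectBinding() {
            return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
        }
    }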

Producer

    String context = "hello " + new Date();
    System.out.println("Sender : " + context);
    MessagePostProcessor messagePostProcessor = message -> {
        MessageProperties messageProperties = message.getMessageProperties();
        // Set the content encoding
        messageProperties.setContentEncoding("utf-8");
        // Set the expiration time in milliseconds (5000 ms here)
        messageProperties.setExpiration("5000");
        return message;
    };
    rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", context, messagePostProcessor);

The consumer listens on the redirect queue (REDIRECT_QUEUE), which receives the messages once they expire.

Delay queue

Messages in a delay queue are not consumed immediately; they become available only after a set time has passed. Typical uses include orders, reservations, and retries after a service failure. There are generally two implementations:

  1. A dead-letter queue combined with a message expiration time, as above.

  2. The rabbitmq_delayed_message_exchange plugin. Enable it with the command rabbitmq-plugins enable rabbitmq_delayed_message_exchange, then restart the service.

    Configuration

    @Bean
    public Queue immediateQueue() {
        // First argument: the name of the queue to create; second argument: whether it is durable
        return new Queue("delayQueue", true);
    }

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        // The plugin routes like a direct exchange once the delay has elapsed
        args.put("x-delayed-type", "direct");
        return new CustomExchange("DELAYED_EXCHANGE", "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingNotify() {
        return BindingBuilder.bind(immediateQueue()).to(delayExchange()).with("DELAY_ROUTING_KEY").noargs();
    }

In the producer (Sender), set a delay on the message instead of an expiration time:

messageProperties.setDelay(5000);
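The "not visible until the delay has passed" behaviour can be tried out locally with the JDK's own java.util.concurrent.DelayQueue. This is only an in-process analogue for building intuition; it does not use RabbitMQ or the plugin, and the class names and the message text are made up for the example.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// In-process analogue of delayed delivery using the JDK DelayQueue.
class DelayedDeliveryDemo {
    static final class DelayedMessage implements Delayed {
        final String body;
        private final long readyAtNanos;

        DelayedMessage(String body, long delayMillis) {
            this.body = body;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        // Remaining delay; the queue releases the element once this is <= 0.
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        // Orders elements so the one that becomes ready first is at the head.
        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Blocks until the head message's delay has elapsed, then returns its body.
    static String awaitBody(DelayQueue<DelayedMessage> queue) {
        try {
            return queue.take().body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("check unpaid order", 500));
        // poll() does not wait: it returns null while the delay is still running
        System.out.println("right away: " + queue.poll());
        System.out.println("after the delay: " + awaitBody(queue));
    }
}
```

The consumer side looks the same as for any queue; only the moment at which the message becomes available changes.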