This is the 8th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

1. Business background

If there is an error message, if manual nACK simultaneously puts the message back into the queue, the message is repeatedly consumed and remains in the queue.

If the message is discarded after nACK, the message will also be lost if network jitter occurs. So avoid message loss by creating a dead-letter queue.

2. Implement

The file directories are as follows:

Principle 1.

Let’s set up an extra queue. When a message enters the business queue, if a NACK is received, the message is placed in the queue.

2. Modify the POM file

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
Copy the code

3. Modify the configuration file

server:
  port: 8088
spring:
  rabbitmq:
    host: 192.168.*.*
    port: 5672
    username: root
    password: root virtual-host: / listener: simple: acknowledge-mode: manual #prefetch: 1# Process one message at a time publisher- Confirms:true# publisher-returns:true# Support message sending failure return queueCopy the code

4. The rabbitmq configuration

@Configuration
public class RabbitMqConfig {

    /** * connection factory */
    @Autowired
    private ConnectionFactory connectionFactory;

    /** * Customizable AMQP template ** ConfirmCallback interface is used to implement ack callback after messages are sent to RabbitMQ exchange Exchange, but no corresponding queue is bound to the exchange when the callback is not sent in any queue ACK */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // Message sending failure returns to queue, yML needs to configure publisher-returns: true
        rabbitTemplate.setMandatory(true);
        // Send a message to confirm that YML needs to be configured with publisher-Confirms: true
        rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
        // YML needs to configure publisher-returns: true
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId().toString();
            logger.debug("Message: {} failed to be sent, reply code: {} Cause: {} Switch: {} Routing key: {}", correlationId, replyCode, replyText, exchange,
                routingKey);
        });
        return rabbitTemplate;
    }

    /** * Verify that the message was sent successfully (call util method) **@return* /
    @Bean
    public MsgSendConfirmCallBack msgSendConfirmCallBack() {
        return newMsgSendConfirmCallBack(); }}Copy the code

5. Util class

Sends a callback method for success or failure.

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    /** * callback method *@param correlationData
     * @param ack
     * @param cause* /
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("MsgSendConfirmCallBack, callback ID :" + correlationData);
        if (ack) {
            System.out.println("Message sent successfully");
        } else {
            // Messages can be written locally and resend using a scheduled task
            System.out.println("Message sending failed :" + cause + "\n Resend"); }}}Copy the code

There is a point where if you want to do an implementation message and fail to resend it, you can do that in the comment. The message needs to be written locally, read from locally if it fails, then sent and deleted locally if it succeeds.

6. Business queue (e.g., order business)

The x-dead-letter-exchange and x-dead-letter-routing-key parameters are used to declare a queue.

@Configuration
public class BusinessConfig {

    /** * Service 1 module Direct Switch name */
    public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";

    /** * service 1 Queue name of demo service */
    public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";

    /** * service 1 Routekey */ of the Demo service
    public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";

    
    @Bean
    public Queue yewu1DemoDeadQueue() {
        // Bind the normal queue to the dead-letter queue switch
        Map<String.Object> args = new HashMap<>(2);
        args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);
        args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);
        return new Queue("yewu1_demo_dead_queue".true.false.false, args);
    }

    /** * Bind message queue to switch */
    @Bean
    public Binding binding_one() {
        return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())
            .with("yewu1_demo_dead_key"); }}Copy the code

There is a point where if you want to persist messages to disk and need to create a new Queue, new Queue sets the second parameter to true, but it becomes less efficient in the face of large concurrence.

7. Dead-letter queues

Dead letter queues and bindings are declared here.

@Configuration
public class DeadConfig {

    /** * dead letter queue */
    public final static String FAIL_QUEUE_NAME = "fail_queue";

    /** * dead letter switch */
    public final static String FAIL_EXCHANGE_NAME = "fail_exchange";

    /**
     * 死信routing
     */
    public final static String FAIL_ROUTING_KEY = "fail_routing";

    /** * create configuration dead letter queue ** /
    @Bean
    public Queue deadQueue() {
        return new Queue(FAIL_QUEUE_NAME, true.false.false);
    }

    /** * dead letter switch **@return* /
    @Bean
    public DirectExchange deadExchange() {
        DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true.false);
        return directExchange;
    }

    /** * Binding relationship **@return* /
    @Bean
    public Binding failBinding() {
        returnBindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY); }}Copy the code

8. Producers and consumers

Producer and consumer code implementation.

public enum RabbitEnum {

    /** * Successfully processed */
    ACCEPT,

    /** * retried error */
    RETRY,

    /** * No retry error */
    REJECT
@RequestMapping("/sendDirectDead")
        String sendDirectDead(@RequestBody String message) throws Exception {
        System.out.println("Start production");
        CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",
                message, data);
        System.out.println("End of production");
        System.out.println("Send id." + data);
        return "OK,sendDirect:" + message;
    }
    @RabbitListener(queues = "yewu1_demo_dead_queue")
    protected void consumerDead(Message message, Channel channel) throws Exception {
        RabbitEnum ackSign = RabbitEnum.RETRY;
        try {
            int i = 10 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            ackSign = RabbitEnum.RETRY;
            throw e;
        } finally {
            // Use a finally block to ensure that Ack/Nack will only be executed once
            if (ackSign == RabbitEnum.ACCEPT) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else if (ackSign == RabbitEnum.RETRY) {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false.false); }}}Copy the code

9. The

When sending yewu1_demo_dead_queue, if an exception is thrown, it will be put into the dead letter queue.