1. Sender exception monitoring

1.1 Types of Sender exceptions

  1. Basic processing flow

  • A compensation scheme

  1. Simulate broker breakdown: modify sender port 5673, then start, send message, wrong port cannot connect to host
    • Error message: java.net.ConnectException: Connection timed out: connect
    • Compensation scheme: add exception handling, return error if unreachable
    • The error can be found at the time of sending and is simply returned to the caller
@RequestMapping("/direct")
public Object sendEmail(String msg) {
    try {
        rabbitTemplate.convertAndSend("exchange.direct.springboot.email"."queue.email.routing.key", msg);
        return msg;
    } catch (AmqpException e) {
        System.out.println("Sending exception:" + e.getMessage());
        return "The network is down. Please try again later."; }}Copy the code
  1. No switch is abnormal in the simulation
    • The error message
      • ERROR 4880 – [. 200.57.39:5672] O.S.A.R.C.C achingConnectionFactory: Channel shutdown: Channel ERROR; protocol method: #method

        (reply-code=404, reply-text=NOT_FOUND – no exchange ‘noExchange’ in vhost ‘/’, class-id=60, method-id=40)
    • Error: If there is no switch, no error will be reported. Only a log will be output
    • Compensation scheme: A send callback is required to confirm whether the message was successfully sent
  2. No route exception is simulated
    • Error message: No message is displayed, the message is discarded
    • Compensation scheme: A send callback is required to confirm whether the message was successfully sent

1.2 Message sending callback

  1. Because the message is sent asynchronously, you need to make sure that the message is sent correctly
  2. So you can configure RabbitTemplate and specify callback information
  3. Step 01: Modify the configuration file and set callback parameters
    • publisher-confirm-type
      • org.springframework.boot.autoconfigure.amqp.RabbitProperties#publisherConfirmType
      • org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType
spring:
  rabbitmq:
    host: 127.0. 01.
    port: 5672
    username: tianxin
    password: tianxin
    # Enable message sending broker callback
    publisher-confirm-type: correlated
    # Enable routing message callback
    publisher-returns: true
    # Force validation, which can also be enabled in code
    template:
      mandatory: true
Copy the code
/** * The type of publisher confirms to use. */
public enum ConfirmType {

	/**
	 * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
	 * within scoped operations.
	 */
	SIMPLE,

	/**
	 * Use with {@code CorrelationData} to correlate confirmations with sent
	 * messsages.
	 */
	CORRELATED,

	/** * Publisher confirms are disabled (default). */
	NONE
}
Copy the code
  1. Step 02: Configure RabbitTemplate, switch acknowledgement callbacks, and route callbacks
    • SetConfirmCallback: called regardless of success
    • SetReturnCallback: called in case of an error
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Objects;

@Configuration
public class CustomRabbitTemplate {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        Mandatory is set to true to trigger the callback method, regardless of the result of the message push
        rabbitTemplate.setMandatory(true);
        // Set connection factory information
        rabbitTemplate.setConnectionFactory(connectionFactory);

        // Message sender Broker callback: whether the exchange from the sender to the broker was found correctly
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("SetConfirmCallback message data:" + correlationData);
            if (Objects.nonNull(correlationData)) {
                System.out.println("SetConfirmCallback message data:" + correlationData.getReturnedMessage());
            }
            System.out.println("SetConfirmCallback message confirmation:" + ack);
            System.out.println("SetConfirmCallback cause:" + cause);
            System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --");
        });

        // Message routing callback: whether the route from the switch to the queue is sent correctly
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("SetReturnCallback message:" + message);
            System.out.println("SetReturnCallback response code:" + replyCode);
            System.out.println("SetReturnCallback response message:" + replyText);
            System.out.println("SetReturnCallback exchange:" + exchange);
            System.out.println("SetReturnCallback Routing key:" + routingKey);
            System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --");
        });

        returnrabbitTemplate; }}Copy the code
  • Route callbacks and message callbacks
/** * A callback for publisher confirmations. * */
@FunctionalInterface
public interface ConfirmCallback {

	/**
	 * Confirmation callback.
	 * @param correlationData correlation data for the callback.
	 * @param ack true for ack, false for nack
	 * @param cause An optional cause, for nack, when available, otherwise null.
	 */
	void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}

/** * A callback for returned messages. * */
@FunctionalInterface
public interface ReturnCallback {

	/**
	 * Returned message callback.
	 * @param message the returned message.
	 * @param replyCode the reply code.
	 * @param replyText the reply text.
	 * @param exchange the exchange.
	 * @param routingKey the routing key.
	 */
	void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
}
Copy the code
  1. Step 03: Test the controller
    • convertAndSend(String exchange, String routingKey, final Object object, @Nullable CorrelationData correlationData)
      • Specify CorrelationData
    • You can specify the message ID and callback message in CorrelationData
      • {“id”: “dataId”, data: “biz data “}
  2. Test no switch
    • http://127.0.0.1:8071/noExchange?message=direct
      • Switch not found: call setConfirmCallback directly, no further calls
@RequestMapping("/noExchange")
public Object noExchange(String message) {
    try {
        // If the connection is disconnected, the message is discarded
        String id = UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend("noExchange"."springboot.email.routing.key", message, new CorrelationData(id));
        return "ok";
    } catch (AmqpException e) {
        System.out.println(e.getMessage());
        returne.getMessage(); }}Copy the code
SetConfirmCallback Message data: CorrelationData [id= 9aca9A83 -5815-455b-acf0-71b0caed534c] setConfirmCallback Message data:nullSetConfirmCallback Message confirmation:falseSetConfirmCallback Cause: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/'.class-id=60, method-id=40)
Copy the code
  1. Test no route
    • http://127.0.0.1:8071/noQueue?message=direct
      • No route found: first call setReturnCallback and then setConfirmCallback

@RequestMapping("/noQueue")
public Object noQueue(String message) {
    try {
        // If the queue cannot be sent, the message is discarded
        String id = UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend("exchange.direct.springboot.email"."noQueue", message, new CorrelationData(id));
        return "ok";
    } catch (AmqpException e) {
        System.out.println(e.getMessage());
        returne.getMessage(); }}Copy the code
SetReturnCallback message: (Body:'direct' MessageProperties [headers={spring_returned_message_correlation=a4b6e77c-4b13-48e4-9a2e-21bd6ef4a697}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) setReturnCallback312SetReturnCallback response information: NO_ROUTE setReturnCallback exchanger: the exchange. Direct. Springboot. Email setReturnCallback routing key: NoQueue -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- setConfirmCallback message data: CorrelationData [id = bl3 a4b6e77c - 4 -48e4SetConfirmCallback Message data: (Body:'direct' MessageProperties [headers={spring_listener_return_correlation=42813c45-b804-4303-b9f0-10a73dad71ca, spring_returned_message_correlation=a4b6e77c-4b13-48e4-9a2e-21bd6ef4a697}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.direct.springboot.email, receivedRoutingKey=noQueue, deliveryTag=0]) setConfirmCallback Message confirmation:trueSetConfirmCallback reasons:null
Copy the code
  1. The test message is sent properly
    • http://127.0.0.1:8071/direct/confirm?message=direct
      • Message sent successfully: only setConfirmCallback is called
@RequestMapping("/direct/confirm")
public Object directConfirm(String message) {
    try {
        String id = UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend("exchange.direct.springboot.email"."springboot.email.routing.key", message, new CorrelationData(id));
        return "ok";
    } catch (AmqpException e) {
        System.out.println(e.getMessage());
        return "Network interruption, please try again later ~"; }}Copy the code
SetConfirmCallback Message data: CorrelationData [id=9bb8a203-2345-4a7e-8BFD-8ad0226DA4DC] setConfirmCallback Message data:nullSetConfirmCallback Message confirmation:trueSetConfirmCallback reasons:null
Copy the code
  1. Specifies the id and message data of the callback message
    • http://127.0.0.1:8071/correlationData/message?msg=direct
      • You can specify more business types
    • org.springframework.amqp.core.Message
    • org.springframework.amqp.core.MessageProperties
@RequestMapping("/correlationData/message")
public Object correlationDataMessage(String msg) {
    try {
        String id = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(id);

        // Specify more information about the callback
        MessageProperties properties = new MessageProperties();
        properties.setMessageId(id);
        Message message = new Message(msg.getBytes(), properties);
        correlationData.setReturnedMessage(message);
        rabbitTemplate.convertAndSend("exchange.direct.springboot.email"."springboot.email.routing.key", msg, correlationData);
        return msg;
    } catch (AmqpException e) {
        System.out.println(e.getMessage());
        return "Network interruption, please try again later ~"; }}Copy the code
SetConfirmCallback Message data: CorrelationData [id= 9f598758-4b0b-4e4A-981A-e7e04EAB1335]'[B@1465d3ea(byte[6])' MessageProperties [headers={}, messageId=9f598758-4b0b-4e4a-981a-e7e04eab1335, contentType=application/octet-stream, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0]) setConfirmCallback Message confirmation:trueSetConfirmCallback reasons:null
Copy the code

Message persistence

2.1 Persistence Description

  1. Messages may be lost due to accidents when they are sent, transmitted or saved over the network
  2. For example, if one of the queues, exchanges, or messages is not persistent, the messages will be lost after the broker restarts
  3. Therefore, it is necessary to save the message before sending, and then distinguish the different message types according to the status, which can be used for retrying, etc

2.2 Persistent tables

  1. Persistence requires the creation of a table structure to store messages
create table msg_log
(
    id            bigint primary key comment 'Message unique Identifier',
    msg           text                    null comment 'Message body, JSON format',
    exchange      varchar(255) default ' ' null comment 'Switch',
    routing_key   varchar(255) default ' ' null comment 'Routing key',
    status        int          default - 1  null comment 'Status: -1 Created 0 Delivered 1 Delivered successful 2 Delivered failed 3 Consumed 4 Manual processed 5 Consumed failed',
    try_count     int          default 0  null comment 'Retry times',
    next_try_time datetime                null comment 'Next retry time',
    origin_id     varchar(32)            null comment 'the original id',
    note          varchar(500)            null comment 'Error message',
    create_time   datetime                null comment 'Creation time',
    update_time   datetime                null comment 'Update time',
) comment 'Message Posting Log';
Copy the code

2.3 Persistence implementation

  1. Use MybatisPlus to generate corresponding service, mapper, domain information, standard MyBatis use mode
  2. First, you need to configure the rabbitTemplate callback information
import com.alibaba.fastjson.JSONObject;
import com.codecoord.domain.MsgLog;
import com.codecoord.domain.MsgLogStatus;
import com.codecoord.serivce.MsgLogService;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.UUID;

@Configuration
public class CustomRabbitTemplate {
    @Resource
    private MsgLogService msgLogService;

    @Bean
    public RabbitTemplate callbackRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        // Need to set the connection project
        template.setConnectionFactory(connectionFactory);
        // Set mandatory
        template.setMandatory(true);
        // setConfirmCallback: triggers a callback when a message is sent to the Broker (whether it arrived in the Exchange correctly)
        // You need to enable publisher-confirmation-type: correlated configuration in your configuration file
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (Objects.nonNull(correlationData) && Objects.nonNull(correlationData.getId())) {
                MsgLog updateLog = new MsgLog();
                updateLog.setId(Long.parseLong(correlationData.getId()));
                updateLog.setUpdateTime(LocalDateTime.now());
                if (ack) {
                    updateLog.setStatus(MsgLogStatus.DELIVERY_SUCCESS);
                } else {
                    updateLog.setStatus(MsgLogStatus.DELIVERY_FAIL);
                }
                msgLogService.updateById(updateLog);
            } else {
                System.out.println("Message exception handling");
            }
            // Determine whether the delivery was successful based on the ACK
            System.out.println("SetConfirmCallback message data:" + JSONObject.toJSONString(correlationData));
            System.out.println("SetConfirmCallback message confirmation:" + ack);
            System.out.println("SetConfirmCallback cause:" + cause);
            System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --");
        });

        // setReturnCallback: the callback is triggered when the start message fails to return, such as when the route is not queued
        // Enable the publisher-returns: true configuration in the configuration file
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // The message cannot be delivered to the queue. The new message is manually processed because the original message will be set as successfully delivered in setConfirmCallback
            MsgLog msgLog = new MsgLog();
            msgLog.setMsg(message.toString());
            msgLog.setExchange(exchange);
            msgLog.setRoutingKey(routingKey);
            msgLog.setStatus(MsgLogStatus.MANUAL_HANDLING);
            msgLog.setTryCount(0);
            LocalDateTime currentTime = LocalDateTime.now();
            msgLog.setNote(replyText);
            msgLog.setCreateTime(currentTime);
            msgLog.setUpdateTime(currentTime);
            // Process the original ID
            MsgLog originLog = JSONObject.parseObject(new String(message.getBody()), MsgLog.class);
            msgLog.setOriginId(originLog.getId().toString());
            msgLogService.save(msgLog);

            System.out.println("SetReturnCallback message:" + message);
            System.out.println("SetReturnCallback response code:" + replyCode);
            System.out.println("SetReturnCallback response message:" + replyText);
            System.out.println("SetReturnCallback exchange:" + exchange);
            System.out.println("SetReturnCallback Routing key:" + routingKey);
            System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --");
        });

        returntemplate; }}Copy the code
  1. Save messages to a disk before sending them. The rabbitTemplate used here is the newly configured rabbitTemplate
    1. http://localhost:8071/reliable?message=direct&exchange=noExchange&routingKey=reliable.routing.key
    2. http://localhost:8071/reliable?message=direct&exchange=exchange.direct.reliable&routingKey=noQueue
    3. http://localhost:8071/reliable?message=direct&exchange=exchange.direct.reliable&routingKey=reliable.routing.key
import com.alibaba.fastjson.JSONObject;
import com.codecoord.domain.MsgLog;
import com.codecoord.domain.MsgLogStatus;
import com.codecoord.serivce.MsgLogService;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.IdGenerator;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDateTime;

@RestController
public class RabbitReliableController {
    @Resource
    private RabbitTemplate callbackRabbitTemplate;
    @Resource
    private MsgLogService msgLogService;
    @Resource
    private IdGenerator idGenerator;

    @RequestMapping("/reliable")
    public Object direct(String exchange, String routingKey, String message) {
        try {
            // Save the disk first and then send it. If the save fails, there is no need to continue sending
            MsgLog msgLog = saveMessageLog(exchange, routingKey, message);
            CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());
            callbackRabbitTemplate.convertAndSend(exchange, routingKey, JSONObject.toJSONString(msgLog), correlationData);
            return msgLog;
        } catch (AmqpException e) {
            System.out.println(e.getMessage());
            return "Network interruption, please try again later ~"; }}private MsgLog saveMessageLog(String exchange, String routingKey, String msg) {
        MsgLog msgLog = new MsgLog();
        // Test, use id generator in production
        msgLog.setId(System.currentTimeMillis());
        msgLog.setMsg(msg);
        msgLog.setStatus(MsgLogStatus.CREATE);
        msgLog.setExchange(exchange);
        msgLog.setRoutingKey(routingKey);
        msgLog.setTryCount(0);
        LocalDateTime currentTime = LocalDateTime.now();
        msgLog.setCreateTime(currentTime);
        msgLog.setUpdateTime(currentTime);
        msgLogService.save(msgLog);
        returnmsgLog; }}Copy the code

2.3 Compensation Mechanism

  1. A compensation mechanism can be used to resend saved messages
  2. Use the scheduled task provided by SpringBoot
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.codecoord.domain.MsgLog;
import com.codecoord.serivce.MsgLogService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.List;

@Component
@EnableScheduling
public class RabbitMqJob {
    @Resource
    private MsgLogService msgLogService;
    @Resource
    private RabbitTemplate callbackRabbitTemplate;

    @Scheduled(cron = "10/10 * * * * ?" )
    public void msgResend(a) {
        // Each message can be retried up to three times
        LambdaQueryWrapper<MsgLog> retryMsg = Wrappers.<MsgLog>lambdaQuery()
                .eq(MsgLog::getStatus, -1)
                .lt(MsgLog::getTryCount, 3);
        List<MsgLog> msgLogList = msgLogService.list(retryMsg);
        for (MsgLog msgLog : msgLogList) {
            msgLog.setTryCount(msgLog.getTryCount() + 1);
            msgLog.setUpdateTime(LocalDateTime.now());
            LambdaUpdateWrapper<MsgLog> updateWrapper = Wrappers.<MsgLog>lambdaUpdate()
                    .eq(MsgLog::getId, msgLog.getId());
            boolean update = msgLogService.update(msgLog, updateWrapper);
            System.out.println("Retry status update:" + update);
            callbackRabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), msgLog.getMsg(),
                    newCorrelationData(msgLog.getId().toString())); }}}Copy the code

2.5 Message Testing

  1. The message will look like this after being saved