Microservices can be designed as message-driven services, and reactive systems can also be built on message-oriented middleware. From this perspective, messaging middleware is very important in Internet application development.

Today, using RabbitMQ as an example, Songge talks about the reliability of message delivery in messaging middleware.

Note that this article is mainly about how to make sure the message producer sends a message successfully, not about message consumption.

1. Sending RabbitMQ messages

Messaging in RabbitMQ introduces the concept of an Exchange: a message is first sent to an exchange, which routes it to different queues according to the configured routing rules, and the queues are then consumed by different consumers.

That is the general flow, so ensuring that a message is sent reliably mainly comes down to confirming two things:

  1. The message reached the Exchange successfully
  2. The message reached the Queue successfully

If we can confirm these two steps, then we can consider the message sent successfully.

If either of these steps fails, the message does not arrive, and we may have to resend the message through retries, etc. After many retries, if the message still does not arrive, human intervention may be required.

From the above analysis, we can confirm that there are only three things we need to do to make sure the message is sent successfully:

  1. Confirm that the message arrived at the Exchange.
  2. Confirm that the message arrived in the Queue.
  3. Enable a scheduled task to deliver the messages that fail to be sent.

2. What RabbitMQ provides

Of the three steps above, the third we have to implement ourselves, while RabbitMQ already provides solutions for the first two.

How do I ensure that the message reaches RabbitMQ successfully? RabbitMQ provides two solutions:

  1. Enabling transaction mechanism
  2. Sender acknowledgement mechanism

These are two different options, and you can only select one of them. A channel cannot be in transactional mode and confirm mode at the same time, so enabling both results in an error.

Let’s look at them separately. All of the following cases are developed in Spring Boot, and the relevant source code can be downloaded at the end of the article.

2.1 Enabling the Transaction Mechanism

The RabbitMQ transaction mechanism can be enabled in Spring Boot as follows:

First, you need to provide a transaction manager, as follows:

@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

Next, do two things on the message producer: add transaction annotations and set the communication channel to transactional mode:

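import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class MsgService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Transactional
    public void send() {
        // Put the channel into transactional mode before publishing.
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME, RabbitConfig.JAVABOY_QUEUE_NAME, "hello rabbitmq!".getBytes());
        // Deliberately throws ArithmeticException so that the transaction rolls back.
        int i = 1 / 0;
    }
}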

Two points to note here:

  1. Mark the method that sends the message with the @Transactional annotation.
  2. Call setChannelTransacted(true) to put the channel into transactional mode.

That is all it takes.

In the example above, the 1 / 0 at the end always throws an exception at runtime. If you run the method, you will find that the message is not sent.

When transactional mode is turned on, a RabbitMQ producer goes through the following steps to send a message, four of which are extra:

  1. The client makes a request to set the channel to transactional mode.
  2. The server replies, agreeing to set the channel to transactional mode.
  3. The client sends the message.
  4. The client commits the transaction.
  5. The server responds, confirming that the transaction is committed.

Of the steps above, only the third exists without transactions; the other four are extra overhead. So transactional mode is relatively inefficient, and it is not the best solution. Think about which projects use messaging middleware: they are typically highly concurrent, and there throughput is particularly important.

RabbitMQ also provides publisher confirms to ensure that messages are sent successfully, and this approach performs much better than transactional mode.

2.2 Sender acknowledgement Mechanism

2.2.1 Single message processing

First we remove the transaction code from the previous section, and then we configure application.properties to enable the sender acknowledgement mechanism, as follows:

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

The first line enables the confirm callback, which reports whether a message reached the exchange. The second line enables the return callback, which is invoked when a message cannot be routed to a queue.

The first property accepts three values:

  1. none: disables publisher confirms. This is the default.
  2. correlated: the confirm callback is triggered with the result of publishing the message to the exchange, matched to the message through CorrelationData.
  3. simple: similar to correlated, and additionally supports the waitForConfirms() and waitForConfirmsOrDie() methods.

Next, we need to register two callbacks. The configuration is as follows:

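import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3.x

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
    public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
    private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    Queue queue() {
        return new Queue(JAVABOY_QUEUE_NAME);
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(JAVABOY_EXCHANGE_NAME);
    }

    // The queue name doubles as the routing key.
    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue())
                .to(directExchange())
                .with(JAVABOY_QUEUE_NAME);
    }

    // Register this class as both the confirm callback and the returns callback.
    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null when the message was sent without one.
        String id = correlationData != null ? correlationData.getId() : null;
        if (ack) {
            logger.info("{}: message reached the exchange successfully", id);
        } else {
            logger.error("{}: message failed to reach the exchange, cause: {}", id, cause);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        // getMessageId() is null unless the sender set a message ID.
        logger.error("{}: message not successfully routed to queue", returned.getMessage().getMessageProperties().getMessageId());
    }
}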

A few words about this configuration class:

  1. The configuration class implements two interfaces, RabbitTemplate.ConfirmCallback and RabbitTemplate.ReturnsCallback. The former is called back to report whether the message reached the exchange, and the latter is called when the message fails to be routed to a queue.
  2. The initRabbitTemplate method is annotated with @PostConstruct and registers the two callbacks on rabbitTemplate.

That's it.

Next, let’s test message sending.

First we try to send a message to a non-existent exchange, like this:

rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));

Note that the first argument is a string literal, not the constant, so the exchange does not exist.

Next we give a real exchange, but a routing key that does not match any queue, as follows:

rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));

Notice that the second argument is a string literal, not the constant.

In this case the message successfully reaches the exchange, but is not routed to a queue (because no queue is bound with that routing key).
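To make the two tests easier to compare, here is a toy model in plain Java of which callback fires in each case. It is only a sketch of the behaviour described above, not RabbitMQ or Spring AMQP code. ConfirmReturnModel, Outcome, and the in-memory bindings map are made up for illustration.

```java
import java.util.Map;
import java.util.Set;

class ConfirmReturnModel {
    /** What the producer observes for one publish (hypothetical type). */
    static final class Outcome {
        final boolean confirmAck; // value of `ack` passed to the confirm callback
        final boolean returned;   // whether the returns callback fires

        Outcome(boolean confirmAck, boolean returned) {
            this.confirmAck = confirmAck;
            this.returned = returned;
        }

        @Override
        public String toString() {
            return "confirm ack=" + confirmAck + ", returned=" + returned;
        }
    }

    // exchange name -> routing keys that have a queue bound (direct exchange semantics)
    private final Map<String, Set<String>> bindings;

    ConfirmReturnModel(Map<String, Set<String>> bindings) {
        this.bindings = bindings;
    }

    Outcome publish(String exchange, String routingKey) {
        Set<String> keys = bindings.get(exchange);
        if (keys == null) {
            return new Outcome(false, false); // no such exchange: the confirm reports failure
        }
        if (!keys.contains(routingKey)) {
            return new Outcome(true, true);   // reached the exchange, but no queue matched
        }
        return new Outcome(true, false);      // reached the exchange and a queue
    }

    public static void main(String[] args) {
        ConfirmReturnModel broker = new ConfirmReturnModel(
                Map.of("javaboy_exchange_name", Set.of("javaboy_queue_name")));
        // First test: the exchange name is a string literal that matches nothing.
        System.out.println(broker.publish("RabbitConfig.JAVABOY_EXCHANGE_NAME", "javaboy_queue_name"));
        // Second test: real exchange, routing key that matches no binding.
        System.out.println(broker.publish("javaboy_exchange_name", "RabbitConfig.JAVABOY_QUEUE_NAME"));
        // Normal case: both the exchange and the routing key are correct.
        System.out.println(broker.publish("javaboy_exchange_name", "javaboy_queue_name"));
    }
}
```

The point of the model is that the two callbacks answer different questions: the confirm callback tells you whether the exchange accepted the message, and the returns callback fires only when the exchange accepted it but could not route it.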

That covers sending a single message. Next, let's look at sending messages in batches.

2.2.2 Batch Message processing

When messages are sent in batches, the callbacks that listen for the send result work the same way, so they are not repeated here.

This is the publisher-confirm mode.

Message throughput is greatly improved in this mode compared to transactions.

3. Retrying on failure

There are two kinds of retry on failure: one for when MQ cannot be reached at all, and one for when MQ is reachable but the message fails to be sent.

Let's look at the two kinds of retry separately.

3.1 Built-in retry mechanism

The transaction mechanism and the sender acknowledgement mechanism described above both let the sender confirm that a message was sent successfully. If the sender cannot connect to MQ in the first place, Spring Boot also offers a retry mechanism. This mechanism has nothing to do with MQ itself; it uses the retry support in Spring, and is configured as follows:

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=10
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=2

From top to bottom, the settings mean:

  • Enable the retry mechanism.
  • Initial retry interval.
  • Maximum number of attempts.
  • Maximum retry interval.
  • Interval multiplier. (The multiplier is set to 2, so the first interval is 1 second, the second is 2 seconds, the third is 4 seconds, and so on, up to the maximum interval.)
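The arithmetic behind these settings can be sketched in a few lines of plain Java. This is only an illustration of how the intervals grow and get capped, not the code Spring uses internally; BackoffSchedule and waits are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {

    /**
     * Returns the waits (in ms) between consecutive attempts: each wait is the
     * previous one times the multiplier, capped at maxIntervalMs.
     */
    public static List<Long> waits(long initialMs, double multiplier, long maxIntervalMs, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        // maxAttempts attempts are separated by maxAttempts - 1 waits.
        for (int i = 1; i < maxAttempts; i++) {
            result.add(Math.min((long) next, maxIntervalMs));
            next = Math.min(next * multiplier, maxIntervalMs);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the configuration above: 1000 ms, x2, capped at 10000 ms, 10 attempts.
        System.out.println(waits(1000, 2, 10000, 10));
        // prints [1000, 2000, 4000, 8000, 10000, 10000, 10000, 10000, 10000]
    }
}
```

With these values the cap is reached after the fourth wait, so a send that never succeeds is abandoned after roughly 65 seconds of waiting in total.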

Once this is configured, start the Spring Boot project and then shut down MQ. An attempt to send a message now fails, and the send is retried automatically.

3.2 Service Retry

Service retries are mainly for messages that do not reach the exchange.

If the message does not make it to the exchange, then, as described in section 2, the confirm callback is triggered with a failure result. We can act on the failure in this callback.

The whole idea is this:

  1. First create a table to record the messages sent to the middleware.

Each time a message is sent, a record is added to this table. Most of its fields are self-explanatory, but three deserve a note:

  • status: the state of the message. 0, 1, and 2 mean sending, sent successfully, and failed to send, respectively.
  • tryTime: the time of the first retry (if the message is still unconfirmed when this time arrives, retrying can begin).
  • count: the number of times the message has been retried.

The other fields are easy to understand, so I won’t go over them.

  2. When a message is sent, we save a send record to the table, with status set to 0 and tryTime set to one minute later.
  3. In the confirm callback, if we are told that the message was sent successfully, we set the status of the message to 1 (a msgId is set on the message when it is sent, and the msgId is used to uniquely locate the record once the send succeeds).
  4. In addition, start a scheduled task that queries the database every 10 seconds for messages whose status is 0 and whose tryTime has passed. For each such message, it first checks whether the message has already been retried three times. If so, it changes the status to 2, meaning that the message failed to be sent and will not be retried again. Otherwise, it resends the message and increments count by 1.
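The bookkeeping described above can be sketched in memory with plain Java. This is a minimal single-threaded sketch, not the VHR implementation: the table is a HashMap instead of a database, the resend callback stands in for the real publish call, the retry limit is interpreted as at most three resends, and the names SendLogRetry, SendLog, recordSend, markConfirmed, and sweep are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

class SendLogRetry {
    static final int SENDING = 0, SUCCESS = 1, FAILED = 2;
    static final int MAX_RETRIES = 3;

    /** One row of the send-log table (only the fields discussed above, plus a body). */
    static class SendLog {
        final String msgId;
        final String body;
        final Instant tryTime;
        int status = SENDING;
        int count = 0;

        SendLog(String msgId, String body, Instant tryTime) {
            this.msgId = msgId;
            this.body = body;
            this.tryTime = tryTime;
        }
    }

    final Map<String, SendLog> table = new HashMap<>();

    /** Record the message when it is sent; the first retry is due one minute later. */
    void recordSend(String msgId, String body, Instant now) {
        table.put(msgId, new SendLog(msgId, body, now.plus(Duration.ofMinutes(1))));
    }

    /** Called from the confirm callback when ack is true. */
    void markConfirmed(String msgId) {
        SendLog log = table.get(msgId);
        if (log != null) {
            log.status = SUCCESS;
        }
    }

    /** One pass of the scheduled task; `resend` is whatever actually publishes the message. */
    void sweep(Instant now, Consumer<SendLog> resend) {
        for (SendLog log : table.values()) {
            if (log.status != SENDING || now.isBefore(log.tryTime)) {
                continue; // confirmed, already failed, or not yet due
            }
            if (log.count >= MAX_RETRIES) {
                log.status = FAILED; // give up; needs manual handling
            } else {
                log.count++;
                resend.accept(log);
            }
        }
    }

    public static void main(String[] args) {
        SendLogRetry retry = new SendLogRetry();
        Instant start = Instant.now();
        retry.recordSend("msg-1", "hello rabbitmq!", start);
        // Pretend no confirm ever arrives and run the task after tryTime has passed.
        Instant later = start.plus(Duration.ofMinutes(2));
        for (int pass = 1; pass <= 4; pass++) {
            retry.sweep(later, log -> System.out.println("resending " + log.msgId + ", retry " + log.count));
        }
        System.out.println("final status: " + retry.table.get("msg-1").status);
    }
}
```

In the usage example the first three passes resend the message and the fourth marks it as failed. In a real project the passes would be driven by a scheduler every 10 seconds and the map would be replaced by database queries.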

That is the general idea. The complete implementation is not reproduced here; email sending in Songge's VHR project is handled along these lines, and you can refer to the VHR project (github.com/lenve/vhr) for the full code.

There are, of course, two drawbacks to this approach:

  1. Going through the database may lower MQ's QoS, but sometimes we don't need MQ to have a high QoS, so it depends on the application.
  2. With this approach the same message may be sent more than once. This is not a problem in itself, as long as idempotency is handled when the message is consumed.

Of course, whether a message needs a 100% delivery guarantee also depends on the specific situation.

4. Summary

OK, those are some of the common problems on the message producer side and their solutions. In the next article, Songge will discuss how to ensure that messages are consumed successfully and how to solve the idempotency problem.

The source code for this article can be downloaded here: github.com/lenve/javab…