1. Introduction

As you know, the system development process uses messaging middleware for asynchronous message processing, decoupling between systems, and peaking of system traffic. When using message-oriented middleware, we need to understand the following scenarios:

  • How does it integrate with our development framework, SpringBoot
  • How to send messages
  • How do I send complex messages
  • How to ensure the reliability of sending messages
  • How to consume messages
  • How to ensure the reliability of consumption messages
  • How do you ensure consumer scalability
  • How to use consumers for peak traffic shaving

Start this article with these scenarios, using RabbitMQ as an example for messaging middleware

2. Integrate with SpringBoot

2.1 Adding a Dependency

It is relatively easy to integrate RabbitMQ through the SpringBoot framework by adding the corresponding starter to the POM file

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code

2.2 Adding the MQ Service Configuration

spring:

  rabbitmq:

    host: localhost

    port: 5672

    username: guest

    password: guest

    virtual-host: boot-example

Copy the code

2.3 Injecting message templates

@Autowired

private RabbitTemplate rabbitTemplate;

Copy the code

2.4 Sending Messages

public void sendMessage(a) {

    rabbitTemplate.convertAndSend("test"."test"."mq produce send a message");

}

Copy the code

2.5 Consumption news

@Component

@Slf4j

public class MqConsumer {

    

    @RabbitListener(id = "consumerMessage1", queues = "test")

    public void consumeMessage1(Message message, Channel channel, String content) {

        log.info("The receive message1: {}", content);     

    }

Copy the code

3. How do I send complex messages

In the process of system development, complex object interactions with other systems are often carried out in the form of JSON, which requires us to set the serialization and deserialization converters of messages

3.1 The producer sets the message converter

@Bean

public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {

    RabbitTemplate rabbitTemplate = new RabbitTemplate();

    configurer.configure(rabbitTemplate, connectionFactory);

    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

    return rabbitTemplate;

}

Copy the code

3.2 The consumer sets up the message converter

@Bean

publicRabbitListenerContainerFactory<? > rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {

    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

    factory.setConnectionFactory(connectionFactory);

    factory.setMessageConverter(new Jackson2JsonMessageConverter());

    return factory;

}

Copy the code

3.3 Consumers specify listening container factories

@RabbitListener(queues = "test3", containerFactory = "rabbitListenerContainerFactory")

public void consumeComplexMessage(Order order) {

    log.info("Receive complex message: {}", order);

}

Copy the code

4. Reliability of sending messages

4.1 Why Is the Reliability of sending Messages Guaranteed

Regarding the reliability of sending messages, let’s take a look at the official documentation:

A RabbitMQ node can lose persistent messages if it fails before said messages are written to disk. For instance, consider this scenario:

  1. a client publishes a persistent message to a durable queue
  2. a client consumes the message from the queue (noting that the message is persistent and the queue durable), but confirms are not active,
  3. the broker node fails and is restarted, and
  4. the client reconnects and starts consuming messages

At this point, the client could reasonably assume that the message will be delivered again. This is not the case: the restart has caused the broker to lose the message. In order to guarantee persistence, a client should use confirms. If the publisher’s channel had been in confirm mode, the publisher would not have received an ack for the lost message (since the message hadn’t been written to disk yet).

The RabbitMQ message server may lose persistent messages due to downtime before writing them to disk. If the producer channcel sets acknowledgement mode, messages are guaranteed not to be lost, because only messages written to disk will receive an ACK notification.

4.2 How to Ensure the Reliability of Sending Messages

4.2.1 Adding the Configuration

rabbitmq:

    publisher-confirm-type: correlated

    publisher-returns: true

Copy the code

4.2.2 Specifying the callback function

Implement RabbitTemplate. ConfirmCallback, RabbitTemplate. ReturnCallback interface

@Configuration

@Slf4j

public class MqConfig implements RabbitTemplate.ConfirmCallback.RabbitTemplate.ReturnCallback {



    / * *

* Basic.ack returned by the message server

     *

     * @paramCorrelationData Indicates the associated data object

     * @param ack ack

     * @paramCause Abnormal information

* /


    @Override

    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        log.info("Receive ack Confirm: {} from broker server", ack);

    }



    / * *

* Basic.return returned by the message server

     *

     * @paramMessage message object

     * @paramReplyCode response code

     * @paramReplyText Indicates the response text

     * @paramExchange switches

     * @paramRoutingKey routing key

* /


    @Override

    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

        log.error("Receive return message: {} from broker server, reply code: {}, reply text: {}," +

                "Exchange: {}, routing key: {}", message.toString(), replyCode, replyText, exchange, routingKey);

    }



    @Bean

    public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {

        RabbitTemplate rabbitTemplate = new RabbitTemplate();

        configurer.configure(rabbitTemplate, connectionFactory);

        rabbitTemplate.setReturnCallback(this);

        rabbitTemplate.setConfirmCallback(this);

        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

        return rabbitTemplate;

    }

}

Copy the code

Holdings returnedMessage and confirm

The comfirm() callback can be used to provide reliability for sending messages, whereas the returnedMessage() callback seems a bit redundant. It can be used under certain conditions.

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won’t route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack

Exchange routes messages to corresponding queues according to relevant rules. For messages that Cannot be routed by Exchange, the message server returns basic.return first and then basic.ack. The returnedMessage() function is called back, followed by the Confirm () function. The message server normally returns basic.ack after persisting the message and the ACK parameter of the comfirm() callback is true. However, if exchange cannot route the message, the ACK in confirm() callback parameter also returns true. In this case, the reliability of the message cannot be guaranteed only by relying on the comfirm() callback function, and the returnedMessage() callback function is needed. The Confirm () callback ensures the reliability of sending messages if you can ensure that the exchange and queue can be routed successfully.

5. Reliability of messages

5.1 Why to ensure the reliability of consumption messages

For the reliability of consumption messages, let’s take a look at the official documentation:

When a node delivers a message to a consumer, it has to decide whether the message should be considered handled (or at least received) by the consumer. Since multiple things (client connections, consumer apps, and so on) can fail, this decision is a data safety concern. Messaging protocols usually provide a confirmation mechanism that allows consumers to acknowledge deliveries to the node they are connected to. Whether the mechanism is used is decided at the time consumer subscribes.

Depending on the acknowledgement mode used, RabbitMQ can consider a message to be successfully delivered either immediately after it is sent out (written to a TCP socket) or when an explicit (“manual”) client acknowledgement is received. Manually sent acknowledgements can be positive or negative and use one of the following protocol methods:

By default, the message server deletes messages immediately after sending them to consumers. In this mode, messages may be lost due to special scenarios such as connection disconnection, channel disconnection, and consumer exception. Therefore, message reliability cannot be guaranteed. Want to guarantee the reliability of the news consumption, need to introduce a confirmation mechanism, namely after the message server in distributed message don’t immediately delete the message, only after receive consumer ack delete messages, if consumers with the message server disconnected, the message server needs to send the message to send to other consumer spending.

5.2 How to Ensure the reliability of consumption messages

Examples of official documentation:

boolean autoAck = false;

// Set autoAck to false

channel.basicConsume(queueName, autoAck, "a-consumer-tag".

     new DefaultConsumer(channel) {

         @Override

         public void handleDelivery(String consumerTag,

                                    Envelope envelope,

                                    AMQP.BasicProperties properties,

                                    byte[] body)


             throws IOException

         
{

             long deliveryTag = envelope.getDeliveryTag();

             // negatively acknowledge, the message will

             // be discarded

             channel.basicReject(deliveryTag, false);

         }

     });

Copy the code

The key part is that the second argument to channel.basicConsume() needs to be set to false

SpringBoot integration:


The default value is automatic mode, which can meet the reliability of the message, about ack mode enumeration value explanation


There is no need to worry about consuming message reliability if you use SpringBoot

6. How to ensure customer scalability

When there are multiple consumers in a queue, the message server will push the message to consumers in the way of polling. Based on this, when the message server backlogs messages, it can increase the consumption capacity by adding machines, and consumers naturally have the ability of horizontal expansion.

7. How to use customers for peak traffic cutting

Under normal circumstances, the message server will send messages to consumers. If the distribution process is unlimited, it will increase the load of the consumer system. In serious cases, it will lead to the consumer system can not provide services to the outside. The answer is to set QoS

Because messages are sent (pushed) to clients asynchronously, there is usually more than one message “in flight” on a channel at any given moment. In addition, manual acknowledgements from clients are also inherently asynchronous in nature. So there’s a sliding window of delivery tags that are unacknowledged. Developers would often prefer to cap the size of this window to avoid the unbounded buffer problem on the consumer end. This is done by setting a “prefetch count” value using the basic.qos method. The value defines the max number of unacknowledged deliveries that are permitted on a channel. Once the number reaches the configured count, RabbitMQ will stop delivering more messages on the channel unless at least one of the outstanding ones is acknowledged.

When the number of unacknowledged messages held by a consumer exceeds the threshold, the message server will no longer send messages to the consumer, thus achieving peak traffic clipping

8. References

  • Consumer Acknowledgements and Publisher Confirms