1. Introduction

What do you do when a queue that has been notified to consume has a backlog of messages and is on the rise and needs urgent action? What can be done? Panic for a while, fight for a while, and then open a famous search engine search, search for the answer in the search results. This article will soon be a search result in your search engine, providing you with a solution.

2. Solutions

The following solutions are for RabbitMQ

2.1 with the machine

The simplest and most convenient solution is to let the operation and maintenance machine, so that the messages in the message queue can be distributed to more machines through the way of load, and the more consumers, the backlog of messages in the queue will be slowly processed.

2.2 Plus Consumers

Of course, adding machines is not a panacea. If your system is not up to the task of adding machines (too expensive), then you need to take a different approach and increase the number of concurrent consumption threads.

2.2.1 Fixed number of concurrent requests

The @rabbitListener annotation has the property concurrency.

/** * Set the concurrency of the listener container for this listener. Overrides the * default set by the listener container factory. Maps to the concurrency setting of * the container type. * <p>For a * {@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer * SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed * number of consumers in the {@code concurrentConsumers} property. If it is a string * with the form {@code "m-n"}, the {@code concurrentConsumers} is set to {@code m} * and the {@code maxConcurrentConsumers} is set to {@code n}. * <p>For a * {@link org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer * It sets DirectMessageListenerContainer} the {@ code consumersPerQueue} property. * @ return the concurrency. * @ since 2.0 * /Copy the code

Set the number of concurrent listeners. If the value is an int, set the number of concurrent listeners. If the attribute value is a string (m-n), then the dynamic concurrency is set, with a minimum of m and a maximum of N

Based on the documentation comments, it can be concluded that if you want to set concurrency, just add the concurrency property to the @rabbitListener annotation and specify a value

@RabbitListener(queues = "test3", containerFactory = "rabbitListenerContainerFactory", concurrency = "10")
Copy the code

2.2.2 Number of Dynamic Concurrent requests

After setting the number of concurrent consumers, the message consumption is faster and the backlog is no longer. The problem is solved perfectly. The chapter on dynamic concurrency shows that things are not so simple. The original intention of setting the number of concurrent consumers is to solve the problem of message backlog, but messages are not always backlog, and in most cases there may be no messages to consume in the queue, so setting the number of concurrent consumers will undoubtedly bring some resource consumption to the system. Fortunately, the spring framework leaders have come up with a solution: dynamic concurrency

Dynamic concurrency setting format:

@RabbitListener(queues = "test3", containerFactory = "rabbitListenerContainerFactory", concurrency = "1-10")
Copy the code

Concurrency is set to 1-10. The concurrency setting for each client is set to a minimum of 1. When there are more messages to be processed, the client concurrency increases to a maximum of 10, and when there are none, the client concurrency decreases to 1

After reading the above description, you may think I’m talking nonsense, but wait until you see the following code implementation

private void checkAdjust(boolean receivedOk) { if (receivedOk) { // 1. 24 the current consumer isActive if (isActive(this.consumer)) {this.consecutiveidles = 0; 1.1 If the number of consecutive messages exceeds the default value 10, Consider a new consumer if (this. ConsecutiveMessages++ > SimpleMessageListenerContainer. Enclosing consecutiveActiveTrigger) { considerAddingAConsumer(); this.consecutiveMessages = 0; } } } else { this.consecutiveMessages = 0; // 2. If no message is consumed for more than 10 consecutive times, Consider to terminate the current consumer if (this. ConsecutiveIdles++ > SimpleMessageListenerContainer. Enclosing consecutiveIdleTrigger) { considerStoppingAConsumer(this.consumer); this.consecutiveIdles = 0; }}}Copy the code

3. Summary

If there is a backlog of messages in the message queue, don’t panic. In fact, if you can add machines, you can add machines. If you can’t add machines, you need to increase the number of concurrent customers