1. The problem

The following error occurs when using RocketMQ:

MQBrokerException: CODE:2 DESC:[TIMEOUT_CLEAN_QUEUE] Broker busy,start flow control for a while,period in queue: 205ms, size of queue:880.

Because the project team had no compensation logic for failed sends, the affected messages were lost. The error is therefore worth investigating in depth and fixing.

2. Problem analysis

First, we search the RocketMQ source code for the keyword TIMEOUT_CLEAN_QUEUE to find where this error is raised.

The search leads to a method defined in BrokerFastFailure. As the class name suggests, it implements a fast-failure mechanism on the Broker side.

The Broker fast-failure mechanism works as follows:

  • The sender sends a write request to the Broker, which first places it in a queue (SendThreadPoolQueue) with a default capacity of 10000.

  • The Broker uses a thread pool (SendMessageExecutor) to take tasks from the queue and execute the write requests. The pool defaults to a single thread so that messages are processed sequentially.

If a single write stalls because of memory jitter or other factors, requests back up on that Broker and the time a client needs to send a message rises sharply. Suppose a write takes 500ms or even more than 1s, and 5000 requests are waiting in the queue. The sender's default timeout is 3s. With one thread working sequentially at 500ms per write, only about six requests finish within 3s, so most clients will have timed out long before the Broker gets to their request. The Broker's backlog thus turns into send timeouts on the client side.

To address this, RocketMQ introduces a fast-failure mechanism on the Broker side. A scheduled thread checks the request at the head of the queue every 10 milliseconds. If that request has been waiting for more than 200ms, the Broker rejects every request in the queue that has waited longer than 200ms. Because Brokers are deployed as a cluster, the client can then retry against another Broker, which leaves room for several retries within the default 3s timeout. This prevents momentary pressure on one Broker from making message sending fail, and so keeps message sending highly available.
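The check described above can be sketched as follows. This is a minimal illustration, not the BrokerFastFailure source: QueuedRequest, FastFailSketch, and cleanExpired are hypothetical names, and time is passed in explicitly so the logic is easy to follow. In a Broker, something like this would be invoked by a scheduler every 10ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a queued send request; not a RocketMQ class. */
final class QueuedRequest {
    final String id;
    final long enqueuedAtMillis;

    QueuedRequest(String id, long enqueuedAtMillis) {
        this.id = id;
        this.enqueuedAtMillis = enqueuedAtMillis;
    }
}

final class FastFailSketch {
    /** Mirrors the 200ms default of waitTimeMillsInSendQueue. */
    static final long MAX_WAIT_MILLIS = 200;

    /**
     * Removes requests from the head of the queue for as long as the head
     * has waited at least MAX_WAIT_MILLIS. The removed requests are returned
     * so that the caller can answer each of them with a "busy" response.
     */
    static List<QueuedRequest> cleanExpired(BlockingQueue<QueuedRequest> queue, long nowMillis) {
        List<QueuedRequest> rejected = new ArrayList<>();
        while (true) {
            QueuedRequest head = queue.peek();
            if (head == null || nowMillis - head.enqueuedAtMillis < MAX_WAIT_MILLIS) {
                break; // the queue is FIFO, so everything behind the head is younger
            }
            if (queue.remove(head)) {
                rejected.add(head);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        BlockingQueue<QueuedRequest> queue = new LinkedBlockingQueue<>(10000);
        queue.add(new QueuedRequest("a", 0));
        queue.add(new QueuedRequest("b", 100));
        queue.add(new QueuedRequest("c", 250));
        // At t=305ms, "a" has waited 305ms and "b" 205ms, but "c" only 55ms.
        List<QueuedRequest> rejected = cleanExpired(queue, 305);
        System.out.println(rejected.size() + " rejected, " + queue.size() + " still queued");
    }
}
```

Run as written, the example rejects "a" and "b" and leaves "c" in the queue. Only the head needs to be inspected on each pass because requests are queued in arrival order.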

That is the intent behind Broker-side fast failure: after a fast failure the client retries, and unless every Broker in the cluster is busy, the message should still be sent successfully. So why did the TIMEOUT_CLEAN_QUEUE error reach the application? Is there no retry for this error?

To answer this question, we turn to the source code. Taking synchronous sending as the example, we walk through the key points of the message sending process.

The MQ client first sends the request to the Broker over a network channel, then receives the response and calls the processSendResponse method to parse it.

Here, the code carried by the response RemotingCommand is RemotingSysResponseCode.SYSTEM_BUSY, whose value is 2, matching CODE:2 in the error above.

From processSendResponse we can see that when the code is SYSTEM_BUSY, the method throws an MQBrokerException whose response code is SYSTEM_BUSY and whose description is the error message quoted at the beginning.
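A heavily simplified sketch of that mapping is shown below. It is not the client's code: SketchBrokerException and ResponseParsingSketch are hypothetical names, the exception is unchecked for brevity (the real MQBrokerException is checked), and the real method also handles several other response codes that are left out here.

```java
/** Simplified, unchecked stand-in for the client's MQBrokerException. */
final class SketchBrokerException extends RuntimeException {
    final int responseCode;

    SketchBrokerException(int responseCode, String remark) {
        super("CODE:" + responseCode + " DESC:" + remark);
        this.responseCode = responseCode;
    }
}

final class ResponseParsingSketch {
    static final int SUCCESS = 0;      // value of RemotingSysResponseCode.SUCCESS
    static final int SYSTEM_BUSY = 2;  // value of RemotingSysResponseCode.SYSTEM_BUSY

    /**
     * Returns a status string on success. Any other code is turned into an
     * exception that carries both the code and the broker's remark.
     */
    static String parse(int code, String remark) {
        if (code == SUCCESS) {
            return "SEND_OK";
        }
        throw new SketchBrokerException(code, remark);
    }

    public static void main(String[] args) {
        try {
            parse(SYSTEM_BUSY, "[TIMEOUT_CLEAN_QUEUE] Broker busy");
        } catch (SketchBrokerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is only that the Broker's code and remark travel to the caller inside the exception, which is why the application sees the Broker's text verbatim.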

Following the call chain upward, the direct caller is the sendKernelImpl method of DefaultMQProducerImpl. The question is what sendKernelImpl does with an MQBrokerException thrown from below.

The relevant code shows the following.

sendKernelImpl catches the exception, runs the registered hook functions (so the after-send hook also executes when the send fails), and then rethrows the exception unchanged.

sendKernelImpl is called by the sendDefaultImpl method of DefaultMQProducerImpl, whose core implementation is examined next.

This reveals a key part of RocketMQ's high-availability design for sending: the retry mechanism. sendDefaultImpl wraps the call to sendKernelImpl in a for loop with a try/catch, so that a failed attempt can be retried. For an MQBrokerException, however, only the response codes listed in that catch block are retried. For any other code the exception is rethrown, which exits the for loop without a retry. SYSTEM_BUSY is not in that list, so a SYSTEM_BUSY response is never retried.
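The shape of that loop, and the effect of a missing code, can be sketched as follows. This is an illustration under stated assumptions, not the DefaultMQProducerImpl source: BrokerCodeException, RetryLoopSketch, and Sender are hypothetical names, the exception is unchecked for brevity, and RETRYABLE is an illustrative whitelist rather than the real list of codes.

```java
import java.util.Set;

/** Simplified, unchecked stand-in for MQBrokerException (the real one is checked). */
final class BrokerCodeException extends RuntimeException {
    final int code;

    BrokerCodeException(int code) {
        super("broker responded with code " + code);
        this.code = code;
    }
}

final class RetryLoopSketch {
    static final int SYSTEM_ERROR = 1;
    static final int SYSTEM_BUSY = 2;

    /** Illustrative whitelist only; it stands in for the codes in the real catch block. */
    static final Set<Integer> RETRYABLE = Set.of(SYSTEM_ERROR);

    /** Hypothetical stand-in for one sendKernelImpl call; attempt is zero-based. */
    interface Sender {
        String send(int attempt);
    }

    static String sendWithRetry(Sender sender, int maxAttempts, Set<Integer> retryable) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        BrokerCodeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return sender.send(attempt);
            } catch (BrokerCodeException e) {
                last = e;
                if (!retryable.contains(e.code)) {
                    throw e; // code not whitelisted: exit the loop, no retry
                }
                // whitelisted code: continue with the next attempt
            }
        }
        throw last; // every attempt failed with a retryable code
    }

    public static void main(String[] args) {
        // Fails with SYSTEM_BUSY on the first attempt and succeeds afterwards.
        Sender flaky = attempt -> {
            if (attempt == 0) {
                throw new BrokerCodeException(SYSTEM_BUSY);
            }
            return "SEND_OK on attempt " + attempt;
        };
        try {
            sendWithRetry(flaky, 3, RETRYABLE);
        } catch (BrokerCodeException e) {
            System.out.println("SYSTEM_BUSY not whitelisted: " + e.getMessage());
        }
        System.out.println(sendWithRetry(flaky, 3, Set.of(SYSTEM_ERROR, SYSTEM_BUSY)));
    }
}
```

With the illustrative whitelist, the first SYSTEM_BUSY response ends the send immediately even though a second attempt would have succeeded. Adding SYSTEM_BUSY to the set is the kind of one-line change that lets the loop move on to the next attempt.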

This is a bug in RocketMQ: SYSTEM_BUSY was left out of the retryable codes. I discussed it with the RocketMQ core developers and they confirmed it. I will submit a PR that adds SYSTEM_BUSY, which is a one-line change.

At this point, the problem should be very clear.

3. Solutions

If you search the web for TIMEOUT_CLEAN_QUEUE, the usual advice is to increase waitTimeMillsInSendQueue, which defaults to 200ms, to a larger value such as 1000ms. I used to argue against this because I assumed the send would be retried against another Broker. Now that I know no retry happens for this error, increasing the value is a useful mitigation.

It is still not a good solution, though. I will submit a PR to the official project soon to fix the problem. In the meantime, I suggest patching the version you use in your company and building your own package, because the current behavior defeats the design intent of Broker fast failure.

In addition, the application that sends messages should implement its own retry mechanism rather than rely only on the one RocketMQ provides. Because of resource constraints, network conditions, and similar factors, message sending can never be 100% successful. Catch the exception when sending, and if the send fails, store the message in a database. A scheduled task can then resend the stored messages, which keeps message loss to a minimum.
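A minimal sketch of this compensation pattern is shown below. All names are hypothetical: the in-memory queue stands in for a database table, the Consumer stands in for the producer's send call and signals failure with an unchecked exception, and retryFailed is the method a scheduled task would call periodically.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class SendCompensationSketch {
    /** Stand-in for a database table of messages that failed to send. */
    private final Queue<String> failedStore = new ConcurrentLinkedQueue<>();
    /** Stand-in for the producer: returns normally on success, throws on failure. */
    private final Consumer<String> producer;

    SendCompensationSketch(Consumer<String> producer) {
        this.producer = producer;
    }

    /** Sends a message; on failure the message is stored instead of being lost. */
    void send(String message) {
        try {
            producer.accept(message);
        } catch (RuntimeException e) {
            failedStore.add(message);
        }
    }

    /** Retries each stored message once and returns how many were resent. */
    int retryFailed() {
        int resent = 0;
        int pending = failedStore.size();
        for (int i = 0; i < pending; i++) {
            String message = failedStore.poll();
            if (message == null) {
                break;
            }
            try {
                producer.accept(message);
                resent++;
            } catch (RuntimeException e) {
                failedStore.add(message); // still failing: keep it for the next run
            }
        }
        return resent;
    }

    int pendingCount() {
        return failedStore.size();
    }

    public static void main(String[] args) {
        AtomicBoolean brokerBusy = new AtomicBoolean(true);
        SendCompensationSketch sketch = new SendCompensationSketch(message -> {
            if (brokerBusy.get()) {
                throw new IllegalStateException("broker busy");
            }
        });
        sketch.send("order-1");   // fails and is stored
        brokerBusy.set(false);    // the broker recovers
        int resent = sketch.retryFailed();
        System.out.println("resent=" + resent + ", pending=" + sketch.pendingCount());
    }
}
```

In a real system the store would need to be durable, and consumers would need to tolerate duplicates, since a message can be delivered twice if a send succeeds on the Broker but the client still sees a failure.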

Reprint notice: this is a reprinted article and the copyright belongs to the original author. It will be removed on request in case of infringement. Author: Ding Wei. Source: middleware in turn. Link: my.oschina.net/u/4052033/b…

Why retry?

If the consumer fails to process a message, does not retry, and still sends an acknowledgement to RabbitMQ, RabbitMQ deletes the message from the queue and the message is lost. So when processing fails, the consumer should retry a limited number of times. For example, retry three times, and if the third attempt also fails, send the message to the dead-letter queue.
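The retry-then-dead-letter flow can be sketched in plain Java as follows. No RabbitMQ client API is used: ConsumerRetrySketch is a hypothetical name, the handler signals failure with an unchecked exception, and the deadLetters list merely stands in for a dead-letter queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ConsumerRetrySketch {
    static final int MAX_ATTEMPTS = 3;

    /** Stand-in for the dead-letter queue. */
    final List<String> deadLetters = new ArrayList<>();

    /**
     * Runs the handler up to MAX_ATTEMPTS times. Returns true if one attempt
     * succeeded, or false if the message ended up in the dead-letter list.
     */
    boolean consume(String message, Consumer<String> handler) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                handler.accept(message);
                return true; // processing succeeded, so acknowledging is safe
            } catch (RuntimeException e) {
                // processing failed: fall through to the next attempt
            }
        }
        deadLetters.add(message);
        return false;
    }

    public static void main(String[] args) {
        ConsumerRetrySketch consumer = new ConsumerRetrySketch();
        boolean ok = consumer.consume("payload", message -> {
            throw new IllegalStateException("cannot process " + message);
        });
        System.out.println("processed=" + ok + ", deadLetters=" + consumer.deadLetters);
    }
}
```

The acknowledgement is only appropriate after the handler has succeeded or after the message has been handed to the dead-letter destination, so that a failed message is never silently dropped.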

Handle the failed messages according to business requirements. For example, save them to a file or database of failures, or send them to exchange@normal again after manual processing.
