The interview questions

How do you deal with message queue delay and expiration? What happens when the message queue fills up? Millions of messages have been backing up for hours. How do you fix that?

Interviewer psychoanalysis

When you hear this question, what it really boils down to is a problem on the consumer side: either consumption has stopped entirely, or it has slowed to a crawl. From there the trouble compounds. What if your message queue cluster's disks are nearly full and nobody is consuming? What if the whole thing backlogs for hours? Or what if the backlog sits so long that messages hit their expiration time, for example a TTL configured in RabbitMQ, and are dropped? What do you do then?

This situation is actually quite common in production, and when it does happen it tends to be a major incident. For example, the consumer writes to MySQL as part of handling each message, MySQL goes down, and the whole consumer side stalls. Or something goes wrong in the consumer code, making consumption extremely slow.

Analysis of interview questions

Let's go through these one by one. Start with the scenario where the consumer side has failed, a large number of messages have piled up in MQ, and you now have an incident, and a panic, on your hands.

A large number of messages have been stuck in MQ for hours

Tens of millions of messages sat in MQ for seven or eight hours, from about 4 pm until after 11 pm. This is a real scenario we ran into: a genuine production fault. In practice, the fix is to repair the consumer, restore its consumption speed, and then wait a few hours for it to chew through the backlog. But you can't leave it at that in an interview.

If one consumer handles 1,000 messages per second, three consumers handle 3,000 per second, which is 180,000 per minute. So with a backlog of millions to tens of millions of messages, even after the consumer recovers, it still takes on the order of an hour to drain.
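The arithmetic above generalizes to: recovery time ≈ backlog size / aggregate consumption rate. A minimal sketch using the numbers from the text (the class and method names are illustrative):

```java
public class BacklogMath {
    // Seconds needed to drain a backlog at a given aggregate rate (msgs/sec),
    // rounding up so a partial second still counts.
    static long drainSeconds(long backlog, long ratePerSecond) {
        return (backlog + ratePerSecond - 1) / ratePerSecond;
    }

    public static void main(String[] args) {
        // 3 consumers x 1000 msgs/sec = 3000 msgs/sec, as in the text.
        long rate = 3 * 1000;
        System.out.println(drainSeconds(180_000, rate));    // one minute's worth: 60
        System.out.println(drainSeconds(10_000_000, rate)); // ~56 minutes: 3334
    }
}
```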

At this point, the only option is a temporary, emergency capacity expansion. The procedure and reasoning are as follows:

  • First fix the bug on the consumer side to ensure consumption can get back up to speed, then stop all existing consumers.
  • Create a new topic with 10 times the original number of partitions, temporarily setting up 10 times the original number of queues.
  • Write a temporary dispatcher program that consumes the backlogged data. It does no time-consuming processing; it simply polls the backlog and writes each message, round-robin, into the 10x temporary queues.
  • Temporarily requisition 10 times the usual number of machines to deploy consumers, with each batch of consumers draining one temporary queue. This temporarily expands both queue and consumer resources by 10x, consuming data at 10 times the normal rate.
  • Once the backlog has been quickly drained, restore the original deployment architecture and consume messages with the original consumer machines.
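The dispatch step above can be sketched as a round-robin writer. The snippet below is a broker-free simulation, with plain in-memory deques standing in for the 10 temporary MQ queues (all names are made up for illustration):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BacklogDispatcher {
    final List<Deque<String>> tempQueues = new ArrayList<>();
    private int next = 0;

    BacklogDispatcher(int queueCount) {
        for (int i = 0; i < queueCount; i++) tempQueues.add(new ArrayDeque<>());
    }

    // The temporary program does no real processing: it only forwards each
    // backlogged message into the temporary queues, round-robin.
    void dispatch(String msg) {
        tempQueues.get(next).add(msg);
        next = (next + 1) % tempQueues.size();
    }

    public static void main(String[] args) {
        BacklogDispatcher d = new BacklogDispatcher(10); // 10x the original queues
        for (int i = 0; i < 100; i++) d.dispatch("msg-" + i);
        // Each of the 10 temporary queues ends up with an even 1/10 share,
        // ready for its own batch of temporary consumers.
        System.out.println(d.tempQueues.get(0).size()); // 10
    }
}
```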

Messages in MQ are out of date

Assuming you are using RabbitMQ: RabbitMQ lets you set an expiration time on messages, i.e. a TTL. If a message sits in a queue longer than that time, it is cleared by RabbitMQ and the data is gone. That's the second pitfall. The data doesn't accumulate in MQ; it is simply lost.
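For reference, a per-queue TTL in RabbitMQ is set via the `x-message-ttl` argument at queue declaration time. A minimal sketch using the RabbitMQ Java client conventions; the queue name, TTL value, and helper method are arbitrary choices for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class TtlArgs {
    // Build the optional-arguments map passed to queueDeclare().
    static Map<String, Object> ttlArgs(int ttlMillis) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis); // messages older than this are dropped
        return args;
    }

    public static void main(String[] args) {
        Map<String, Object> a = ttlArgs(60_000); // 60-second TTL
        // With a live connection you would then declare the queue, e.g.:
        // channel.queueDeclare("orders", true, false, false, a);
        System.out.println(a.get("x-message-ttl"));
    }
}
```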

In this case, the problem isn't working through a consumer backlog, because there is no backlog; it's that a large number of messages have been lost. The solution is a batch re-import, and we have handled a similar scenario in production before. When the backlog is huge, the expired messages are simply gone. So after the peak period, say after midnight when traffic has died down and users are asleep, you write a temporary program that digs out the lost batch of data, bit by bit, and re-inserts it into MQ, making up overnight for the data lost during the day. That's really the only option.

Suppose 10,000 orders were sitting in MQ unprocessed and 1,000 of them were lost. You have to write a program that finds those 1,000 orders and re-sends them to MQ to fill the gap.
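The "find and re-send" step is essentially a set difference: orders that exist in the system of record but were never consumed are exactly the ones that expired. A broker-free sketch, where the order IDs and the in-memory deque standing in for MQ are placeholders:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class Replayer {
    // Re-enqueue every order present in the source of truth (e.g. the DB)
    // but absent from the set of successfully processed orders.
    static Deque<String> replayLost(Set<String> allOrders, Set<String> processed) {
        Deque<String> mq = new ArrayDeque<>();
        for (String id : allOrders) {
            if (!processed.contains(id)) mq.add(id); // re-send the lost order
        }
        return mq;
    }

    public static void main(String[] args) {
        Set<String> all = new LinkedHashSet<>(List.of("o1", "o2", "o3", "o4"));
        Set<String> done = Set.of("o1", "o3");
        // o2 and o4 were lost, so they go back into MQ.
        System.out.println(replayLost(all, done)); // [o2, o4]
    }
}
```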

MQ is almost full

What if messages back up in MQ and aren't cleared for a long time, until MQ is nearly full? Is there another option? Not really: at that point the first plan is too slow. Instead, write an ad hoc program that consumes the messages as fast as possible, discarding each one without processing it, just to drain MQ quickly. Then fall back on the second plan and re-import the dropped data later that night.

For RocketMQ, the official documentation offers solutions to the message backlog problem.

1. Improve the parallelism of consumption

Most message consumption is IO-intensive: it involves database operations or RPC calls, so the consumption rate is bounded by the throughput of the backend database or external system. Increasing consumption parallelism raises overall throughput, but past a certain point throughput starts to fall, so the application must choose a reasonable degree of parallelism. There are several ways to modify it:

  • Within the same ConsumerGroup, increase parallelism by adding Consumer instances (note that Consumer instances beyond the number of subscribed queues are wasted: they get no queue assigned). This can be done by adding machines or by starting multiple processes on existing machines.
  • Increase the concurrent consumption threads within a single Consumer by modifying the consumeThreadMin and consumeThreadMax parameters.
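The cap mentioned above, where extra Consumer instances beyond the queue count sit idle, can be expressed directly; per-instance thread counts come from consumeThreadMin/consumeThreadMax. A sketch with illustrative numbers (the helper methods are assumptions, not a RocketMQ API):

```java
public class Parallelism {
    // Instances beyond the number of subscribed queues get no queue assigned,
    // so only min(instances, queues) of them actually consume.
    static int activeInstances(int instances, int queues) {
        return Math.min(instances, queues);
    }

    // Rough upper bound on messages processed concurrently across the group.
    static int maxConcurrency(int instances, int queues, int threadsPerInstance) {
        return activeInstances(instances, queues) * threadsPerInstance;
    }

    public static void main(String[] args) {
        // 8 queues, 10 instances: 2 instances sit idle.
        System.out.println(activeInstances(10, 8));    // 8
        // e.g. consumeThreadMax = 20 threads per active instance.
        System.out.println(maxConcurrency(10, 8, 20)); // 160
    }
}
```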

2. Batch consumption

If the business process supports it, consuming messages in batches can greatly improve throughput. For example, if an order-deduction application takes 1 s to process one order but only 2 s to process a batch of 10, batching greatly improves consumption throughput. This is controlled by the consumer's consumeMessageBatchMaxSize parameter, which defaults to 1, i.e. one message per consumption call. If it is set to N, the number of messages consumed per call will be at most N.
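The throughput gain comes from amortizing a fixed per-call cost over the batch. A quick sketch using the article's hypothetical timings (1 s per single order vs 2 s per batch of 10):

```java
public class BatchMath {
    // Total seconds to process n orders one at a time.
    static double oneByOne(int n, double secPerOrder) {
        return n * secPerOrder;
    }

    // Total seconds to process n orders in batches of batchSize.
    static double batched(int n, int batchSize, double secPerBatch) {
        int batches = (n + batchSize - 1) / batchSize; // ceiling division
        return batches * secPerBatch;
    }

    public static void main(String[] args) {
        // 100 orders: 100 s one by one, but only 20 s in batches of 10.
        System.out.println(oneByOne(100, 1.0));    // 100.0
        System.out.println(batched(100, 10, 2.0)); // 20.0
    }
}
```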

3. Skip non-important messages

When message accumulation occurs, if the consumption speed cannot keep up with the sending speed and the business does not have strict requirements on the data, you can choose to discard unimportant messages. For example, when the backlog on a queue exceeds 100,000 messages, discard some or all of them so that consumption can quickly catch up with the sending rate. Example code is as follows:

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        long offset = msgs.get(0).getQueueOffset();
        String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
        long diff = Long.parseLong(maxOffset) - offset;
        if (diff > 100000) {
            // TODO: special handling for message build-up, e.g. discard
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // TODO: normal consumption process
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

4. Optimize the per-message consumption process

For example, the consumption process of a message is as follows:

  • Query data (1) from the DB based on the message
  • Query data (2) from the DB based on the message
  • Perform complex business calculations
  • Insert data (3) into the DB
  • Insert data (4) into the DB

Consuming this message involves 4 interactions with the DB. At 5 ms each, that is 20 ms; adding 5 ms of business calculation gives 25 ms in total. If the two queries can be merged into one and the two inserts into one, the DB interactions drop to 2 and the total drops to 15 ms, a 40% improvement in overall performance. And if the application is latency-sensitive, the DB can be deployed on SSD disks, which have a much lower response time than SCSI disks.
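The arithmetic behind that 40% figure can be sketched directly, assuming the two queries are merged into one and the two inserts into one, halving the DB round trips:

```java
public class LatencyMath {
    // Per-message latency: DB round trips plus business computation.
    static int totalMs(int dbCalls, int msPerDbCall, int calcMs) {
        return dbCalls * msPerDbCall + calcMs;
    }

    public static void main(String[] args) {
        int before = totalMs(4, 5, 5); // 4 DB interactions: 25 ms
        int after  = totalMs(2, 5, 5); // merged to 2 interactions: 15 ms
        // (25 - 15) / 25 = 40% less time spent per message.
        System.out.println(before + " -> " + after);
    }
}
```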