RabbitMQ optimization

Channel

Producers and consumers both connect to RabbitMQ. Every TCP connection consumes resources, so establishing too many of them should be avoided.

For this reason the AMQP protocol introduces channels: multiple channels share a single TCP connection, so the connection is multiplexed.

However, the number of channels on one connection is limited, and too many channels cause congestion on the shared TCP connection.

const (
	maxChannelMax = (2 << 15) - 1
	defaultChannelMax = (2 << 10) - 1
)

The Go client http://github.com/streadway/amqp, used here to connect to RabbitMQ, defines a maximum of 65535 channels per connection and a default maximum of 2047.
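As a quick check of the arithmetic, the two shift expressions can be evaluated directly. This is only a sketch that copies the constants quoted above into a standalone program; it does not import the client library.

```go
package main

import "fmt"

// Same expressions as the constants quoted above.
const (
	maxChannelMax     = (2 << 15) - 1
	defaultChannelMax = (2 << 10) - 1
)

func main() {
	// 2 << 15 is 2 * 2^15 = 65536, so the maximum is 65535.
	fmt.Println(maxChannelMax) // 65535
	// 2 << 10 is 2 * 2^10 = 2048, so the default is 2047.
	fmt.Println(defaultChannelMax) // 2047
}
```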

Prefetch count

What is the prefetch count?

Assume a RabbitMQ queue has N consumers. Messages in the queue are sent to the consumers in round-robin fashion.

If there are M messages, each consumer receives about M/N of them. If the consumers on one machine are slow, either for their own reasons or because their messages take a long time to process, while the messages allocated to other consumers are processed quickly and those consumers then sit idle, resources are wasted and the throughput of the queue drops.
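A minimal sketch of round-robin dispatch makes the M/N split concrete. The function `roundRobin` is a hypothetical helper written for this illustration, not part of RabbitMQ or any client; it only counts how many messages each consumer is handed, regardless of how fast that consumer is.

```go
package main

import "fmt"

// roundRobin assigns m messages to n consumers in turn and returns
// how many messages each consumer receives.
func roundRobin(m, n int) []int {
	counts := make([]int, n)
	for i := 0; i < m; i++ {
		counts[i%n]++
	}
	return counts
}

func main() {
	// 10 messages over 3 consumers: every consumer gets about 10/3,
	// whether it is fast or slow.
	fmt.Println(roundRobin(10, 3)) // [4 3 3]
}
```

Because the split ignores consumer speed, a slow consumer ends up with the same share as a fast one, which is the imbalance described above.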

This is where the prefetch count comes in. It prevents consumers with limited processing capacity from being allocated too many messages while consumers with better processing capacity have nothing to process.

RabbitMQ keeps a list of consumers and increments a counter for a consumer each time it sends that consumer a message. When the limit is reached, RabbitMQ stops sending messages to that consumer. When the consumer acknowledges a message, RabbitMQ decrements the counter by one, and the consumer can receive messages again until the limit is reached once more. This mechanism is analogous to the "sliding window" of TCP/IP.

As a result, messages do not pile up on slow consumers and are distributed to faster consumers instead. Put simply, the prefetch count is the maximum number of unacknowledged messages a consumer can hold from RabbitMQ.
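The counting mechanism can be sketched as a small window type. This is an illustrative model only, with hypothetical names (`prefetchWindow`, `deliver`, `ack`); it is not how the broker is implemented. A limit of 0 is treated as "no upper bound".

```go
package main

import "fmt"

// prefetchWindow tracks unacknowledged messages for one consumer.
// limit == 0 means no upper bound.
type prefetchWindow struct {
	limit   int
	unacked int
}

// canDeliver reports whether the broker may send another message.
func (w *prefetchWindow) canDeliver() bool {
	return w.limit == 0 || w.unacked < w.limit
}

// deliver records one sent message; it returns false if the window is full.
func (w *prefetchWindow) deliver() bool {
	if !w.canDeliver() {
		return false
	}
	w.unacked++
	return true
}

// ack records one acknowledgement and frees a slot in the window.
func (w *prefetchWindow) ack() {
	if w.unacked > 0 {
		w.unacked--
	}
}

func main() {
	w := &prefetchWindow{limit: 2}
	fmt.Println(w.deliver(), w.deliver(), w.deliver()) // true true false
	w.ack()
	fmt.Println(w.deliver()) // true
}
```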

What is an appropriate prefetch count? See the RabbitMQ blog post "Finding bottlenecks with RabbitMQ 3.3" listed in the references.

To discuss the prefetch count we also need to look at the global flag, whose meaning RabbitMQ has redefined relative to AMQP 0-9-1 to improve performance.

| global | AMQP 0-9-1 | RabbitMQ |
|--------|------------|----------|
| false | All consumers on the channel are subject to the prefetchCount limit | Each new consumer on the channel is subject to the prefetchCount limit |
| true | All consumers on the current connection are subject to the prefetchCount limit, which is shared by the consumers on that connection | All consumers on the channel are subject to the prefetchCount limit, which is shared by the consumers on that channel |

prefetchSize: the maximum size (inclusive), in bytes, of message content that may be prefetched. It can be understood as an upper limit on the size of the message payload byte array. 0 means there is no upper limit.

If prefetchCount is 0, there is no upper limit on the number of messages that can be prefetched.

Here is an example of getting this wrong:

A queue had a consumer that was consuming too slowly, and its prefetch count was 0. A new consumer was then written with the prefetch count set to 30, and 10 pods were started to process messages. The old consumer had not yet been taken offline and was still processing messages.

Consumption was still very slow, and a large number of messages were unacked. If you understand what the prefetch count means, you can already guess the cause.

The old consumer's prefetch count was 0, so it held a large number of unacked messages. Although several new consumers had been added, they were all idle. Once the consumer with prefetch count 0 was stopped, the consumption speed quickly returned to normal.

Dead-letter queue

What is a dead-letter queue?

Generally, a message becomes a dead letter if it meets one of the following conditions:

  • The message is rejected with channel.basicNack or channel.basicReject, and the requeue attribute is set to false.

  • The message has expired: its time in the queue exceeds the configured TTL.

  • The number of messages in the queue exceeds the maximum queue length.
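The three conditions can be written down as a small predicate. This is a simplified model for illustration: the `message` struct, the `isDeadLetter` function and the reason labels are hypothetical, and the length check only says that the queue is over its limit, not which message the broker would actually evict.

```go
package main

import "fmt"

// message holds only the fields needed for the check (hypothetical model).
type message struct {
	rejected bool  // the consumer called nack/reject
	requeue  bool  // requeue flag passed with the nack/reject
	hasTTL   bool  // whether a TTL applies to this message
	ageMs    int64 // time spent in the queue, in milliseconds
	ttlMs    int64 // TTL in milliseconds, used only when hasTTL is true
}

// isDeadLetter reports whether msg becomes a dead letter and why.
// maxLen == 0 means the queue has no length limit.
func isDeadLetter(msg message, queueLen, maxLen int) (bool, string) {
	switch {
	case msg.rejected && !msg.requeue:
		return true, "rejected"
	case msg.hasTTL && msg.ageMs > msg.ttlMs:
		return true, "expired"
	case maxLen > 0 && queueLen > maxLen:
		return true, "maxlen"
	}
	return false, ""
}

func main() {
	fmt.Println(isDeadLetter(message{rejected: true}, 1, 0))
	fmt.Println(isDeadLetter(message{hasTTL: true, ageMs: 3001, ttlMs: 3000}, 1, 0))
	fmt.Println(isDeadLetter(message{}, 11, 10))
	fmt.Println(isDeadLetter(message{rejected: true, requeue: true}, 1, 0))
}
```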

When a message meets one of these conditions and becomes a dead letter, it is republished to a dead letter exchange (DLX). A queue bound to a DLX is a dead-letter queue.

So a dead-letter queue is not a special queue; it is simply bound to a dead letter exchange. The dead letter exchange is not special either: it is no different from any other exchange, we just use it to handle dead letters.

A service that needs to process dead letters defines a dedicated routing key and configures a corresponding dead-letter queue, bound to the dead letter exchange with that key.

Usage scenarios

When a problem occurs while consuming a message, the message is not lost; it is kept temporarily for later troubleshooting.

Code implementation

For the use of dead-letter queues, see below, where they are combined with delay queues to implement a message retry mechanism.

Delay queue

What is a delay queue?

Delay queues are used to store messages for delayed consumption.

What are delayed messages?

Messages that consumers should not consume right away, but only after a specified amount of time has passed.

Usage scenarios

1. Closing idle connections: a server has many client connections that need to be closed after being idle for a period of time.

2. Clearing expired data: for example, objects in a cache need to be removed once their idle time has elapsed.

3. Task timeout handling: in the request/response interaction of a sliding-window network protocol, handling requests that time out without a response.

4. Order cancellation: an order is cancelled automatically if it is not paid within 30 minutes of being placed.

5. Order notification: send an SMS notification to the user 60 seconds after an order is placed successfully.

6. Closing an unpaid order in time and returning its stock to the inventory.

7. Checking periodically whether orders in the refund state have been refunded successfully.

8. Detecting that a newly created store has not uploaded any goods within N days and sending it an activation message.

9. Scheduled task dispatch: use DelayQueue to store the tasks to be executed on the current day together with their execution times. Once a task is obtained from DelayQueue, it is executed.

These are some of the business scenarios that call for delayed processing.

Ways to implement a delay queue

RabbitMQ does not provide delay queues directly, but a delay queue can be built from dead-letter queues and TTL.

A message that stays in a queue longer than its TTL becomes a dead letter. Note that the TTL is in milliseconds. There are two ways to set the expiration time:

  • 1. As a queue attribute, so that all messages in the queue have the same expiration time;

  • 2. On the message itself, so that each message has its own expiration time.

If both are set, the smaller of the two TTL values applies to the message.

The queue handles these two kinds of TTL differently. With the first, messages are removed from the queue as soon as they expire. With the second, messages are not removed immediately after they expire; whether a message has expired is checked just before it is delivered to a consumer.

With the first approach, all messages in the queue have the same expiration time, so expired messages are always at the head of the queue and only the head needs to be scanned. With the second approach, the messages in one queue have different expiration times, so removing all expired messages would require traversing the whole queue. That would be unreasonable, so the expiration check is made when a message is about to be consumed. The idea is somewhat similar to how Redis cleans up expired keys.

Queue TTL

The x-expires argument of the channel.queueDeclare method controls how long a queue may remain unused before it is deleted automatically. Unused means that the queue has no consumers, has not been redeclared, and the basic.get command has not been called on it during the expiration period.

	if _, err := channel.QueueDeclare("delay.3s.test", true, false, false, false, amqp.Table{
		"x-dead-letter-exchange":    b.exchange,
		"x-dead-letter-routing-key": ps.key,
		"x-expires":                 3000,
	}); err != nil {
		return err
	}

Message TTL

There are two ways to set the message TTL:

  • Per-Queue Message TTL

Setting the x-message-ttl argument in queue.declare controls the expiration time of messages in that queue. If the same message is routed to several queues, its expiration in the queue that sets x-message-ttl does not affect the copies in the other queues. Expiration is handled independently by each queue.

	if _, err := channel.QueueDeclare("delay.3s.test", true, false, false, false, amqp.Table{
		"x-dead-letter-exchange":    b.exchange,
		"x-dead-letter-routing-key": ps.key,
		"x-message-ttl":             3000,
	}); err != nil {
		return err
	}

  • Per-Message TTL

You can set the expiration time of each message with the expiration property. Note that expiration is a string.

	delayQ := "delay.3s.test"
	if _, err := channel.QueueDeclare(delayQ,
		true, false, false, false, amqp.Table{
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
		},
	); err != nil {
		return err
	}

	// Publish through the default exchange straight to the delay queue.
	if err := channel.Publish("", delayQ, false, false, amqp.Publishing{
		Headers:      amqp.Table{"x-retry-count": retryCount + 1},
		Body:         d.Body,
		DeliveryMode: amqp.Persistent,
		Expiration:   "3000", // milliseconds, as a string
	}); err != nil {
		return err
	}

Scenarios that need delayed consumption can be handled with a delay queue built on top of a dead-letter queue.

The usual setup is as follows. Consumers subscribe to the dead-letter queue, deadQueue, and messages that need to be delayed are sent to the delay queue, delayNormal. When the TTL in delayNormal expires, the message is moved to the dead-letter queue, deadQueue. Consuming deadQueue normally then gives us delayed consumption of the data.

Use Queue TTL to set the expiration time

Here is an example of message retry from a production system:

A consumer processes the messages in a queue. Processing sometimes fails, and we want the failed messages to return to the queue and be consumed again after a period of time. Of course, if the message is not acked, or is nacked with requeue, the consumer can retry it. There is a problem with this, however. Messages in the queue are consumed very quickly, so a message that has just been requeued reaches the head of the queue almost immediately. The consumer may receive it again right away and keep retrying in an endless loop, which affects the consumption of other messages. This is where the delay queue comes in: with a specific delay time, the retry of these messages happens at some later point in time. After a certain number of retries, the message can be discarded.

Take a look at the flow chart:

Specific processing steps:

1. The producer publishes a message to work-exchange, which routes it to work-queue.

2. Consumers subscribe to work-queue; this is normal business consumption.

3. Messages that need a delayed retry are sent to a delay queue.

4. The delay queue's dead letter exchange and routing key are those of work-queue, so expired messages are pushed back to work-queue. Each time a message is pushed to a delay queue, its retry count is recorded. Once the upper limit is reached, the message can be discarded or handled in some other way.

5. Consumers therefore only need to listen to and process work-queue.

6. Delay queues that are no longer used are deleted automatically once their expiry time is reached.
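The retry decision in steps 3 and 4 can be sketched in isolation: given the retry count carried by the message and a list of delays, pick the delay queue or give up. The helper `nextRetry` and its parameters are hypothetical; the queue naming scheme `delay.<duration>.<exchange>.<key>` is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// nextRetry returns the name of the delay queue for the given retry count,
// or ok == false when the retry limit (the length of delaysMs) is reached.
func nextRetry(retryCount int, delaysMs []int64, exchange, key string) (queue string, ok bool) {
	if retryCount < 0 || retryCount >= len(delaysMs) {
		return "", false
	}
	d := time.Duration(delaysMs[retryCount]) * time.Millisecond
	// One queue per distinct delay, so every message in it shares one TTL.
	return fmt.Sprintf("delay.%s.%s.%s", d, exchange, key), true
}

func main() {
	delays := []int64{1000, 3000}
	for n := 0; n <= len(delays); n++ {
		q, ok := nextRetry(n, delays, "worker-exchange", "dead-test-key")
		fmt.Println(n, q, ok)
	}
}
```

Using one queue per delay value keeps all messages in a queue on the same TTL, which is what lets the broker expire them strictly from the head.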

Code (from the demo):

func (b *Broker) readyConsumes(ps *params) (bool, error) {
	key := ps.key
	channel, err := b.getChannel(key)
	if err != nil {
		return true, err
	}

	queue, err := b.declare(channel, key, ps)
	if err != nil {
		return true, err
	}

	// Limit the number of unacked messages per consumer.
	if err := channel.Qos(ps.prefetch, 0, false); err != nil {
		return true, fmt.Errorf("channel qos error: %s", err)
	}

	deliveries, err := channel.Consume(queue.Name, "", false, false, false, false, nil)
	if err != nil {
		return true, fmt.Errorf("queue consume error: %s", err)
	}

	channelClose := channel.NotifyClose(make(chan *amqp.Error))

	// pool holds one token per allowed concurrent handler.
	pool := make(chan struct{}, ps.concurrency)

	go func() {
		for i := 0; i < ps.concurrency; i++ {
			pool <- struct{}{}
		}
	}()

	for {
		select {
		case err := <-channelClose:
			b.channels.Delete(key)
			return true, fmt.Errorf("channel close: %s", err)
		case d := <-deliveries:
			if ps.concurrency > 0 {
				<-pool
			}
			go func() {
				var flag HandleFLag

				switch flag = ps.Handle(d.Body); flag {
				case HandleSuccess:
					d.Ack(false)
				case HandleDrop:
					d.Nack(false, false)
				// Process messages that require delayed retry
				case HandleRequeue:
					if err := b.retry(ps, d); err != nil {
						d.Nack(false, true)
					} else {
						d.Ack(false)
					}
				default:
					d.Nack(false, false)
				}

				if ps.concurrency > 0 {
					pool <- struct{}{}
				}
			}()
		}
	}
}

func (b *Broker) retry(ps *params, d amqp.Delivery) error {
	channel, err := b.conn.Channel()
	if err != nil {
		return err
	}
	defer channel.Close()

	retryCount, _ := d.Headers["x-retry-count"].(int32)
	// Determine the maximum number of attempts
	if int(retryCount) >= len(ps.retryQueue) {
		return nil
	}

	delay := ps.retryQueue[retryCount]
	delayDuration := time.Duration(delay) * time.Millisecond
	delayQ := fmt.Sprintf("delay.%s.%s.%s", delayDuration.String(), b.exchange, ps.key)

	if _, err := channel.QueueDeclare(delayQ,
		true, false, false, false, amqp.Table{
			// Configure exchange and routing-key for dead-letter sending
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
			// The expiration time of the message
			"x-message-ttl": delay,
			// Time after which the unused delay queue is deleted automatically
			"x-expires": delay * 2,
		},
	); err != nil {
		return err
	}

	// If exchange is empty, use Default exchange
	return channel.Publish("", delayQ, false, false, amqp.Publishing{
		// Sets the number of attempts
		Headers:      amqp.Table{"x-retry-count": retryCount + 1},
		Body:         d.Body,
		DeliveryMode: amqp.Persistent,
	})
}

Test

Start RabbitMQ with Docker

$ sudo mkdir -p /usr/local/docker-rabbitmq/data

$ docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v /usr/local/docker-rabbitmq/data:/var/lib/rabbitmq --hostname rabbitmq -e RABBITMQ_DEFAULT_VHOST=/ -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.7.7-management

The username and password are both admin.

const (
	DeadTestExchangeQueue = "dead-test-delayed-queue_queue"
)

func main() {

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)

	broker := rabbitmq.NewBroker("amqp://admin:admin@<host>:5672", &rabbitmq.ExchangeConfig{
		Name: "worker-exchange",
		Type: "direct",
	})

	broker.LaunchJobs(
		rabbitmq.NewDefaultJobber(
			"dead-test-key",
			HandleMessage,
			rabbitmq.WithPrefetch(30),
			rabbitmq.WithQueue(DeadTestExchangeQueue),
			rabbitmq.WithRetry(help.FIBONACCI, help.Retry{
				Delay: "5s",
				Max:   6,
				Queue: []string{
					DeadTestExchangeQueue,
				},
			}),
		),
	)

	for {
		s := <-ch
		switch s {
		case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
			fmt.Println("job-test-exchange service exit")
			time.Sleep(time.Second)
			return
		case syscall.SIGHUP:
		default:
			return
		}
	}
}

func HandleMessage(data []byte) error {
	fmt.Println("receive message", "message", string(data))

	return rabbitmq.HandleRequeue
}

The handler sends every message it receives to retry, so we can watch the delay queues at work.

Once started, take a look at the queue panel

Push a message through the management panel

You can see the message moving through the delay queues, and delay queues that are no longer used are deleted automatically at their configured expiration time

Finally, you can see that the message has been retried several times

When the retry limit is finally reached, the message is discarded

Use Message TTL to set the expiration time

With Message TTL, messages in the queue can expire out of order. Here is the analysis:

With Message TTL, all messages that carry an expiration time are put in the same queue. Because a queue is processed from the head, the second message can only be handled after the first one has been handled. Suppose the first message expires after 10s and the second after 1s. The second message expires before the first and, in theory, should be handled first. But because of the restriction above, the second message cannot be handled until the first one has left the queue. This creates a timing problem. It does not occur with Queue TTL, because all messages in a queue have the same expiration time, so the message at the head of the queue is always the first to expire. So how can this situation be avoided?
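The timing problem can be reproduced with a toy queue that only expires messages from the head. This is a simplified model with hypothetical names (`msg`, `expireFromHead`), with time counted in whole seconds; it is not broker code.

```go
package main

import "fmt"

// msg is a queued message with an absolute expiry time in seconds.
type msg struct {
	id       string
	expireAt int
}

// expireFromHead removes expired messages from the front of the queue only.
// A message behind an unexpired head is not examined, which models the
// per-message TTL behavior described above.
func expireFromHead(queue []msg, now int) (expired []string, rest []msg) {
	i := 0
	for i < len(queue) && queue[i].expireAt <= now {
		expired = append(expired, queue[i].id)
		i++
	}
	return expired, queue[i:]
}

func main() {
	// The first message expires at t=10s, the second at t=1s.
	q := []msg{{"first", 10}, {"second", 1}}
	for _, now := range []int{1, 5, 10} {
		expired, rest := expireFromHead(q, now)
		fmt.Println(now, expired, len(rest))
		q = rest
	}
}
```

In this model the second message leaves the queue only at t=10, together with the first, even though its own TTL ran out at t=1.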

This can be handled with the rabbitmq-delayed-message-exchange plugin. Plugin address: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

Implementation principle:

Installing the plugin adds a new exchange type, x-delayed-message, which works by delaying delivery. When the exchange receives a delayed message, it does not route it to the target queue right away; the message is stored in the Mnesia database instead. For details on Mnesia, see the Mnesia database documentation. When the delay time is up, the message is routed to the target queue through the x-delayed-message exchange. Consumers then consume the target queue, which avoids the expiration timing problem.

How do you use it

A virtual machine is used for this demonstration. Install RabbitMQ first; RabbitMQ 3.8.5 is used here.

Then download the rabbitmq-delayed-message-exchange plugin

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez

$ cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.5/plugins

# check the installed plugins
$ rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@centos7-1
 |/
[  ] rabbitmq_amqp1_0                  3.8.5
[  ] rabbitmq_auth_backend_cache       3.8.5
[  ] rabbitmq_auth_backend_http        3.8.5
[  ] rabbitmq_auth_backend_ldap        3.8.5
[  ] rabbitmq_auth_backend_oauth2      3.8.5
[  ] rabbitmq_auth_mechanism_ssl       3.8.5
[  ] rabbitmq_consistent_hash_exchange 3.8.5
[E*] rabbitmq_delayed_message_exchange 3.9.0
[  ] rabbitmq_event_exchange           3.8.5
[  ] rabbitmq_federation               3.8.5

$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange
$ systemctl restart rabbitmq-server

Modify the example above to use x-delayed-message

Code (from the demo):

func (b *Broker) declareDelay(key string, job Jobber) error {
	keyNew := fmt.Sprintf("delay.%s", key)

	channel, err := b.getChannel(keyNew)
	if err != nil {
		return err
	}
	defer channel.Close()

	exchangeNew := fmt.Sprintf("delay.%s", b.exchange)

	// x-delayed-type tells the plugin how to route; the arguments must
	// match the declaration of the same exchange in retry below.
	if err := channel.ExchangeDeclare(exchangeNew, "x-delayed-message", true, false, false, false, amqp.Table{
		"x-delayed-type": "direct",
	}); err != nil {
		return fmt.Errorf("exchange declare error: %s", err)
	}

	queue, err := channel.QueueDeclare(fmt.Sprintf("delay.%s", job.Queue()), true, false, false, false, amqp.Table{
		"x-dead-letter-exchange":    b.exchange,
		"x-dead-letter-routing-key": key,
	})
	if err != nil {
		return fmt.Errorf("queue declare error: %s", err)
	}
	if err = channel.QueueBind(queue.Name, keyNew, exchangeNew, false, nil); err != nil {
		return fmt.Errorf("queue bind error: %s", err)
	}
	return nil
}

func (b *Broker) retry(ps *params, d amqp.Delivery) error {
	channel, err := b.conn.Channel()
	if err != nil {
		return err
	}
	defer channel.Close()

	retryCount, _ := d.Headers["x-retry-count"].(int32)

	// Stop once the maximum number of attempts is reached.
	if int(retryCount) >= len(ps.retryQueue) {
		return nil
	}
	fmt.Println("Number of message retries", retryCount+1)

	delay := ps.retryQueue[retryCount]

	if err := channel.ExchangeDeclare(fmt.Sprintf("delay.%s", b.exchange), "x-delayed-message", true, false, false, false, amqp.Table{
		"x-delayed-type": "direct",
	}); err != nil {
		return err
	}

	return channel.Publish(fmt.Sprintf("delay.%s", b.exchange), fmt.Sprintf("delay.%s", ps.key), false, false, amqp.Publishing{
		Headers:      amqp.Table{"x-retry-count": retryCount + 1},
		Body:         d.Body,
		DeliveryMode: amqp.Persistent,
		Expiration:   fmt.Sprintf("%d", delay),
	})
}

Declare the retry exchange with the type x-delayed-message so that the plugin installed above is used.

After pushing a message through the management panel, look at the result

dead-test-delayed-message_queue is the queue for normal business consumption, and delay.dead-test-delayed-message_queue holds the messages waiting for delayed consumption. When a message there expires, it is dead-lettered back to dead-test-delayed-message_queue

Take a look at the console output

Plugin or Queue TTL: which should handle delay queues?

Limitations of rabbitmq-delayed-message-exchange:

  • 1. The plugin does not replicate delayed messages. In RabbitMQ cluster mode, if the node holding them goes down, the delayed messages are unavailable and can only be recovered after that node is restarted.

  • 2. Currently, the plugin only supports disk nodes, not RAM nodes.

  • 3. It is not suitable for situations with a large number of delayed messages (e.g., hundreds of thousands or millions of delayed messages).

This plugin is considered to be experimental yet fairly stable and potential suitable for production use as long as the user is aware of its limitations. This plugin is not commercially supported by Pivotal at the moment but it doesn’t mean that it will be abandoned or team RabbitMQ is not interested in improving it in the future. It is not, however, a high priority for our small team. So, give it a try with your workload and decide for yourself.

That is the official statement. In short, the plugin is experimental but relatively stable. Updating it is not a high priority for the team, so if we run into problems we may have to fix them ourselves.

The plugin is written in Erlang, so if you are able to modify it yourself, it is a reasonable choice.

The advantage is also obvious: it works out of the box, and the processing logic is relatively simple.

Queue TTL limits

If we need to handle many different delay durations, we need to create many queues. The advantages of this approach are that it is transparent, stable, and easy to troubleshoot when problems occur.

References

【Finding bottlenecks with RabbitMQ 3.3】 blog.rabbitmq.com/posts/2014/…
【Do you really understand delay queues】 juejin.cn/post/684490…
【RabbitMQ practical guide】 book.douban.com/subject/275…
【rabbitmq artificial intelligence based on rabbitmq】 www.dazhuanlan.com/ajin121212/…
【rabbitmq-delayed-message-exchange】 blog.51cto.com/kangfs/4115…
【Scheduling Messages with RabbitMQ】 blog.rabbitmq.com/posts/2015/…
【Centos7 install RabbitMQ 3.8.5, the easiest installation step ever】 blog.csdn.net/weixin_4058…
【RabbitMQ prefetch_count, dead-letter queue and delay queue usage】 boilingfrog.github.io/2022/01/07/…