RabbitMQ can be roughly divided into two parts: the exchange and the message queue (MQ). Every message sent to RabbitMQ goes to an exchange first. The exchange works like a router: it decides which MQ to place a message in based on the exchange type (fanout, direct, topic) and the binding information. The MQ stores messages temporarily and delivers them to the recipients in round-robin fashion.

Backing queue

The backing queue implements the MQ logic specified in the AMQP protocol. It persists durable=true messages as far as possible, and when memory runs low it moves some messages to disk to free up memory. The backing queue is divided into five smaller queues: Q1, Q2, delta, Q3, and Q4.

The “general” path of a message through these queues is Q1 -> Q2 -> delta -> Q3 -> Q4, which manages messages by class as they move from RAM to disk and back to RAM. Most of the time a message does not need to pass through every queue and can skip most of them. Corresponding to these five queues, a message in the backing queue is in one of four states:

  • Alpha: both the location information of the message and the message itself are in RAM. Such messages sit in Q1 or Q4.
  • Beta: the location of the message is in RAM and the message itself is on disk. Such messages sit in Q2 or Q3.
  • Gamma: the location of the message is in both RAM and on disk, and the message itself is on disk. Such messages sit in Q2 or Q3.
  • Delta: both the location of the message and the message itself are on disk. Such messages sit in delta.

The path Q1 -> Q2 -> delta moves messages gradually from RAM to disk, while delta -> Q3 -> Q4 moves them gradually from disk back to RAM.
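The four states can be read as a function of where the message's location entry and body currently live. The following is a minimal sketch of that reading, not RabbitMQ's own code; the `Placement` type and `classify` function are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Placement:
    """Where a message's location entry (index) and body currently live."""
    index_in_ram: bool
    index_on_disk: bool
    body_in_ram: bool
    body_on_disk: bool


def classify(p: Placement) -> str:
    """Map a placement to one of the four states listed above."""
    if p.index_in_ram and p.body_in_ram:
        return "alpha"
    if p.index_in_ram and p.body_on_disk:
        # Beta and gamma differ only in whether the index is also on disk.
        return "gamma" if p.index_on_disk else "beta"
    if p.index_on_disk and p.body_on_disk and not p.body_in_ram:
        return "delta"
    raise ValueError("placement does not match any of the four states")


# Which of the five queues can hold a message in each state.
QUEUES_FOR_STATE = {
    "alpha": ("Q1", "Q4"),
    "beta": ("Q2", "Q3"),
    "gamma": ("Q2", "Q3"),
    "delta": ("delta",),
}
```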

Typically, under normal load, a message does not go through every state. If messages are consumed no slower than new messages arrive, messages that do not need a reliability guarantee will most likely only ever be in the alpha state. Messages with durable=true must enter the gamma state, and when publisher confirms are enabled, a message is only confirmed as received once it has reached that state. If the message is then consumed quickly enough and memory is sufficient, it does not move on to the next state.

The backing queue tries to have messages consumed as early as possible, in Q1 or Q2, without passing through all five queues, which reduces its overhead. If a message goes “too deep”, the system pays the overhead of paging it out of memory and back in. This creates a problem: under high load, messages that are not consumed quickly move into the deeper queues, which raises the average cost of processing each message. Because handling the backlog takes more time and resources, less capacity is left for new messages, so newly arriving messages are in turn pushed into the deeper queues and the average cost per message rises again. The situation keeps deteriorating, and the processing capacity of the system drops sharply.

According to the official blog, there are three ways to deal with the problem:

    1. Perform flow control.
    2. Increase prefetch so that multiple messages are sent to the receiver at once, speeding up the rate at which messages are consumed.
    3. Use multiple ack to reduce the overhead of ack processing.

RabbitMQ now has a good flow control mechanism, so the number of messages piled up in MQ is always small (fewer than 5). All the user needs to do is items 2 and 3.
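The client-side part of this advice can be sketched without a broker. The class below is a hypothetical helper, not part of any client library: it counts processed deliveries and issues one multiple-ack per batch through a caller-supplied `ack(delivery_tag, multiple)` function. With the pika client, that function would typically wrap `channel.basic_ack(delivery_tag=..., multiple=True)`, and prefetch would be set with `channel.basic_qos(prefetch_count=...)`; the batch size of 3 is only an illustrative value.

```python
from typing import Callable, Optional


class AckBatcher:
    """Acknowledge every `batch_size` deliveries with a single multiple-ack."""

    def __init__(self, ack: Callable[[int, bool], None], batch_size: int) -> None:
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self._ack = ack
        self._batch_size = batch_size
        self._pending = 0
        self._last_tag: Optional[int] = None

    def processed(self, delivery_tag: int) -> None:
        """Call after a message has been handled successfully."""
        self._pending += 1
        self._last_tag = delivery_tag
        if self._pending >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Acknowledge everything up to the last processed tag, if any is pending."""
        if self._pending and self._last_tag is not None:
            self._ack(self._last_tag, True)
            self._pending = 0


# Stand-in for a real channel: record the acks instead of sending them.
sent_acks = []
batcher = AckBatcher(lambda tag, multiple: sent_acks.append((tag, multiple)), batch_size=3)
for tag in range(1, 8):
    batcher.processed(tag)
batcher.flush()  # acknowledge the incomplete final batch
```

The batch size should stay at or below the prefetch count; otherwise the broker stops delivering before a batch ever fills and the consumer stalls.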

Mirror queue

The mirror queue is essentially a special backing queue. It wraps a normal backing queue, which handles local message persistence, and adds the ability to copy messages and acks to all the mirrors. All operations of rabbit_mirror_queue_master are synchronized to the slave nodes through Guaranteed Multicast (GM).

Adding a new node

New slave nodes can be added to the cluster at any time. A newly added slave does not synchronize the messages that already existed on the master before it joined; it only synchronizes new messages. As the old messages are consumed, the slave becomes fully synchronized with the master after a period of time.

Node failure

When the master node fails, the oldest of all the slaves becomes the new master, because that node is the most likely to be fully synchronized with the original master.

Node restart

When a node restarts after a failure, whether it was the master or a slave, it discards all the messages recorded on its disk and rejoins the cluster as a new slave node.

GM

The GM module implements a reliable multicast protocol that guarantees the atomicity of multicast messages: the surviving nodes in the group either all receive a message or none of them do. Its implementation is roughly as follows. All members of a group form a ring, such as A->B->C->D->A. If A is the master node and wants to send a multicast message, A first sends the message to B, its successor. After B receives the message, it sends it to C, C sends it to D, and finally D sends it back to A. If a node fails during this process, the sending node sends the message to the failed node's successor instead; if that successor has also failed, the message goes to the next successor in the ring. When A receives the returned message, A knows that all “living” nodes have received it, but B, C, and D cannot yet know that every node has received it, so they cannot commit the message. At this point, A sends an ack to B. When B receives the ack, it knows that all nodes have received the message and can commit it. B passes the ack on to C, C to D, and D finally sends it back to A, which completes the send. If A does not receive the ack, the send has failed.
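The two passes around the ring can be sketched as a small simulation. This is only an illustration of the description above, not the GM module itself: `ring_multicast` is a hypothetical function, failures are modelled as a fixed set known in advance, and real concerns such as membership changes during a send are ignored.

```python
from typing import Dict, List, Set


def ring_multicast(ring: List[str], sender: str, failed: Set[str]) -> Dict[str, List[str]]:
    """Simulate one multicast around the ring, skipping failed members.

    Returns the order in which members see the message and then the ack.
    """
    if sender in failed:
        raise ValueError("the sender must be alive")
    start = ring.index(sender)
    # Members in ring order after the sender; a failed member is skipped,
    # so its predecessor forwards to the next live successor instead.
    live_successors = [
        ring[(start + i) % len(ring)]
        for i in range(1, len(ring))
        if ring[(start + i) % len(ring)] not in failed
    ]
    # Pass 1: the message travels from the sender around the ring and back.
    message_path = [sender] + live_successors + [sender]
    # Pass 2: once the message has returned, the ack follows the same route,
    # and each member that sees the ack may commit the message.
    ack_path = list(message_path)
    return {"message": message_path, "ack": ack_path}


healthy = ring_multicast(["A", "B", "C", "D"], "A", failed=set())
degraded = ring_multicast(["A", "B", "C", "D"], "A", failed={"B", "C"})
```

In the degraded run, A forwards directly to D because both B and C are down, which corresponds to the "successor of the successor" rule in the text.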

RabbitMQ processes

  • tcp_acceptor: accepts client connections and creates the rabbit_reader, rabbit_writer, and rabbit_channel processes;
  • rabbit_reader: reads data from the client connection and parses AMQP frames;
  • rabbit_writer: returns data to the client;
  • rabbit_channel: parses AMQP methods, routes messages, and sends them to the appropriate queue processes;
  • rabbit_amqqueue_process: the queue process, created when RabbitMQ starts (restoring durable queues) or when a queue is declared;
  • rabbit_msg_store: the process responsible for message persistence.

Whenever we publish an AMQP message to RabbitMQ, we get the following Erlang message flow:

reader process -> channel process -> amqqueue process -> message store

RabbitMQ flow control

Messages can be stored in different queues inside RabbitMQ. The earlier a message is consumed, the fewer queue levels it passes through and the lower the average cost of processing it. But if messages arrive faster than MQ can process them, they may enter the deeper queue levels, which greatly increases the average processing cost. That in turn reduces the capacity left for handling new messages and delivering old ones, so even more messages enter the deeper queues, and in this vicious cycle the performance of the whole system can drop sharply. In addition, if messages arrive too fast, the mailboxes of some processes become too large, which can have serious consequences. RabbitMQ has a flow control mechanism designed for this purpose. The following explains how it works from three aspects.

Erlang processes do not share memory with each other (except for binaries); they communicate by message passing, and each process has its own mailbox. By default Erlang sets no limit on the size of a process's mailbox, so when a large number of messages keeps arriving at a process, its mailbox grows too large, and memory eventually runs out and causes a crash.

In RabbitMQ, if producers keep publishing at a high rate while consumers consume slowly, then without flow control the internal process mailboxes grow rapidly until RabbitMQ's overall memory threshold is reached. At that point RabbitMQ blocks the producers (thanks to this blocking mechanism, RabbitMQ itself does not crash) and pages the data held in memory out to disk.

To solve this problem, RabbitMQ uses a credit-based flow control mechanism. Each message-processing process has a credit pair {InitialCredit, MoreCreditAfter}, which defaults to {200, 50}.

When sender process A sends a message to receiver process B:

  • For sender A, each message sent decreases its credit by 1; when the credit reaches 0, A is blocked.
  • For receiver B, each time it has received MoreCreditAfter messages, it sends a message to A that grants A MoreCreditAfter credits.
  • When A's credit is greater than 0, A can continue to send messages to B.

It follows that, with credit-based flow control, the sending rate of the sending process is kept within the processing rate of the receiving process:
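The three rules can be tried out in a small simulation. This is a minimal sketch in Python rather than RabbitMQ's Erlang implementation: the `Sender` and `Receiver` classes are hypothetical, the grant is modelled as a direct update instead of a message, and the receiver is assumed to grant credit as it handles messages. The defaults {200, 50} are the ones quoted above.

```python
from collections import deque

INITIAL_CREDIT = 200      # default InitialCredit quoted above
MORE_CREDIT_AFTER = 50    # default MoreCreditAfter quoted above


class Sender:
    """Process A: spends one credit per message and blocks at zero."""

    def __init__(self) -> None:
        self.credit = INITIAL_CREDIT

    @property
    def blocked(self) -> bool:
        return self.credit <= 0

    def send(self, receiver: "Receiver") -> bool:
        """Send one message if credit allows; return whether it was sent."""
        if self.blocked:
            return False
        self.credit -= 1
        receiver.mailbox.append("msg")
        return True


class Receiver:
    """Process B: grants MORE_CREDIT_AFTER credits per MORE_CREDIT_AFTER messages."""

    def __init__(self, sender: Sender) -> None:
        self.sender = sender
        self.mailbox: deque = deque()
        self.handled_since_grant = 0

    def handle_one(self) -> bool:
        """Handle one message; return False if the mailbox was empty."""
        if not self.mailbox:
            return False
        self.mailbox.popleft()
        self.handled_since_grant += 1
        if self.handled_since_grant == MORE_CREDIT_AFTER:
            self.sender.credit += MORE_CREDIT_AFTER  # stands in for the grant message
            self.handled_since_grant = 0
        return True


a = Sender()
b = Receiver(a)
sent = sum(a.send(b) for _ in range(300))   # A attempts a burst of 300 messages
blocked_after_burst = a.blocked
for _ in range(MORE_CREDIT_AFTER):          # B works through 50 of them
    b.handle_one()
credit_after_grant = a.credit
```

Only 200 of the 300 attempted sends go through before A blocks, so B's mailbox never holds more than InitialCredit messages from A, whatever rate A tries to send at.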

reader process <--[grant]-- channel process <--[grant]-- amqqueue process <--[grant]-- message store

1. How to open and close the gate

RabbitMQ communicates with clients over long-lived TCP connections, and the rabbit_reader process receives the data. After each packet it receives, rabbit_reader sets the socket option {active, once} again. If the connection is currently blocked, {active, once} is not set, so the process blocks in receive and reads nothing more from the socket. This is how the gate is opened and closed.

2. When to close the gate

RabbitMQ is developed with Erlang/OTP. From the time a message is received until it is consumed, it must be forwarded between multiple processes. RabbitMQ decides when to close the gate by monitoring the number of unprocessed messages in the mailbox of each process along this chain. The mechanism works as follows:

A ==========> B ==========> C

A: {{credit_from, B}, Value}
B: {{credit_to, A}, Value}, {{credit_from, C}, Value}
C: {{credit_to, B}, Value}

Processes A, B, and C form a message chain, and each process keeps a pair of credit values in its process dictionary, one for sending and one for receiving. Take process B as an example. {{credit_from, C}, Value} is the number of messages B may still send to process C; the value decreases by 1 for each message sent. When it reaches 0, B is blocked: it no longer sends messages to downstream processes and no longer receives messages from upstream processes. {{credit_to, A}, Value} is the number of messages B must still receive before it grants credit to the upstream process A by sending it {bump_credit, {self(), Quantity}}. On receiving this message, the upstream process increases its own {credit_from, Pid} value so that it can continue to send. But when the upstream sending rate is higher than the downstream receiving rate, processes run out of credit and become blocked one after another, all the way up to rabbit_reader, at which point rabbit_reader closes the gate.

3. When to open the gate

When an upstream process receives a bump_credit message from its downstream process while it is blocked, it unblocks and starts receiving messages from its own upstream again. The processes unblock one by one in this way until rabbit_reader is finally unblocked.
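How blocking spreads up the chain, and how a single grant releases it again, can be seen in a small simulation. This is a sketch under simplifying assumptions, not RabbitMQ's code: the `Stage` class and `drain` helper are hypothetical, credits are deliberately small (4 and 2 instead of the defaults) so the effect shows up quickly, and the reader's inbox stands in for data waiting on the socket.

```python
from collections import deque


class Stage:
    """One process in the chain, holding a credit_from and a credit_to value."""

    def __init__(self, name, downstream=None, initial_credit=4, more_credit_after=2):
        self.name = name
        self.downstream = downstream
        self.upstream = None
        if downstream is not None:
            downstream.upstream = self
        self.credit_from = initial_credit        # messages we may still send downstream
        self.credit_to = more_credit_after       # messages to handle before granting upstream
        self.more_credit_after = more_credit_after
        self.inbox = deque()

    @property
    def blocked(self):
        return self.downstream is not None and self.credit_from == 0

    def bump_credit(self, quantity):
        self.credit_from += quantity

    def step(self):
        """Handle one message unless blocked or idle; return True if work was done."""
        if self.blocked or not self.inbox:
            return False
        msg = self.inbox.popleft()
        if self.downstream is not None:
            self.credit_from -= 1
            self.downstream.inbox.append(msg)
        self.credit_to -= 1
        if self.credit_to == 0:
            self.credit_to = self.more_credit_after
            if self.upstream is not None:
                self.upstream.bump_credit(self.more_credit_after)
        return True


def drain(*stages):
    """Run the given stages until none of them can make progress."""
    while any(stage.step() for stage in stages):
        pass


queue = Stage("amqqueue")
channel = Stage("channel", downstream=queue)
reader = Stage("reader", downstream=channel)
reader.inbox.extend(range(20))       # 20 messages waiting on the socket

drain(reader, channel)               # the queue process is not keeping up
closed = (reader.blocked, channel.blocked, len(reader.inbox))

queue.step()
queue.step()                         # the queue handles two messages and grants credit
drain(reader, channel)
reopened_left = len(reader.inbox)
```

While the queue process is stalled, the reader stops after 8 messages and leaves 12 unread on the socket. Two messages handled by the queue are enough to let exactly two more through.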

The state machine

The process that handles the sending and receiving logic of a queue in RabbitMQ is a finite state machine.

    1. When MQ has both producers and consumers, the state machine cycles as follows: receive message -> persist -> send message -> receive message -> ... Under flow control, the receiving and sending rates stay basically the same, and the number of messages accumulated in the queue is very low.
    2. When there are no consumers, MQ keeps receiving messages and persisting them until the disk is full. Since there is no send logic to run, a higher production rate can be achieved.
    3. When there is a backlog of messages in MQ, MQ keeps pulling backlog messages from the queue and sending them out until there is no backlog left, the consumers' qos is used up, there are no consumers, or the consumers' channel is blocked. As long as none of these four conditions is met, MQ keeps sending backlog messages without processing incoming ones, and the sender is blocked by flow control.
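The stop conditions in the third case can be written as a single predicate. This is a hypothetical sketch of the rule as described, not the queue process's actual code; in particular, treating the queue as deliverable when at least one consumer has qos left and an unblocked channel is an assumption about how the conditions combine across several consumers.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Consumer:
    qos_remaining: int             # unused prefetch slots for this consumer
    channel_blocked: bool = False  # whether the consumer's channel is blocked


def keeps_draining(backlog: int, consumers: List[Consumer]) -> bool:
    """True while none of the four stop conditions holds."""
    if backlog == 0:
        return False               # no backlog left
    # any() over an empty list is False, which covers "there are no consumers".
    return any(c.qos_remaining > 0 and not c.channel_blocked for c in consumers)
```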

Summary: as the description above shows, because MQ applies flow control, the producers' sending rate slows down when messages pile up. The process is designed this way for two reasons:

    1. The accumulated messages are consumed sooner, which reduces message latency.
    2. The fewer messages are piled up in MQ, the lower the average cost of processing each message and the better the overall performance, so piled-up messages need to be sent out as soon as possible.

Paging

Paging is triggered when memory is tight. Paging converts a large number of alpha messages into beta and gamma messages; if memory is still tight, it goes on to convert beta and gamma messages into delta messages. Paging is a continuous process that involves multiple state transitions for a large number of messages, so its overhead is high and it seriously affects system performance.

Acknowledgement mechanism for messages

When confirm mode is enabled, the producer and RabbitMQ acknowledge messages by means of a confirmation sequence number, which is scoped to a channel. To acknowledge a message, RabbitMQ simply sends that message's sequence number back to the producer. However, RabbitMQ does not send the ack immediately after receiving the message; the timing of the ack depends on the configuration. Confirmation runs alongside the entire processing of the message in MQ, so let's examine the mechanism from the point of view of the whole message life cycle.

1. rabbit_channel ->

rabbit_channel receives a new message and uses the routing rules to determine the queues the message should be delivered to. It generates a globally unique identifier, msgid, for each message. Each message also has a per-channel confirmation sequence number; because other sequence numbers appear later, we call this one ch_seq_no to avoid confusion. The channel then posts the message, together with its msgid and ch_seq_no, to the corresponding queue.

2. rabbit_amqqueue_process ->

rabbit_amqqueue_process receives the message, records it as unconfirmed, and passes it to backing_queue for persistence.

3. backing_queue ->

backing_queue receives the new message, records it as unconfirmed, and posts it to rabbit_msg_store.

4. rabbit_msg_store ->

rabbit_msg_store receives the new message and records it as unconfirmed. It then confirms messages periodically, or when it switches to a new message store file. It groups all the messages to be confirmed by the queue they belong to, and then invokes the MsgOnDiskFun supplied by the corresponding queue with the MsgIds to be confirmed.

5. rabbit_amqqueue_process <-

rabbit_amqqueue_process executes the MsgOnDiskFun callback when it receives the confirmation, and sends the confirmation on to the corresponding channel process at each reply and noreply.

6. channel <-

When the channel receives a confirmation, it records the messages to be confirmed, merges the confirmed messages, and sends the result to the producer.
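One way to picture the merging step: when every sequence number below the lowest still-unconfirmed one has been confirmed, a single ack with the multiple flag can cover all of them, and only the remaining confirmations need individual acks. The function below is a hypothetical sketch of that idea, not the channel's actual code; `coalesce_confirms` and its `(ch_seq_no, multiple)` return shape are assumptions made for illustration.

```python
from typing import Iterable, List, Set, Tuple


def coalesce_confirms(unconfirmed: Set[int], confirmed: Iterable[int]) -> List[Tuple[int, bool]]:
    """Return (ch_seq_no, multiple) acks for `confirmed`.

    `unconfirmed` is the channel's set of outstanding sequence numbers and is
    updated in place.
    """
    done = sorted(set(confirmed) & unconfirmed)
    unconfirmed.difference_update(done)
    if not done:
        return []
    # Every sequence number below the lowest outstanding one is now confirmed.
    cutoff = min(unconfirmed) if unconfirmed else done[-1] + 1
    below = [seq for seq in done if seq < cutoff]
    above = [seq for seq in done if seq > cutoff]
    acks: List[Tuple[int, bool]] = []
    if below:
        acks.append((below[-1], True))           # one ack covers everything up to here
    acks.extend((seq, False) for seq in above)   # isolated confirms are acked singly
    return acks


outstanding = {1, 2, 3, 4, 5, 6}
first = coalesce_confirms(outstanding, [1, 2, 3, 5])
second = coalesce_confirms(outstanding, [4])
```

In the first call, messages 1 to 3 collapse into one multiple-ack, while 5 is acked on its own because 4 is still outstanding.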

RabbitMQ HA

RabbitMQ can load-balance across consumers, but it does not provide load balancing across its own nodes. A client connected to any node of a RabbitMQ cluster can access any message queue in the cluster, but a message queue is stored on only one physical node; the other nodes store only the queue's metadata. When there is only one queue, this limits system performance to the network bandwidth and host performance of a single node. Using multiple queues to improve performance raises a new problem, namely how to balance the load between the queues. Network connections also affect performance: when a user sends a message to a queue and the node the user is connected to is not the physical node that holds the queue, the message has to be passed between RabbitMQ nodes, which consumes part of the network bandwidth.

So there are several options for clustering RabbitMQ:

Solution 1: Simple load balancing

Add a load balancing layer, such as HAProxy + Keepalived, in front of RabbitMQ. The sender only needs to send to the floating IP address and does not care which node is behind it. The HA capability and the balancing policy of this layer distribute requests to specific nodes, monitor node health in real time, and automatically bring abnormal nodes back up.

Solution 2: Mirror queue

RabbitMQ now provides queue mirroring to improve its reliability. When a RabbitMQ node fails, the queue continues to work without data loss as long as a mirror of the failed node's queue exists on another node. But the feature has side effects. Protecting reliability through redundant data reduces system performance: every message sent to a queue must also be sent to all of that queue's mirrors, which generates a large amount of traffic between RabbitMQ nodes and lowers throughput, and the more mirrors there are, the more performance drops. At the same time, making full use of the cluster's resources requires creating multiple queues. If every queue is mirrored on all nodes for the sake of reliability, the number of queue mirrors becomes too high, and the resulting internal RabbitMQ traffic eats up a lot of network bandwidth.

Solution 3: Third-party library

Third-party libraries such as Python's oslo_messaging require the addresses of all RabbitMQ nodes to be passed in, and the library itself handles request distribution.
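Client-side distribution of this kind can be as simple as rotating through the configured nodes and skipping those known to be down. The class below is a hypothetical sketch of one such policy and does not describe how oslo_messaging itself chooses a node; the host names are placeholders.

```python
from typing import Iterable, List, Set


class NodePicker:
    """Round-robin over configured nodes, skipping nodes marked as down."""

    def __init__(self, nodes: Iterable[str]) -> None:
        self._nodes: List[str] = list(nodes)
        if not self._nodes:
            raise ValueError("at least one node is required")
        self._down: Set[str] = set()
        self._index = 0

    def mark_down(self, node: str) -> None:
        self._down.add(node)

    def mark_up(self, node: str) -> None:
        self._down.discard(node)

    def next(self) -> str:
        """Return the next live node, or raise if every node is down."""
        for _ in range(len(self._nodes)):
            node = self._nodes[self._index]
            self._index = (self._index + 1) % len(self._nodes)
            if node not in self._down:
                return node
        raise RuntimeError("no live RabbitMQ nodes")


# Placeholder addresses for a three-node cluster.
picker = NodePicker(["rabbit1:5672", "rabbit2:5672", "rabbit3:5672"])
picks = [picker.next() for _ in range(4)]
picker.mark_down("rabbit2:5672")
after_failure = [picker.next() for _ in range(2)]
```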