1. An overview of flow control

1.1 Why Flow control?

Flow control prevents producers from publishing messages faster than the Broker can process them. When that happens, the producer's publishing rate is temporarily throttled so that the Broker can keep up.

Erlang processes do not share memory; each process has its own mailbox and communicates with other processes only by sending messages. Erlang places no limit on the size of a process's mailbox, so if messages keep arriving faster than a process can handle them, the mailbox grows until the node runs out of memory and crashes. Without flow control, the mailboxes of internal processes could quickly push memory usage past the threshold.

1.2 RabbitMQ flow control mechanisms

1.2.1 Global flow control (memory high watermark and disk low watermark)

RabbitMQ lets you set thresholds (watermarks) for memory usage and free disk space. When a threshold is crossed, publishing is blocked entirely until the alarm clears.

Memory and disk alarms act as global flow control: publishing is blocked completely, and the block usually lasts a relatively long time (minutes or more) before it is lifted.

When global flow control is in effect, the Connection is shown as blocked in the Web UI.

In the RabbitMQ Java client, you can add a BlockedListener to a Connection to listen for blocked and unblocked events and handle the blocking on the client side.

// amqp-client 5.x lambda overload: addBlockedListener(BlockedCallback, UnblockedCallback)
connection.addBlockedListener(
    reason -> {
        // connection.blocked: the broker has stopped reading from this connection
        // (for example because of a memory or disk alarm); pause publishing here
        System.out.println("connection blocked: " + reason);
    },
    () -> {
        // connection.unblocked: the alarm has cleared and publishing may resume
        latch.countDown();  // application-specific signal, e.g. a CountDownLatch
    }
);

1.2.2 In-process flow control

In-process flow control is flow control between Erlang processes. It is distinct from global flow control and is also known as per-connection flow control.

The RabbitMQ Broker uses multiple processes to process messages in the following order.

A simplified depiction of message flows

reader -> channel -> queue process -> message store

In-process flow control refers to the flow control between these four processes.

In-process flow control does not affect the consumer side.

When a process is under flow control, it is shown with a yellow "flow" state in the Web UI, and message publishing through it is temporarily blocked.

A queue in flow state

Blocking caused by in-process flow control is usually very short, under a second, although in some cases it can last several minutes.

In-process flow control works by blocking the socket read on the Broker side, so the client receives no notification and cannot listen for or react to it.

Starting with RabbitMQ 3.5.5, a credit-based flow control implementation was introduced.

This article focuses mainly on the credit-based implementation of in-process flow control.

1.2.3 Publisher confirms (sender acknowledgements)

This is not strictly a flow control mechanism, but publisher confirms let producers send messages without loss and also give them a way to control the rate at which messages are sent.

When publisher confirms are not enabled, a message may be lost before it ever reaches the server, and the producer has no way of knowing.

With publisher confirms enabled, the send is considered complete only after the message has been delivered to the matching queues and the broker has returned an acknowledgement to the sender.
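
A minimal sketch of how publisher confirms might be used with the Java client is shown below (the queue name, the timeout, and the already-open Connection named connection are illustrative assumptions):

Channel channel = connection.createChannel();
channel.confirmSelect();  // put the channel into confirm mode

channel.basicPublish("", "my-queue", MessageProperties.PERSISTENT_TEXT_PLAIN,
        "hello".getBytes(StandardCharsets.UTF_8));

// Block (up to 5 seconds) until the broker has confirmed everything published so far;
// waiting per message or per small batch paces the producer to the broker's speed.
if (!channel.waitForConfirms(5_000)) {
    // at least one message was nack'ed by the broker: log it or republish
}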

1.2.4 Consumer prefetch

The Channel#basicQos(int prefetchCount) method sets the maximum number of unacknowledged messages a consumer may hold, so the broker prefetches messages to the consumer in batches of at most that size.
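
For example, a sketch with the Java client (the queue name and the process(...) helper are illustrative placeholders):

Channel channel = connection.createChannel();
channel.basicQos(50);  // at most 50 unacknowledged deliveries per consumer

channel.basicConsume("my-queue", false /* manual ack */,
    (consumerTag, delivery) -> {
        process(delivery.getBody());  // hypothetical application handler
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    },
    consumerTag -> { /* consumer was cancelled */ });

The broker keeps at most 50 deliveries outstanding on this channel; acknowledging messages frees slots for the next batch.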

2. Process overview

Starting with RabbitMQ 3.5.5, a credit-based flow control implementation was introduced.

2.1 Credit flow configuration

The two parameters of credit-based flow control can be found by querying the broker's application environment:

rabbitmqctl eval 'application:get_all_env(rabbit).'
#...
{credit_flow_default_credit,{400,200}}  # {InitialCredit, MoreCreditAfter}
#...

Here 400 is the initial credit value of each process, and 200 means that after the downstream process has handled 200 messages it grants the upstream process 200 more credits.

These two parameters are {200, 50} in older versions.

2.2 Credit-based flow control

Unlike operating system processes, Erlang processes are lightweight processes.

To put it simply, a message passes through four processes inside RabbitMQ.

reader -> channel -> queue process -> message store
400       400        400

At initialization, each of the first three processes is assigned an initial credit value, InitialCredit (default 400).

When a process processes a message and sends it to a downstream process, its own credit value is reduced by one.

When a downstream process finishes handling a message, it records an acknowledgement for the upstream process. It does not grant credit one message at a time; instead, after it has acknowledged MoreCreditAfter (default 200) messages, it grants the upstream process MoreCreditAfter more credits.

When a process's credit for a downstream process is exhausted (drops to 0), the process is blocked and stops sending messages to that downstream process.
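
To make the accounting concrete, here is a small conceptual sketch in Java. It is not RabbitMQ code, just a model of the two counters described above using the default values {400, 200}; the class and field names are invented for illustration:

// Conceptual model of credit-based flow control between one upstream and one
// downstream process; mirrors {InitialCredit, MoreCreditAfter} = {400, 200}.
class CreditFlowModel {
    static final int INITIAL_CREDIT = 400;
    static final int MORE_CREDIT_AFTER = 200;

    int creditForDownstream = INITIAL_CREDIT;  // kept by the upstream (sender)
    int ackedSinceLastGrant = 0;               // kept by the downstream (receiver)

    // Upstream side: called each time a message is sent downstream.
    // Returns false when the credit is exhausted and the sender must block.
    boolean send() {
        creditForDownstream--;
        return creditForDownstream > 0;
    }

    // Downstream side: called each time a message has been fully handled.
    void ack() {
        if (++ackedSinceLastGrant == MORE_CREDIT_AFTER) {
            ackedSinceLastGrant = 0;
            creditForDownstream += MORE_CREDIT_AFTER;  // the "bump_credit" grant
        }
    }
}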

3. Detailed process

3.1 Detailed flow control process

Each orange component below is an Erlang process.

Each RabbitMQ broker is implemented internally as a set of actors, with the different components communicating through message passing (and sometimes local function calls).

A simplified depiction of message flows

Let's simplify this model and then analyze the credit-based flow control mechanism.

  • rabbit_reader: the connection process; receives and parses AMQP frames and forwards messages to the channel process
  • rabbit_channel: the channel process; handles AMQP methods, routing, permission checks and coordination
  • rabbit_amqqueue_process: the queue process; keeps messages in memory and persists the queue index
  • rabbit_msg_store: the message store process, responsible for message persistence

Credit Based Flow Control with Classic Queues

  1. At initialization, each of the first three processes (reader, channel and queue) is given the initial credit value InitialCredit (400) for its downstream process (1 in the figure).
  2. When the reader process sends a message to the channel process, it decrements its own credit by one (2 in the figure).
  3. When the channel process handles a message from the reader, it records an acknowledgement in the credit system. The channel keeps track of how many messages from the reader it has acknowledged; once that count reaches MoreCreditAfter (200), it grants the reader MoreCreditAfter (200) new credits (3 in the figure).
  4. When the credit value in a process's dictionary falls to 0, the process is blocked; it neither receives nor sends messages until it obtains new credit.
  5. Ultimately the TCP reader process blocks and stops reading from the socket.

3.2 How to Identify performance Bottlenecks

In the management UI you may see connections, channels and queues in the flow state, which means they have recently been under flow control: they temporarily ran out of credit and are waiting for a downstream process to grant more. In-process flow control can trigger many times within a single second.

How to identify process performance bottlenecks by flow state?

Simply put, a process enters the flow state because its downstream process has become the performance bottleneck, and its flow state in turn pushes its own upstream processes into the flow state.

For example, in the following figure, the Queue process becomes a performance bottleneck:

Credit exhaustion.

In the figure above, the Queue process handles messages slowly, so it may not grant the Channel new credit for a long time. The Channel processes messages faster than the Queue, so the Channel's credit is exhausted first.

When the Channel runs out of credit it blocks, neither receiving nor processing messages, and the Reader's credit is then exhausted in turn.

That is, if a Queue is a performance bottleneck, its upstream channels and readers will eventually be in flow.


The following conclusions can be summarized to determine where the performance bottleneck is:

  • When a Connection is in flow state but none of its Channels are, one or more Channels in that Connection are the performance bottleneck. This happens when the Channel processes (for example while handling routing logic) overload the server CPU, and it shows up most readily when sending lots of small non-persistent messages.
  • When a Connection is in flow state and several of its Channels are too, but none of the corresponding queues are, one or more queues are the performance bottleneck. This can be caused by CPU overload while messages are enqueued, or by I/O overload while messages are paged to disk, and it shows up most readily when sending lots of small persistent messages.
  • When a Connection is in flow state, several of its Channels are, and the corresponding queues are as well, message persistence is the performance bottleneck. This is caused by high server I/O load while queues write messages to disk, and it shows up most readily when sending a large volume of persistent messages.

4. Source code analysis

Erlang source code is organised into .erl module files; the processes discussed here are lightweight processes managed by the Erlang runtime rather than operating system processes. The logic of credit-based flow control lives in the credit_flow.erl module.

Let's use the rabbit_reader (connection) and rabbit_channel processes as examples to see how the source code consumes, grants and blocks on credit.

4.1 Processing messages to reduce credit

When rabbit_reader processes a command that carries content (such as basic.publish), it takes the {ok, Method, Content, NewAState} branch in the code below (marked with a comment):

% rabbit_reader.erl
process_frame(Frame, Channel, State) ->
    ChKey = {channel, Channel},
    case (case get(ChKey) of
              undefined -> create_channel(Channel, State);
              Other     -> {ok, Other, State}
          end) of
        {error, Error} ->
            handle_exception(State, Channel, Error);
        {ok, {ChPid, AState}, State1} ->
            case rabbit_command_assembler:process(Frame, AState) of
                {ok, NewAState} ->
                    put(ChKey, {ChPid, NewAState}),
                    post_process_frame(Frame, ChPid, State1);
                {ok, Method, NewAState} ->
                    rabbit_channel:do(ChPid, Method),
                    put(ChKey, {ChPid, NewAState}),
                    post_process_frame(Frame, ChPid, State1);
                %% commands that carry content go through do_flow/3 (credit flow)
                {ok, Method, Content, NewAState} ->
                    rabbit_channel:do_flow(ChPid, Method, Content),
                    put(ChKey, {ChPid, NewAState}),
                    post_process_frame(Frame, ChPid, control_throttle(State1));
                {error, Reason} ->
                    handle_exception(State1, Channel, Reason)
            end
    end.

Next, rabbit_channel:do_flow/3:

% rabbit_channel_common.erl
do_flow(Pid, Method, Content) ->
    %% Here we are tracking messages sent by the rabbit_reader
    %% process. We are accessing the rabbit_reader process dictionary.
    credit_flow:send(Pid),
    gen_server2:cast(Pid, {method, Method, Content, flow}).

You can see that credit_flow:send/1 is called here; Pid is the pid of the Channel process.

The logic here is that Rabbit_reader tracks the number of messages it has sent to Rabbit_channel through the credit_flow module, reducing its credit value by one for each message it sends. The tracked information is stored in the rabbit_reader process dictionary.

Note that although credit_flow:send/1 is called in the rabbit_channel module, it is still in the rabbit_reader process. The rabbit_channel process will be entered only after gen_server2:cast/2 is executed. Thus, when credit_flow:send/1 is called, the credit minus one is still tracked in rabbit_reader.

The definitions of credit_flow:send/2 and the ?UPDATE macro are shown below; they read and update the process dictionary through get/1 and put/2.

% credit_flow.erl
send(From, {InitialCredit, _MoreCreditAfter}) ->
    ?UPDATE({credit_from, From}, InitialCredit, C,
            if C == 1 -> block(From),
                         0;
               true   -> C - 1
            end).

% credit_flow.erl
%% process dict update macro - eliminates the performance-hurting
%% closure creation a HOF would introduce
-define(UPDATE(Key, Default, Var, Expr),
        begin
            %% We deliberately allow Var to escape from the case here
            %% to be used in Expr. Any temporary var we introduced
            %% would also escape, and might conflict.
            Var = case get(Key) of
                undefined -> Default;
                V         -> V
            end,
            put(Key, Expr)
        end).

Now let's look at the credit information kept in the process dictionary.

The key used to hold the credit information is {credit_from, From}, where From is the pid of the message receiver (in this case, rabbit_channel). When the value for this key drops to 0, the process that owns the dictionary is blocked (credit_flow:block/1 is called).

4.2 The process is blocked and stops receiving messages

The credit_flow:block/1 method is called when the credit value in the process dictionary reaches 0. Let’s look at what this method does.

% credit_flow.erl
block(From) ->
    ?TRACE_BLOCKED(self(), From),
    case blocked() of
        false -> put(credit_blocked_at, erlang:monotonic_time());
        true  -> ok
    end,
    ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).

This updates the value of credit_blocked in the process dictionary, adding the downstream process ID (in this case, rabbit_channel) that blocked the process to credit_blocked.

Note that because rabbit_reader can send messages to more than one process, it can also be blocked by more than one process. Therefore, the value of credit_blocked is a list of process ids.

credit_blocked -> [pid()]

So how does a blocked process stop receiving messages? Let's examine the recvloop function that rabbit_reader uses to read from the socket.

% rabbit_reader.erl
recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
    mainloop(Deb, Buf, BufLen, State);
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
    mainloop(Deb, Buf, BufLen, State);
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
    throw({become, F(Deb, Buf, BufLen, State)});
recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
  when BufLen < RecvLen ->
    case rabbit_net:setopts(Sock, [{active, once}]) of
        ok              -> mainloop(Deb, Buf, BufLen,
                                    State#v1{pending_recv = true});
        {error, Reason} -> stop(Reason, State)
    end;

mainloop in turn calls recvloop again, forming the receive loop.

rabbit_reader sets the socket option {active, once} before reading each packet. If the connection is currently in the blocked state, the clause above skips setting {active, once}, so no new data is requested from the socket and the process simply waits in its receive loop.

4.3 Increase credit value

rabbit_channel records an acknowledgement for rabbit_reader for each message it processes (credit_flow:ack).

When the number of messages rabbit_channel has acknowledged reaches MoreCreditAfter, it grants rabbit_reader MoreCreditAfter new credits.

Let's first look at the implementation of the ack function.

% credit_flow.erl
ack(To, {_InitialCredit, MoreCreditAfter}) ->
    ?UPDATE({credit_to, To}, MoreCreditAfter, C,
            if C == 1 -> grant(To, MoreCreditAfter),
                         MoreCreditAfter;
               true   -> C - 1
            end).

The rabbit_channel process keeps track of how many messages it has acknowledged for a particular sender (rabbit_reader). The key used to hold this count in the process dictionary is {credit_to, To}, where To is the pid of the sender (rabbit_reader).

After MoreCreditAfter messages have been acknowledged, the grant function is called to give rabbit_reader more credits.


% credit_flow.erl
grant(To, Quantity) ->
    Msg = {bump_credit, {self(), Quantity}},
    case blocked() of
        false -> To ! Msg;
        true  -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
    end.

Here rabbit_channel sends a {bump_credit, {self(), Quantity}} message to rabbit_reader to grant the credit; self() is the pid of rabbit_channel.


When rabbit_reader receives the bump_credit message, it passes it to credit_flow:handle_bump_msg/1 to apply the new credit.

% credit_flow.erl
handle_bump_msg({From, MoreCredit}) ->
    ?UPDATE({credit_from, From}, 0, C,
            if C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
                                                    C + MoreCredit;
               true                              -> C + MoreCredit
            end).

This updates the {credit_from, From} key in rabbit_reader's process dictionary; if the credit was exhausted and becomes positive again, the process is unblocked.

4.4 Unblocking processes

% credit_flow.erl
unblock(From) ->
    ?TRACE_UNBLOCKED(self(), From),
    ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
    case blocked() of
        false -> case erase(credit_deferred) of
                     undefined -> ok;
                     Credits   -> _ = [To ! Msg || {To, Msg} <- Credits],
                                  ok
                 end;
        true  -> ok
    end.

Calling credit_flow:unblock/1 removes the granting process from the credit_blocked list; once that list is empty, the process is no longer blocked and can continue sending messages.

At the same time, credit_flow:unblock/1 is responsible for sending any bump_credit messages that were deferred in the credit_deferred list.


For example, when unblock(B) is called on process A, process B's pid is removed from A's credit_blocked list:

%% We are operating on process A dictionary.
get(credit_blocked) => [B, C].
unblock(B).
get(credit_blocked) => [C].

In this case, A is still blocked until C also grants it more credit. Once A is unblocked, it processes its credit_deferred list, sending the deferred bump_credit messages to the processes recorded there.

5. Reference materials

  • Flow Control
  • Finding bottlenecks with RabbitMQ 3.3
  • New Credit Flow Settings on RabbitMQ 3.5.5
  • RabbitMQ Internals – Credit Flow for Erlang Processes
  • Quorum Queues and Flow Control – The Concepts
  • RabbitMQ Field Guide
  • RabbitMQ traffic control mechanism analysis