With the mechanics of RocketMQ message storage covered, this article looks at the process of message consumption and its related concepts.

Message consumption

The consumer-group concepts in RocketMQ message consumption are broadly similar to Kafka's:

A consumer group can contain multiple consumers, and a consumer group can subscribe to multiple topics. A consumer group consumes in one of two modes: cluster mode or broadcast mode.

In cluster mode, each message in a topic is consumed by only one consumer in the group, and consumption progress is stored on the broker. In broadcast mode, every consumer in the group receives every message, and consumption progress is stored on the consumer side.

In cluster mode, a message queue can be consumed by only one consumer at a time, while one consumer can consume multiple message queues. See my previous article for details.

There are two ways to transfer messages between the RocketMQ message server and consumers: push mode and pull mode. In pull mode, the consumer actively sends requests to the message server. In push mode, the message server pushes messages to the consumer. RocketMQ's push mode is built on top of pull mode.

Consumer startup

Consumer startup mainly initializes three components and then starts the background scheduled tasks.

Three components:

  • [RebalanceImpl] The rebalancing service, responsible for allocating the message queues (MessageQueue) that the current consumer may consume. When a consumer joins or leaves the group, the message queues are reassigned.

  • [PullAPIWrapper] The message pull component

  • [offsetStore] The consumption progress component

Several scheduled tasks

  • PullMessageService

    Takes a consumer's PullRequest from the blocking queue pullRequestQueue and executes it

  • RebalanceService

    Load-balancing task that assigns consumable MessageQueues to the consumer

  • fetchNameServerAddr

    Periodically fetches the NameServer address

  • updateTopicRouteInfoFromNameServer

    Periodically updates the Topic routing information

  • cleanOfflineBroker

    Periodically removes offline brokers

  • sendHeartbeatToAllBrokerWithLock

    Sends heartbeats to all brokers

  • persistAllConsumerOffset

    Periodically persists the consumption progress (ConsumerOffset)

Message pull

For any messaging middleware, there are two ways for a consumer client to get messages from the messaging middleware and consume them:

  • Pull

    That is, consumers actively pull messages from the Broker at regular intervals

  • advantages

    The consumer controls the consumption rate and the number of messages per pull

  • disadvantages

    If the interval is short, pulls may come back empty, and frequent RPC requests increase network overhead. If the interval is long, message delivery may be delayed. The consumer also has to maintain its own consumption offsets.

  • Push

    The Broker actively pushes messages to consumers in real time

  • advantages

    Messages are delivered in real time over a long-lived connection, so connections do not have to be re-established frequently

  • disadvantages

    If the message volume is large and the consumer's throughput is low, the consumer's buffer may overflow.

As noted at the beginning of this article, RocketMQ's push mode is built on top of pull mode.

PullMessageService

RocketMQ pulls messages through PullMessageService.

PullMessageService#run

About PullRequest

PullMessageService provides two ways to add a PullRequest:
  • Delayed add

  • Immediate add

[About ProcessQueue]

ProcessQueue is a consumer-side snapshot of a MessageQueue. By default, PullMessageService pulls up to 32 messages from the message server at a time and stores them in the ProcessQueue in queue-offset order. The pulled messages are then submitted to the consumer thread pool, and each message is removed from the ProcessQueue after it has been consumed successfully.

[Flow control for message pull]

  • If the number of messages in processQueue exceeds 1000, or their total size exceeds 100 MB, the pull is delayed by 50 milliseconds.

  • If the offset span between the largest-offset and smallest-offset messages in processQueue exceeds 2000, the pull is delayed by 50 milliseconds.

  • Look up the subscription information for the topic. If none is found, wait 3 seconds and try the pull again.
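The following is a minimal sketch of a ProcessQueue and the flow-control checks above. It is an illustration under simplifying assumptions, not RocketMQ's implementation: messages are reduced to an offset and a body size, and the class and method names are hypothetical. The thresholds are the ones listed above. The sorted map keeps messages in queue-offset order. removeMessage returns the smallest offset still in flight, or the largest offset seen plus one once the queue is empty. That returned value is the offset that is safe to commit.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified ProcessQueue: offset -> message body size in bytes.
final class ProcessQueueSketch {
    private final TreeMap<Long, Integer> msgTreeMap = new TreeMap<>();
    private long cachedSizeBytes = 0;
    private long queueOffsetMax = -1;

    synchronized void putMessage(long offset, int sizeBytes) {
        if (msgTreeMap.put(offset, sizeBytes) == null) {
            cachedSizeBytes += sizeBytes; // count each offset only once
        }
        queueOffsetMax = Math.max(queueOffsetMax, offset);
    }

    // Removes consumed messages and returns the offset that is safe to commit.
    synchronized long removeMessage(List<Long> offsets) {
        for (long offset : offsets) {
            Integer size = msgTreeMap.remove(offset);
            if (size != null) {
                cachedSizeBytes -= size;
            }
        }
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }

    synchronized int count() { return msgTreeMap.size(); }

    synchronized long sizeBytes() { return cachedSizeBytes; }

    synchronized long maxSpan() {
        return msgTreeMap.isEmpty() ? 0 : msgTreeMap.lastKey() - msgTreeMap.firstKey();
    }
}

final class FlowControl {
    static final int COUNT_THRESHOLD = 1000;               // messages
    static final long SIZE_THRESHOLD = 100L * 1024 * 1024; // 100 MB
    static final long SPAN_THRESHOLD = 2000;               // offset span
    static final long FLOW_CONTROL_DELAY_MS = 50;
    static final long NO_SUBSCRIPTION_DELAY_MS = 3000;

    // Returns how long to postpone the next pull, or 0 to pull now.
    static long pullDelayMillis(ProcessQueueSketch pq, boolean hasSubscription) {
        if (pq.count() > COUNT_THRESHOLD || pq.sizeBytes() > SIZE_THRESHOLD) {
            return FLOW_CONTROL_DELAY_MS;
        }
        if (pq.maxSpan() > SPAN_THRESHOLD) {
            return FLOW_CONTROL_DELAY_MS;
        }
        if (!hasSubscription) {
            return NO_SUBSCRIPTION_DELAY_MS;
        }
        return 0;
    }
}
```

The span check protects against a slow message blocking progress. The committed offset can never pass the smallest in-flight offset, so a large span means many messages would be re-delivered after a restart.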

[How the Broker assembles messages]

PullMessageProcessor#processRequest

  1. Build a message filter from the subscription information.

  2. Call messageStore.getMessage to look up the messages.

  3. Get the consume queue by topic name and queue ID.

  4. Validate the requested offset; if it is abnormal, correct the offset for the next pull.

  5. Fill in the nextBeginOffset, minOffset, and maxOffset fields of the responseHeader for the PullRequest.

  6. Set the brokerId for the next pull based on master-slave synchronization: if the slave node already holds the data at the next pull offset, the next pull may be directed to it.

  7. If the request carries the commit-offset flag and the current node is the master, update the consumption progress.
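Step 4 can be illustrated with a small classification of the requested offset against the consume queue's range. The status names mirror values of RocketMQ's GetMessageStatus. The correction rules are simplified, and OffsetCheck and its parameters are hypothetical. maxOffset is taken to be one past the last message in the queue.

```java
// Status names mirror RocketMQ's GetMessageStatus; the logic is a simplified sketch.
enum OffsetStatus { FOUND, NO_MESSAGE_IN_QUEUE, OFFSET_TOO_SMALL, OFFSET_OVERFLOW_ONE, OFFSET_OVERFLOW_BADLY }

final class OffsetCheck {
    final OffsetStatus status;
    final long nextBeginOffset;

    private OffsetCheck(OffsetStatus status, long nextBeginOffset) {
        this.status = status;
        this.nextBeginOffset = nextBeginOffset;
    }

    // offset: requested pull offset; [minOffset, maxOffset): range held by the consume queue;
    // messagesReturned: how many messages a successful lookup returned (hypothetical input).
    static OffsetCheck check(long offset, long minOffset, long maxOffset, int messagesReturned) {
        if (maxOffset == 0) {
            return new OffsetCheck(OffsetStatus.NO_MESSAGE_IN_QUEUE, 0); // queue is empty
        }
        if (offset < minOffset) {
            return new OffsetCheck(OffsetStatus.OFFSET_TOO_SMALL, minOffset); // skip to oldest message
        }
        if (offset == maxOffset) {
            return new OffsetCheck(OffsetStatus.OFFSET_OVERFLOW_ONE, offset); // caught up: retry same offset
        }
        if (offset > maxOffset) {
            // Too far ahead: fall back to the start of the queue if it begins at 0, else to the end.
            return new OffsetCheck(OffsetStatus.OFFSET_OVERFLOW_BADLY, minOffset == 0 ? minOffset : maxOffset);
        }
        return new OffsetCheck(OffsetStatus.FOUND, offset + messagesReturned);
    }
}
```

OFFSET_OVERFLOW_ONE is the normal "no new messages yet" case. It is the situation in which the long polling mechanism described next suspends the request.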

[Message pull long polling mechanism]

In push mode, RocketMQ repeatedly sends message pull requests to the message server in a loop.

When a consumer pulls from the broker and the message has not yet arrived in the consume queue, and long polling is not enabled, the broker waits shortPollingTimeMills (1 second by default) and then checks again whether the message has arrived. If it still has not arrived, PULL_NOT_FOUND is returned to the client.

If long polling is enabled, RocketMQ checks every 5 s whether the message has arrived. In addition, as soon as a new message arrives, RocketMQ wakes up the suspended request so that it can check again whether the new message is one the request is waiting for. If it is, the message is read from the commitlog file and returned to the pulling client. The maximum suspend time is set by the client in the pull request parameters. The default in PUSH mode is 15 s.

In PULL mode, it is set via DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis. Long polling is enabled on the Broker by setting longPollingEnable to true.

RocketMQ's long polling mechanism is implemented by two threads: PullRequestHoldService and ReputMessageService.
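How long the broker holds an empty pull can be summarized in a few lines. The setting names (longPollingEnable, shortPollingTimeMills) and the default values come from the text above. The HoldPolicy class and its methods are hypothetical helpers written for illustration.

```java
// Hypothetical helper: how long the broker parks a pull request that found no messages.
final class HoldPolicy {
    static final long SHORT_POLLING_TIME_MILLS = 1_000;      // broker default named in the text
    static final long PUSH_SUSPEND_MAX_TIME_MILLIS = 15_000; // push-consumer default named in the text

    // longPollingEnable: broker switch; clientSuspendMillis: max suspend time sent in the pull request.
    static long holdMillis(boolean longPollingEnable, long clientSuspendMillis) {
        return longPollingEnable ? clientSuspendMillis : SHORT_POLLING_TIME_MILLS;
    }

    // Absolute time after which a held request is answered even if no message arrived.
    static long deadlineMillis(long nowMillis, boolean longPollingEnable, long clientSuspendMillis) {
        return nowMillis + holdMillis(longPollingEnable, clientSuspendMillis);
    }
}
```

With long polling on, the client decides the hold time. With it off, the broker's own short polling interval applies, whatever the client asked for.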

[Push consumption mode flow analysis]

  • The independent background thread RebalanceService performs load balancing based on the number of message queues in the Topic and the number of consumers in the current consumer group. It allocates the corresponding MessageQueues to the current consumer, wraps them as PullRequest instances, and puts them into the pullRequestQueue.

  • On the consumer side, PullMessageService, another independent background thread, takes PullRequests from the pullRequestQueue and asynchronously sends pull RPC requests to the Broker through the network communication module. This is a typical producer-consumer model that achieves quasi-real-time automatic message pulling.

  • After PullMessageService pulls messages asynchronously, the result is handled in the PullCallback. On success, the next pull offset is updated, the PullRequest is put back into the blocking queue pullRequestQueue, and the next pull starts immediately.

  • ConsumeMessageConcurrentlyService receives the messages pulled in the PullCallback and wraps them in a ConsumeRequest. The ConsumeRequest calls the consumeMessage() method implemented by the consumer's business code, which processes the messages and returns an ACK status to the ConsumeRequest. If consumption fails, in cluster mode the message is sent back to the Broker for retry (broadcast mode does not retry because retries would be too costly). Finally, the consumption progress in offsetTable is updated.

  • On the Broker side, after receiving the pull RPC request, the PullMessageProcessor reads messages from the commitLog through the MessageStore instance. If the first attempt to pull fails (for example, no messages are available for consumption on the Broker), the long polling mechanism holds and suspends the request. The Broker's PullRequestHoldService retries the suspended request, and the background ReputMessageService thread triggers its reprocessing when new messages arrive.
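The PullCallback step from the third bullet can be modeled as a switch on the pull status. The four status names match RocketMQ's client-side PullStatus enum. Everything else is a simplified, hypothetical stand-in: lists replace the ProcessQueue, the consume thread pool, and the pullRequestQueue. Real handling differs in detail; for example, a configured pull interval can delay the next pull, and OFFSET_ILLEGAL also corrects the stored offset.

```java
import java.util.ArrayList;
import java.util.List;

// These four names match RocketMQ's client-side PullStatus enum.
enum PullStatus { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

// Hypothetical model of the PullCallback step in push mode.
final class PullCallbackSketch {
    final List<Long> processQueue = new ArrayList<>();            // offsets buffered for consumption
    final List<List<Long>> submittedBatches = new ArrayList<>();  // batches handed to the consume pool
    final List<Long> requeuedOffsets = new ArrayList<>();         // next offsets put back into pullRequestQueue
    boolean dropped = false;                                      // queue no longer pulled

    void onSuccess(PullStatus status, long nextBeginOffset, List<Long> msgOffsets) {
        switch (status) {
            case FOUND:
                processQueue.addAll(msgOffsets);                     // buffer in offset order
                submittedBatches.add(new ArrayList<>(msgOffsets));   // hand off for consumption
                requeuedOffsets.add(nextBeginOffset);                // schedule the next pull at once
                break;
            case NO_NEW_MSG:
            case NO_MATCHED_MSG:
                requeuedOffsets.add(nextBeginOffset);                // nothing to consume, pull again
                break;
            case OFFSET_ILLEGAL:
                dropped = true;                                      // stop pulling this queue
                break;
        }
    }
}
```

Note that the callback, not a timer, re-enqueues the request. That is why push mode behaves like a continuous loop of pulls.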

[Push message flow chart]

Long polling mechanism for RocketMQ message consumption

The difference between regular polling and long polling is as follows:

  • Ordinary polling is relatively simple: the client sends requests periodically, and the server responds immediately whether or not the data has changed.

    Its advantage is a simple implementation that is easy to understand. Its disadvantage is that the server is passive: it must keep handling client connections and cannot control how often clients pull or how many clients there are.

  • Long polling is an optimization of ordinary polling. The client sends a request, but the server does not respond immediately. Instead, it holds the connection and waits until the data changes (or until a specified time passes without a change) before responding to the client.

    In plain terms, long polling adds a control point to ordinary polling: the client may send a request at any time, but the server decides when to reply. This keeps the server from being paced by its clients and keeps its own load under control.

In RocketMQ, the consumer sends a pull request. If the broker finds no message while processing it, it does not respond to the consumer right away. Instead, it holds (suspends) the request, so that the consumer does not immediately send the next pull request. The pullRequest is added to the pullRequestTable, where it waits for an event that triggers a response to the consumer.

When a producer sends a new message, the broker first persists it to the commitLog file and then asynchronously builds the ConsumeQueue and index entries. It then wakes up the held consumer request and immediately writes the message to the consumer over the channel. If no message arrives and the offset requested by the client is already the latest, the request stays held. The hold timeout is shorter than the request's own timeout. Meanwhile, the Broker periodically checks whether held requests have timed out and answers any timed-out request with status NO_NEW_MESSAGE.

On the Broker side, PullRequestHoldService, an independent background thread, traverses all pending requests in the pullRequestTable and responds to the consumer when a message is available. The ReputMessageService thread continuously builds ConsumeQueue and IndexFile entries and checks for new messages. When a new message is found, it looks up the pending pullRequest in the pullRequestTable by the Topic+queueId key and writes the response back over that request's long-lived connection channel.

This long polling mechanism keeps consumers from continuously sending useless pull requests, which would otherwise put a heavy load on the Brokers across the RocketMQ cluster.
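The hold-and-notify pattern can be sketched as a table of parked requests keyed by topic and queue ID. The names echo PullRequestHoldService and its notifyMessageArriving step. However, the classes, fields, and signatures here are hypothetical simplifications. A list of strings stands in for the network channel, and a clock is injected so that timeouts can be exercised. In this model, ReputMessageService would call notifyMessageArriving after building a ConsumeQueue entry, and the 5-second check loop would call it for every key with the queue's current max offset.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical, simplified broker-side hold service.
final class HoldServiceSketch {
    static final class HeldRequest {
        final long pullFromOffset;
        final long deadlineMillis;
        final List<String> responses = new ArrayList<>(); // stands in for the network channel

        HeldRequest(long pullFromOffset, long deadlineMillis) {
            this.pullFromOffset = pullFromOffset;
            this.deadlineMillis = deadlineMillis;
        }
    }

    // key "topic@queueId" -> requests parked on that queue (the pullRequestTable)
    private final Map<String, List<HeldRequest>> pullRequestTable = new ConcurrentHashMap<>();
    private final LongSupplier clock;

    HoldServiceSketch(LongSupplier clock) {
        this.clock = clock;
    }

    private static String key(String topic, int queueId) {
        return topic + "@" + queueId;
    }

    // A pull found nothing: park the request instead of answering it.
    void suspend(String topic, int queueId, HeldRequest request) {
        List<HeldRequest> list = pullRequestTable.computeIfAbsent(key(topic, queueId), k -> new ArrayList<>());
        synchronized (list) {
            list.add(request);
        }
    }

    // maxOffset: one past the newest message now in the queue.
    void notifyMessageArriving(String topic, int queueId, long maxOffset) {
        List<HeldRequest> list = pullRequestTable.get(key(topic, queueId));
        if (list == null) {
            return;
        }
        synchronized (list) {
            for (Iterator<HeldRequest> it = list.iterator(); it.hasNext(); ) {
                HeldRequest r = it.next();
                if (maxOffset > r.pullFromOffset) {
                    r.responses.add("FOUND");          // new data: re-run the pull and reply
                    it.remove();
                } else if (clock.getAsLong() >= r.deadlineMillis) {
                    r.responses.add("NO_NEW_MESSAGE"); // hold expired without new data
                    it.remove();
                }
            }
        }
    }
}
```

A request that is neither satisfied nor expired simply stays in the table, which is exactly the "hold" described above.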

The process is as follows:

Message queue load and redistribution mechanisms

When a business system is deployed on multiple machines, each machine starts a Consumer, and all of them belong to the same Consumer group. When the consumers in a group consume a Topic that has multiple MessageQueues, the queues must be divided among them. For example, with two consumers and three MessageQueues, how should the three queues be allocated? This is consumer load balancing. First, at startup each Consumer registers itself with all Brokers and keeps sending heartbeats, so every Broker knows which consumers belong to the Consumer group. When consuming, a Consumer then queries a random Broker for all the consumers in the group. The main process is as follows:

RocketMQ message queue redistribution is implemented by the RebalanceService thread, which starts along with MQClientInstance. By default, RebalanceService calls MQClientInstance#doRebalance every 20 seconds.

[Message queue load flow for topic]

  1. Get the message queues of the topic, and send a request to the broker to get the IDs of all consumer clients in the topic's consumer group.

  2. Rebalancing proceeds only if neither the queue set nor the consumer ID list is empty.

  3. Before rebalancing, sort both the queues and the consumer client IDs, so that every consumer in the group has the same view.

  4. Allocate queues to consumers according to the allocation strategy AllocateMessageQueueStrategy.

During execution, the client's RebalanceService thread works together with PullMessageService.
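Steps 2 to 4 can be illustrated with an average allocation strategy, modeled on the arithmetic of RocketMQ's AllocateMessageQueueAveragely. In this sketch, plain strings stand in for MessageQueue objects and consumer IDs; the real MessageQueue has its own ordering by topic, broker name, and queue ID. AverageAllocation is a hypothetical name. Each consumer gets a contiguous block of queues, and the first (queues % consumers) consumers get one extra queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of average queue allocation across a consumer group.
final class AverageAllocation {
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        if (mqAll.isEmpty() || cidAll.isEmpty()) {
            return result; // step 2: nothing to balance
        }
        List<String> mqs = new ArrayList<>(mqAll);
        List<String> cids = new ArrayList<>(cidAll);
        Collections.sort(mqs);  // step 3: every consumer sorts identically,
        Collections.sort(cids); // so each one independently computes the same assignment
        int index = cids.indexOf(currentCid);
        if (index < 0) {
            return result;
        }
        int mod = mqs.size() % cids.size();
        int averageSize = mqs.size() <= cids.size() ? 1
                : (mod > 0 && index < mod ? mqs.size() / cids.size() + 1 : mqs.size() / cids.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqs.size() - startIndex); // may be <= 0 if queues run out
        for (int i = 0; i < range; i++) {
            result.add(mqs.get((startIndex + i) % mqs.size()));
        }
        return result;
    }
}
```

With the example from the text (two consumers, three queues), the first consumer in sorted order receives two queues and the second receives one. If there are more consumers than queues, the extra consumers receive nothing. No coordination message is needed because every consumer runs the same deterministic computation on the same sorted view.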

Message consumption process

[Consumption process]

  1. By default, up to 32 messages are pulled at a time. If the pulled batch is larger than the consume batch size (consumeMessageBatchMaxSize), it is split into pages, and each page is submitted as a separate consume task.

  2. Before each consumption, check whether the processQueue has been dropped, so that a consumer does not consume queues that are no longer assigned to it.

  3. Restore the original topic name of retry messages. Under RocketMQ's message retry mechanism, if a message's delayTimeLevel is greater than 0, its real topic is first saved in the message properties and its topic is set to SCHEDULE_TOPIC_XXXX. When the delay expires, the message rejoins consumption.

  4. Before consuming, run the pre-consumption hook.

  5. Run the consumption code that we wrote.

  6. After consuming, run the post-consumption hook.

  7. When consumption is complete, check again whether processQueue has been dropped. If it has, the result is discarded.

  8. Process the result returned by the consumer.

  9. If consumption succeeds, ackIndex = consumeRequest.getMsgs().size() - 1 and the consumption progress is updated directly. If consumption fails, ackIndex = -1 and the messages are sent back to the broker for retry. If sending a message back fails, it is resubmitted for local consumption after a 5-second delay.

  10. Whether consumption succeeds or fails, the consumption progress is updated. This is safe because failed messages have been handed to the retry mechanism, and the committed offset is the smallest offset still in processQueue, so messages awaiting local retry are not skipped.
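Steps 1 and 9 can be sketched as two small functions: splitting a pulled batch into consume pages, and deriving which messages to send back from the consume status. The status names match RocketMQ's ConsumeConcurrentlyStatus. The starting ackIndex of Integer.MAX_VALUE reflects the default of the concurrent consume context as far as we know; treat it as an assumption. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Status names match RocketMQ's ConsumeConcurrentlyStatus; the rest is a simplified sketch.
enum ConsumeStatus { CONSUME_SUCCESS, RECONSUME_LATER }

final class ConsumeResultSketch {
    // Step 1: split a pulled batch into consume tasks of at most consumeBatchSize messages.
    static <T> List<List<T>> pages(List<T> msgs, int consumeBatchSize) {
        List<List<T>> result = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += consumeBatchSize) {
            int to = Math.min(from + consumeBatchSize, msgs.size());
            result.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return result;
    }

    // Step 9: derive ackIndex from the status and return the indexes that must be sent back.
    // ackIndex is assumed to start at Integer.MAX_VALUE unless the listener lowered it.
    static List<Integer> retryIndexes(ConsumeStatus status, int batchSize, int ackIndex) {
        if (status == ConsumeStatus.CONSUME_SUCCESS) {
            if (ackIndex >= batchSize) {
                ackIndex = batchSize - 1; // the whole batch is acknowledged
            }
        } else {
            ackIndex = -1; // nothing acknowledged: every message is retried
        }
        List<Integer> retry = new ArrayList<>();
        for (int i = ackIndex + 1; i < batchSize; i++) {
            retry.add(i);
        }
        return retry;
    }
}
```

Messages after ackIndex are sent back to the broker. Whatever the outcome, the consumed messages are then removed from processQueue, and the returned minimum offset becomes the new consumption progress.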

[Message acknowledgment]

When the client sends a message back for retry, it wraps the request in a ConsumerSendMsgBackRequestHeader.

Broker-side handling logic

  1. Look up the subscription configuration of the consumer group. If it does not exist, return an error.

  2. Create the retry topic %RETRY% + group name and select one of its queues at random.

  3. Create a new message from the original message.

  4. If the message has reached the maximum number of retries (16 by default), it is placed in the dead-letter queue (%DLQ%) to await manual handling.

  5. Store the message with CommitLog#putMessage.
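The broker-side routing in steps 2 to 4 reduces to choosing between the retry topic and the dead-letter topic. The %RETRY% and %DLQ% prefixes and the default limit of 16 come from the text above, and the dead-letter topic is built as %DLQ% + group name. SendBackRouting and RetryMessage are hypothetical names, and the random queue selection and storage steps are left out.

```java
// Hypothetical, simplified routing for a message that a consumer sent back.
final class SendBackRouting {
    static final int DEFAULT_MAX_RECONSUME_TIMES = 16;

    static final class RetryMessage {
        final String topic;
        final byte[] body;
        final int reconsumeTimes;

        RetryMessage(String topic, byte[] body, int reconsumeTimes) {
            this.topic = topic;
            this.body = body;
            this.reconsumeTimes = reconsumeTimes;
        }
    }

    // Steps 2 and 4: the retry topic, or the dead-letter topic once retries are used up.
    static String targetTopic(String group, int reconsumeTimes, int maxReconsumeTimes) {
        return reconsumeTimes >= maxReconsumeTimes ? "%DLQ%" + group : "%RETRY%" + group;
    }

    // Step 3: build the new message from the original one, counting this retry.
    static RetryMessage build(String group, byte[] originalBody, int reconsumeTimes) {
        String topic = targetTopic(group, reconsumeTimes, DEFAULT_MAX_RECONSUME_TIMES);
        return new RetryMessage(topic, originalBody.clone(), reconsumeTimes + 1);
    }
}
```

Because the retry topic is per consumer group, a retry affects only the group whose consumption failed, and other groups subscribed to the same topic are unaffected.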

Summary

Starting from the basic concepts of consumers and consumer groups, we walked through the process of message consumption in RocketMQ. After the consumer client starts, it runs several background scheduled tasks to handle the related logic. RocketMQ offers two ways to retrieve messages, push and pull, and push mode is built on top of pull mode. We compared ordinary polling with long polling and looked at how long polling is implemented. Finally, we covered the message consumption and acknowledgment process.