Consumer

review

Message consumers fall into two categories: push and pull. The startup mode is similar to that of the producer. Here are the focus:

  • Consumers unfold in the pattern of consumer groups. And mode has cluster mode and broadcast mode two consumption modes. You need to understand how the two clustering patterns are logically encapsulated;

  • Then there is the principle of load balancing on the consumer side. That is, how consumers bind consumption queues;

  • Finally in push mode on consumers, MessageListenerConcurrently and MessageListenerOrderly both message listener what processing logic is different, why the latter can keep the message order

Through the above problems to launch the source code analysis.

Start the

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start

@Override
public void start(a) throws MQClientException {
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    this.defaultMQPushConsumerImpl.start();
    if (null! = traceDispatcher) {try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e); }}}Copy the code

Consumer startup logic:

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

Creating a core object

// ToDo: K2 ->mQClientFactory This is a core object this. MQClientFactory = MQClientManager. GetInstance (). GetOrCreateMQClientInstance (enclosing defaultMQPushConsumer, this.rpcHook);Copy the code

Local file load, register local consumer group cache, consumer start

// Load the local file
this.offsetStore.load();
// Register the local consumer group cache
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); }...// Consumer start
mQClientFactory.start();
Copy the code

The startup method is start

org.apache.rocketmq.client.impl.factory.MQClientInstance#start

public void start(a) throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
...
                // Start rebalance service This is the client load balancing service
                // ToDo: k2-> client load balancing
                this.rebalanceService.start(); .default:
                break; }}}Copy the code

Load balancing is implemented on the client

Since there are many client consumption modes, there are also different implementations, and this is the push mode

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    // ToDo: client load: the process of loading tasks on a Topic
    switch (messageModel) {
        // Broadcast mode, there is no load, every consumer needs to consume, just need to update the load information
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); . }// ToDo: k2-> Client load: cluster mode load method
        case CLUSTERING: {
            // Subscribe to the topic
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            // Client ID
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); .if(mqSet ! =null&& cidAll ! =null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);
                // The consumer load balancing strategy can be relatively stable only after sorting
                Collections.sort(mqAll);
                Collections.sort(cidAll);
                // MessageQueue load policy, there are five implementation classes
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    // Allocated according to the load policy, returns the collection of MessageQueue actually subscribed by the current consumer
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                        e);
                    return; }... }}Copy the code

Allocate can be implemented in a variety of ways.

  • Org. Apache. Rocketmq. Client. Consumer. Rebalance. AllocateMachineRoomNearby# the allocate: close to the engine room strategy

Org. Apache. Rocketmq. Client. Consumer. Rebalance. AllocateMessageQueueAveragely# the allocate: the average distribution of mq to each consumer instance – average allocation strategy: as shown in figure

Org. Apache. Rocketmq. Client. Consumer. Rebalance. AllocateMessageQueueAveragelyByCircle# the allocate: Assign MQ to each consumer instance one by one, and then start the next cycle – the circular allocation strategy is shown below:

Org. Apache. Rocketmq. Client. Consumer. Rebalance. AllocateMessageQueueByConfig# the allocate: according to the configuration file, specify the distribution – “manual configuration strategy

Org. Apache. Rocketmq. Client. Consumer. Rebalance. AllocateMessageQueueByMachineRoom# the allocate: give priority to assign mq consumer instance – in the same room, computer room with strategy As shown in figure

Org. Apache. Rocketmq. Client. Consumer. Rebalance. AllocateMessageQueueConsistentHash# the allocate: consistency hash allocation policy

(ToDo: Detailed analysis, lag)

Consumer pull the message

Start the thread and process the request

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

while (!this.isStopped()) {
    try {
        // Pull the message queue request
        PullRequest pullRequest = this.pullRequestQueue.take();
        // Process the request
        this.pullMessage(pullRequest);
    } catch (InterruptedException ignored) {
    } catch (Exception e) {
        log.error("Pull Message Service Run Method exception", e); }}Copy the code

Pull message implementation

org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if(consumer ! =null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        // ToDo: k2-> Push mode will eventually use pull messages
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); }}Copy the code

It can be seen that the push mode still uses the pull mode to consume messages, thus it can also be concluded that the push mode is the encapsulation of the pull mode

The core process of pulling messages

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

Part of the code is as follows:

.// Get the message to process: ProcessQueue
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// If the queue is discarded, return directly
if (processQueue.isDropped()) {
    log.info("the pull request[{}] is dropped.", pullRequest.toString());
    return;
}
// Update the timestamp
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

try {
    // Verify that the Consumer is normal
    this.makeSureStateOK();
} catch (MQClientException e) {
    log.warn("pullMessage exception, consumer state not ok", e);
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    return;
}

// If the queue is suspended, delay execution for 1s
if (this.isPause()) {
    log.warn("consumer was paused, execute pull request later. instanceName={}, group={}".this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
    return;
}
// Get the maximum number of messages to be processed
long cachedMessageCount = processQueue.getMsgCount().get();
// Get the maximum size of messages to be processed
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// Perform flow control quantitatively
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { ... }

// Flow control from message size
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { ... }


// Messages are consumed sequentially or delayedly
if (!this.consumeOrderly) { ... }else{... }// ToDo: k2-> The client's default pull callback, after the pull message, will enter this method processing
// onSuccess
PullCallback pullCallback = new PullCallback() { 
    @Override
    public void onSuccess(PullResult pullResult) {... }}// ToDo: where the K2 client actually interacts with the server to pull messages
this.pullAPIWrapper.pullKernelImpl( ... )
Copy the code

Consumer pull message implementation

// ToDo: k2-> consumer pull message public PullResult pullKernelImpl(...) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// First findBroker FindBrokerResult FindBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); . // check version check if (! ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); PullMessageRequestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType); String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); } / / pull news PullResult PullResult = this. MQClientFactory. GetMQClientAPIImpl () pullMessage (brokerAddr requestHeader, timeoutMillis, communicationMode, pullCallback); }Copy the code

Asynchronously pull messages

// Asynchronously pull the message
private void pullMessageAsync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis,
    final PullCallback pullCallback
) throws RemotingException, InterruptedException {
    // asynchronous pull
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            / / a response
            if(response ! =null) {
                try {
                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                    assertpullResult ! =null;
                    // Here is the callback to the onSuccess implemented in the code abovepullCallback.onSuccess(pullResult); .Copy the code