RocketMQ source code interpretation – the same consumer group under different consumer subscriptions

@(RocketMQ source code interpretation)


To clarify the topic, we assume that there is a Producer and two consumers. The Producer sends messages to TOPICA and TOPICB, and the two consumers subscribe to two topics. Let’s take a look at the problems that can occur in this case and analyze the source code to see why.

The phenomenon of

The consumer’s subscription not exist, group… “(The Consumer side prints a similar log).

There will also be some subscription changed, group:… A similar log, and if you look carefully, you can see that when one consumer consumes a message, the other does not.

Source code analysis

Let’s take a look at why this might be the case. It’s hard to get started with live viewing or debugging, and you might need to use the killer trick — ask.

Google, Baidu, Bing. I directly asked a god – Taro. Ohami said that this was a problem, for reasons he couldn’t remember, but that the problem was that the consumer relationship kept overlaying each other.

Now that we know where to start, at least from the Broker.

Follow the trail to find the reason, look at the source code.

First, we know that both implementations of the consumer (push and pull) maintain an MQClientInstance. This class is very important. When the consumer is launched, this class is launched.

// Start various schedule tasks
this.startScheduledTask();
Copy the code

Here are a number of scheduled tasks launched, let’s catch up to take a look:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run(a) {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            // Send heartbeat periodically
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); }}},1000.this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
Copy the code

Here we see that the consumer periodically sends a heartbeat to the Broker, and we continue to chase after it until we find the sendHeartbeatToAllBroker method:

// Send heartbeat to all brokers
if (!this.brokerAddrTable.isEmpty()) {
    long times = this.sendHeartbeatTimesTotal.getAndIncrement();
    Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, HashMap<Long, String>> entry = it.next();
        String brokerName = entry.getKey();
        HashMap<Long, String> oneTable = entry.getValue();
        if(oneTable ! =null) {
            for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                Long id = entry1.getKey();
                String addr = entry1.getValue();
                if(addr ! =null) {
                    if (consumerEmpty) {
                        if(id ! = MixAll.MASTER_ID)continue;
                    }

                    try {
	                    // The part that actually sends the heartbeat
                        int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                        if (!this.brokerVersionTable.containsKey(brokerName)) {
                            this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                        }
                        this.brokerVersionTable.get(brokerName).put(addr, version);
                        if (times % 20= =0) {
                            log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); log.info(heartbeatData.toString()); }}catch (Exception e) {
                        if (this.isBrokerInNameServer(addr)) {
                            log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                        } else {
                            log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                id, addr);
                        }
                    }
                }
            }
        }
    }
}
Copy the code

This sends heartbeat messages to all brokers. In our example, the Broker is one. We go to the Broker to see how it handles heartbeat messages. This type of message is processed using ClientManageProcessor. Let’s look at the part that handles the heartBeat (heartBeat method) :

// Loop through all the data sent
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
	// Get the consumption message recorded on the broker based on the name of the consumption group
    SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());
    boolean isNotifyConsumerIdsChangedEnable = true;
    if (null! = subscriptionGroupConfig) { isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();int topicSysFlag = 0;
        if (data.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false.true);
        }
        String newTopic = MixAll.getRetryTopic(data.getGroupName());
        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,
            subscriptionGroupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    }
	// Registered consumer
    boolean changed = this.brokerController.getConsumerManager().registerConsumer(
        data.getGroupName(),
        clientChannelInfo,
        data.getConsumeType(),
        data.getMessageModel(),
        data.getConsumeFromWhere(),
        data.getSubscriptionDataSet(),
        isNotifyConsumerIdsChangedEnable
    );

    if (changed) {
        log.info("registerConsumer info changed {} {}", data.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()) ); }}Copy the code

As you can see, the broker gets its own record of consumer subscriptions based on the message sent by the consumer. Note that it gets the information by consumer group. Let’s look at registerConsumer:

// Get consumer information based on consumer groups
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
    ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
	// Notice that the key of consumerTable is group
    ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev ! =null ? prev : tmp;
}
boolean r1 =
    consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
        consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
    if (isNotifyConsumerIdsChangedEnable) {
        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); }}this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
Copy the code

We note that ConsumerGroupInfo prev = this. ConsumerTable. PutIfAbsent (group, TMP); This sentence tells us that the consumer information stored in the consumerTable is based on the consumption group. If the consumption information of a group is different, as in our example, the consumer heartbeat information of the subscribed TOPICA group tells the Broker that our group subscribed TOPICA! The Broker then records it. Broker: We subscribe TOPICB!

This results in subscription messages overwriting each other, so when the message is pulled, one of the consumers must not be farmed to the message because the subscription information is not available on the Broker.

So we know what’s causing this phenomenon.