preface

Those of you who have used MQ may have encountered the problem of message accumulation. And the fat ditch recently stepped in the hole, but it turned out to be the result of such an unexpected cause.

The body of the

That night dark wind high, fat trench is preparing to set foot on the way home, suddenly received alarm message bombing! “MQ Message Accumulation Alarm [TOPIC: XXX]”

Fat in the heart “ten thousand grass mud horse collapse teng ~” the first reaction is: “how fat matter? Coming right after work to do something…?”

So I went back to my office, turned on my computer, and got on RocketMQ.

Conquer the grass (ใ‚š ะด ใ‚šยด)!! Over 300 million messages??

To be aware of the problem of message accumulation:

The production speed of the producer >> the processing speed of the consumer

  1. A sudden increase in the rate of production of a producer, such as a sudden increase in the flow of producers
  2. The consumption speed slows down, for example, the I/O of the consumer instance is severely blocked or down

Wipe the cold sweat on my head ๐Ÿ˜“… Get on the consumer server and check it out.

The application is working fine! Server disk I/O normal! Network working!

Go to the producer’s server again, ew… Flow is also normal!

What?? Buddha ๐Ÿ˜จ… Both producer and consumer applications are normal, but why are messages piling up so much? As I watched the pile pile up (if only it was the amount of my hair), I became more and more worried.

Although RocketMQ can support a billion message heap without significantly degrading performance due to message heap, ๐Ÿ˜ฐ this heap is clearly an anomaly.

RocketMQ is buggy, yes this is definitely RocketMQ’s pot!

After this…

Ha ha back to business, although fat trench spell dad no, but at least can’t pit dad ๐Ÿ˜‚

Enter the consumer project to view the log, EMMM… No error found, no error log… It looks as if everything is normal.

Yi… But isn’t the pace of consumption a bit slow? This is not scientific ah, consumer but configured the consumption cluster of 3 nodes ah, according to the demand of business consumption ability but tremendous ah. Let me click on the consumer information of this TOPIC again

Why, how can the ClientId of these three consumers be the same?

“Is it because of the same problem of ClientId that brokers are confused when distributing messages and messages cannot be pushed to consumers? Since both producers and consumers are behaving normally, I suspect that the problem may lie with the Broker.

Based on this assumption, we need to solve the following problems:

  1. Why is ClientId the same for two consumers deployed on different servers?
  2. Will the same ClientId cause broker message distribution errors?

Problem analysis

Why is ClientId the same? I guess it’s because of the Docker container. Because the company recently started the containerization phase, and it just so happens that the consumer’s project is on the first list of containerization phases.

When the Docker process starts, a virtual bridge named Docker0 is created on the host. The Docker container on the host is connected to the virtual bridge. A virtual bridge works like a physical switch, so that all containers on a host are connected to a layer 2 network through the switch. There are generally four network modes of Docker:

  • Host mode
  • Container pattern
  • None mode
  • Bridge pattern

The students who are not clear to these patterns find baidu ๐Ÿค” by themselves

Our containers are in Host mode, so the network of the container is exactly the same as that of the Host.

The default IP address of the docker0 network adapter is 172.17.0.1. Therefore, it is obvious that ClientId should read the IP of the Docker0 card, which explains why ClientId is consistent across consumer terminals.

So the next thing is where is clientId set up? I searched for “Docker” on Github Issues, and sure enough, I did! Still, there are many like-minded people who have stepped on the previous pit, screened and found a relatively reliable open issue

As you can see, this brother is in exactly the same situation as mine, and his conclusion is pretty much the same as what I guessed above (with a bit of self-satisfaction). He also mentions that clientId is defined in the buildMQClientId method in the ClientConfig class.

The source code to explore

Enter the ClientConfig class and navigate to the buildMQClientId method

public String buildMQClientId(a) {
  StringBuilder sb = new StringBuilder();
  sb.append(this.getClientIP());

  sb.append("@");
  sb.append(this.getInstanceName());
  if(! UtilAll.isBlank(this.unitName)) {
    sb.append("@");
    sb.append(this.unitName);
  }

  return sb.toString();
}
Copy the code

ClientId is the client IP + “@”+ instance name. It is obvious that the problem is getting the client IP.

How does it get the client IP

public class ClientConfig {...privateString clientIP = RemotingUtil.getLocalAddress(); . }public static String getLocalAddress(a) {
  try {
    // Traversal Network interface to get the first non-loopback and non-private address
    Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
    ArrayList<String> ipv4Result = new ArrayList<String>();
    ArrayList<String> ipv6Result = new ArrayList<String>();
    while (enumeration.hasMoreElements()) {
      final NetworkInterface networkInterface = enumeration.nextElement();
      final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
      while (en.hasMoreElements()) {
        final InetAddress address = en.nextElement();
        if(! address.isLoopbackAddress()) {if (address instanceof Inet6Address) {
            ipv6Result.add(normalizeHostAddress(address));
          } else{ ipv4Result.add(normalizeHostAddress(address)); }}}}// prefer ipv4
    if(! ipv4Result.isEmpty()) {for (String ip : ipv4Result) {
        if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
          continue;
        }

        return ip;
      }

      return ipv4Result.get(ipv4Result.size() - 1);
    } else if(! ipv6Result.isEmpty()) {return ipv6Result.get(0);
    }
    //If failed to find,fall back to localhost
    final InetAddress localHost = InetAddress.getLocalHost();
    return normalizeHostAddress(localHost);
  } catch (Exception e) {
    log.error("Failed to obtain local address", e);
  }

  return null;
}
Copy the code

If you have to use for current machine IP friend, should be to RemotingUtil. GetLocalAddress () method of the tool is no stranger to

To put it simply, it is to obtain the IP address of the network card of the current machine, but because the network mode of the container adopts host mode, which means that each container and the host are in the same network, so we can also see the Docker 0 network card created by the Docker-server in the container. So it reads 172.17.0.1, the default IP address of the Docker 0 nic

(I have communicated with the operation and maintenance students. At present, as it is the first stage of containerization, the simple mode will be adopted at first, and then it will be gradually replaced with K8S. Each POD has its own independent IP, and the network will be isolated from the host computer and other PODS. emmm…. K8s! That sounds awesome, and I happen to be reading about it.)

** At this point, you might be wise enough to ask, “How can this be the same if there isn’t an instance name argument?” ** Don’t worry, we continue to read ๐Ÿ‘‡

private String instanceName = System.getProperty("rocketmq.client.name"."DEFAULT");

public String getInstanceName(a) {
  return instanceName;
}

public void setInstanceName(String instanceName) {
  this.instanceName = instanceName;
}

public void changeInstanceNameToPID(a) {
  if (this.instanceName.equals("DEFAULT")) {
    this.instanceName = String.valueOf(UtilAll.getPid()); }}Copy the code

The getInstanceName() method actually gets the value of instanceName, but when was it assigned? That’s right, it’s assigned by the changeInstanceNameToPID() method, which is called when the consumer starts.

The logic of this parameter is very simple. During initialization, the environment variable rocketmq.client.name is first checked to see if it has a value. If not, the DEFAULT value is used.

The consumer will then start to see if the value is DEFAULT and invoke utilall.getpid () if so.

public static int getPid(a) {
    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
    String name = runtime.getName(); // format: "pid@hostname"
    try {
        return Integer.parseInt(name.substring(0, name.indexOf(The '@')));
    } catch (Exception e) {
        return -1; }}Copy the code

It is clear from the method name that this method actually gets the process number. That… Why do I get the same process number?

Smart you can already know the answer right ๐Ÿคจ! Here are the three main features of Docker

  • cgroup
  • namespace
  • unionFS

Yes, the namespace technology is used here.

The Linux Namespace is a function provided by the Linux kernel to isolate system resources, such as PID, User ID, and Network.

Since they all use the same base image and run the same JAVA project in the outermost layer, we can go inside the container and see that they all have the process number 9

After a series of clever reasoning and argumentation by Feihao, the same clientId will be generated in Docker container HOST network mode!

So far, we’ve solved the first problem!

Following in conan’s footsteps, we continue with the second question: Does clientId similarity cause brokers to distribute messages incorrectly?

During load balancing, Consumer should be uniquely identified by clientId as the client Consumer. The consistency of clientId during message delivery leads to load distribution errors.

So let’s explore how Consumer load balancing works. Initially, I thought that the load balancing on the Consumer side was handled by the Broker, which assigned queues to different consumers based on the number of registered consumers. However, after looking at the doc description on the source code and doing some research on the source code, I found that MY knowledge is still too little (ha ha ha, there should be some friends who think the same as me at the beginning).

Just to fill in the overall architecture of RocketMQ

Due to lack of space, I will only explain the relationship between Broker and consumer. If you don’t understand any of the roles, please refer to my previous RocketMQ introduction article

  1. The Consumer establishes a long connection to one of the nodes in the NameServer cluster (selected at random) and periodically gets Topic routing information from NameServer.
  2. Establish a long connection to the Broker based on the captured Topic routing information, and periodically send heartbeats to the Broker.

When the Broker receives a heartbeat message, the Consumer information is saved to the local cache variable consumerTable. The figure above provides an overview of consumerTable’s storage structure and content, most importantly, it caches each consumer’s clientId.

For the consumption pattern of Consumer, I directly quote the source code explanation

In RocketMQ, the two Consumer consumption modes (Push/Pull) are based on the Pull mode for retrieving messages. The Push mode is an encapsulation of the Pull mode. Essentially, the message pulling thread pulls a batch of messages from the server. It then commits to the message consuming thread pool and continues to try to pull messages from the server again. If the message is not pulled, the pull is delayed and continues.

In both Push/Pull consumption modes, the Consumer needs to know which message queue – queue – to fetch messages from the Broker. Therefore, it is necessary to do load balancing on the Consumer side, that is, to allocate multiple MessageQueue on the Broker side to which consumers in the same ConsumerGroup consume.

So simply put, in both Push and Pull modes, control of message consumption resides with the Consumer, so Consumer’s load balancing is implemented on the Consumer’s Client.

RebalanceService completes the load balancing service thread (every 20 seconds). The run() method of the RebalanceService thread ends up calling the rebalanceByTopic() method of the RebalanceImpl class, which is the core of the Consumer side load balancing implementation. Here, the rebalanceByTopic() method does different logic depending on whether the consumer communication type is “broadcast mode” or “cluster mode.” Here are the main processing flows in cluster mode:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
  switch (messageModel) {
    case BROADCASTING: {
      ..... / / to omit
    }
    case CLUSTERING: {
      // Get the collection of message consumption queues under this Topic
      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
      // Get the consumer's clientId from the broker
      List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
      if (null == mqSet) {
        if(! topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); }}if (null == cidAll) {
        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
      }

      if(mqSet ! =null&& cidAll ! =null) {
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
        mqAll.addAll(mqSet);

        Collections.sort(mqAll);
        Collections.sort(cidAll);
        // The default average allocation algorithm
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

        List<MessageQueue> allocateResult = null;
        try {
          allocateResult = strategy.allocate(
            this.consumerGroup,
            this.mQClientFactory.getClientId(),
            mqAll,
            cidAll);
        } catch (Throwable e) {
          log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                    e);
          return;
        }

        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
        if(allocateResult ! =null) {
          allocateResultSet.addAll(allocateResult);
        }

        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        if (changed) {
          log.info(
            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
            allocateResultSet.size(), allocateResultSet);
          this.messageQueueChanged(topic, mqSet, allocateResultSet); }}break;
    }
    default:
      break; }}Copy the code

(1) Obtain the message consumption queue set (mqSet) under this Topic from the local cache variable topicSubscribeInfoTable;

(2) Call findConsumerIdList() to send the list of clientId to the Broker based on the topic and consumerGroup arguments;

(3) First sort the message consumption queue and consumer Id under Topic, and then use the message queue allocation strategy algorithm (default: message queue average allocation algorithm) to calculate the message queue to be pulled. The average allocation algorithm here, similar to the paging algorithm, sorts all MessageQueue in order similar to the record, sorts all consumers in order similar to the page number, and calculates the average size that each page needs to contain and the range of records on each page. Finally, the entire range is traversed to calculate which records should be allocated to the current Consumer (in this case, MessageQueue).

(4) then call updateProcessQueueTableInRebalance () method, the specific approach is to first set will be assigned to the message queue (mqSet) and processQueueTable do a filtering ratio.

  • The processQueueTable annotation in red in the figure above indicates that it is not included in the allocated message queue set mqSet. Set these queues to true and see if they can be removed from the processQueueTable cache variableremoveUnnecessaryMessageQueue()Method that checks every 1s to see if the lock on the current consumption-processing queue can be acquired, and returns true if so. Return false if the lock on the current consumer processing queue is still not available after 1s of waiting. If true is returned, the corresponding Entry is removed from the processQueueTable cache variable;
  • The green part of processQueueTable in the figure above represents the intersection with the allocated message queue set mqSet. Determine if the ProcessQueue has expired. If the ProcessQueue is in Pull mode, set Dropped to true, and call ProcessQueueremoveUnnecessaryMessageQueue()Method, try to remove Entry as above;

Message consumption queue load balancing among different consumers in the same consumer group, its core design concept is that a message consumption queue can only be consumed by one consumer in the same consumer group at the same time, and a message consumer can consume multiple message queues at the same time.

The above part is taken from RocketMQ source docs. I don’t know if you understand it, but it took me several times to understand ๐Ÿค”๐Ÿค”๐Ÿค”

In fact, the realization of load balancing can be clearly seen from the figure in Step 3. Simply speaking, it is to allocate the same number of consumption queues to different consumers. Consumers will generate the unique identifier of clientId, but according to our reasoning above, consistent clientId will be generated in container and Host network mode.

Emmmm…. By this point, you can probably guess what went wrong.

That’s right! The problem should be in step 3, the calculation of the equal distribution.

@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
  if (currentCID == null || currentCID.length() < 1) {
    throw new IllegalArgumentException("currentCID is empty");
  }
  if (mqAll == null || mqAll.isEmpty()) {
    throw new IllegalArgumentException("mqAll is null or mqAll empty");
  }
  if (cidAll == null || cidAll.isEmpty()) {
    throw new IllegalArgumentException("cidAll is null or cidAll empty");
  }

  List<MessageQueue> result = new ArrayList<MessageQueue>();
  if(! cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
             consumerGroup,
             currentCID,
             cidAll);
    return result;
  }
  // The index of the current clientId
  int index = cidAll.indexOf(currentCID);
  int mod = mqAll.size() % cidAll.size();
  int averageSize =
    mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                                         + 1 : mqAll.size() / cidAll.size());
  int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
  int range = Math.min(averageSize, mqAll.size() - startIndex);
  for (int i = 0; i < range; i++) {
    result.add(mqAll.get((startIndex + i) % mqAll.size()));
  }
  return result;
}
Copy the code

The above calculation may seem a bit convoluted, but once you understand it, you simply calculate the message queue currently allocated by the Consumer, as shown in Step 3 above

Assuming that there is only one consumer at the moment, our consumption is actually perfectly normal, because all queues under the current Topic are allocated to the current consumer, and there is no load balancing problem.

So let’s say we have two consumers, and the normal way to calculate it would look something like this. However, since cidAll is two duplicate Clientids, both consumers get an index of 0, so naturally they both assign the same MessageQueue. This explains why you can see the log of consumption at the beginning, but the consumption rate is very slow.

The solution

  1. Resolve load balancing errors

Culprit: clientId

After a good inference, you should know that the root cause of the Consumer load balancing error is the consistent clientId generated by the Consumer client, so the key to solve this problem is to modify the generation rules of clientId. We can manually set the rocketmq.client.name environment variable to generate a custom and unique clientId.

Here the time stamp is added after the original PID:

@PostConstruct
public void init(a) {
  System.setProperty("rocketmq.client.name", String.valueOf(UtilAll.getPid()) + "@" + System.currentTimeMillis());
}
Copy the code
  1. Resolving message accumulation

Finally solved the root problem! Okay, everything is ready to go online, and there are 300 million messages in the queue waiting to be consumed.

(It can be said that it is a pile of cool, has been a pile of straight ๐Ÿ˜ญ)

Just launched, emMM… The effect is remarkable, and the number of accumulated messages is gradually reduced. But there’s another alarm, mongodb alarm!

Hold the grass… I had almost forgotten that mongodb could no longer handle the sudden increase in incoming traffic that consumers write to mongodb after processing messages. Fortunately, historical information is not important and can be lost. So fat hao decisively went to the background to reset the consumption point, and now the consumption is normal, mongodb is also normal. Hoo ~ narrowly missed, almost created another accident.

conclusion

  1. Each RocketMQ consumer client generates a unique clientId identifier, which is generated by the following rulesClient IP address + client process ID
  2. If the network mode uses Host mode, the applications in the container will get the default IP address of the Docker bridge
  3. RocketMQ’s consumer side load balancing is implemented on the client side. The consumer client caches the corresponding Topic consumption queue. By default, the average allocation algorithm of message queues is adopted.
  4. Take a thorough look at how messages stack up. The consumption entrance of instant large flow can not affect other businesses, or it will cause another accident just like fat trench (if you have better information accumulation treatment scheme, welcome to leave a message proposal)

This is the first time for feihao to write an article about the online accident. it may be rough in many places or details. we hope you can give us more understanding and suggestions

In fact, I have experienced quite a few online accidents, but every summary is only a mere form. I hope I can sort it out in the form of articles from now on. First, it is helpful for my own summary and review in the future, and it can also provide more pit mining experience for everyone.

Ordinary change, will change ordinary

I am a house xiaonian, a low-key young man in the Internet

Please visit my blog at ๐Ÿ“– edisonz.cn for more shared articles