One, the introduction

RocketMQ is an excellent distributed messaging middleware with better performance in all aspects than existing message queues. RocketMQ uses the long polling pull mode by default and supports tens of millions of messages stacked on a single machine, which can be used in massive messaging systems.

RocketMQ consists of Producer, Broker, Consumer, and Namesvr. Producer produces messages, Consumer consumes messages, Broker stores messages, and Namesvr stores metadata. The main functions of each component are as follows:

  • Message producers are responsible for producing messages. Business systems are generally responsible for producing messages. A message producer sends messages generated in the business application to the Broker server. RocketMQ provides multiple delivery modes: synchronous, asynchronous, sequential, and unidirectional. Both synchronous and asynchronous require the Broker to return an acknowledgement message, but one-way does not.

  • Message Consumer: Is responsible for consuming messages, usually the backend system is responsible for asynchronous consumption. A message consumer pulls messages from the Broker server and provides them to the application. From the perspective of user application, it provides two forms of consumption: pull consumption and push consumption.

  • Broker Server: A role that stores and forwards messages. The proxy server in the RocketMQ system is responsible for receiving and storing messages sent from producers and preparing consumers for pull requests. The proxy server also stores message-related metadata, including consumer groups, consumption progress offsets, and topic and queue messages.

  • Name Service (Name Server) : The Name service acts as a provider of routed messages. The producer or consumer can use the name service to find the corresponding list of Broker IP addresses for each topic. Multiple Namesrv instances form a cluster, but are independent of each other and do not exchange information.

  • Producer Group: A Group of producers of the same kind who send the same kind of messages with the same logic. If a transaction message is sent and the original producer crashes after sending, the Broker server contacts other producer instances in the same producer group to commit or backtrack consumption.

  • Consumer Group: A collection of consumers of the same type, usually consuming the same type of messages with consistent consumption logic. Consumer groups make it easy to achieve the goals of load balancing and fault tolerance in terms of message consumption.

RocketMQ overall message processing is logically produced and consumed in the Topic dimension and physically stored in a MessageQueue on a specific Broker. Because a Topic has multiple MessageQueue on multiple Broker nodes, This naturally creates a load balancing requirement for message production and consumption.

The core of this article is to explain how RocketMQ’s producers and consumers implement load balancing during the entire message production and consumption process and the implementation details.

The overall architecture of RocketMQ

(Image from Apache RocketMQ)

RocketMQ is architecturally divided into four main parts, as shown in the figure above:

  • Producer: a role that publishes messages and supports distributed cluster deployment. The Producer uses MQ’s load balancing module to select the corresponding Broker cluster queue for message delivery, which supports fast failure and low latency.

  • Consumer: message consuming role, which supports distributed cluster deployment. Messages can be consumed in push and pull modes. At the same time, it also supports the consumption of cluster mode and broadcast mode. It provides real-time message subscription mechanism, which can meet the needs of most users.

  • NameServer: NameServer is a very simple Topic routing registry that supports distributed cluster deployment. It acts like ZooKeeper in Dubbo and supports dynamic Broker registration and discovery.

  • BrokerServer: The Broker is primarily responsible for storing, Posting, and querying messages, as well as ensuring high availability of services, and supports distributed cluster deployment.

The physical distribution of RocketMQ’s topics is shown in the figure above:

A Topic serves as a logical concept for message production and consumption, with specific message stores distributed among different brokers.

A Queue in a Broker is the physical storage unit for messages corresponding to a Topic.

In the overall design concept of RocketMQ, messages are produced and consumed on a Topic scale, and each Topic creates a corresponding MessageQueue at Broker nodes in the RocketMQ cluster.

The process of producing a message is essentially to select all the MessageQueue of the Topic Broker and select one of them according to certain rules to send the message. The normal strategy is polling.

In essence, the process of consumer consuming messages is that each consumer in the same consumer group subscribles to the same Topic is responsible for consuming part of the MessageQueue under the Topic according to certain rules.

Throughout the life cycle of RocketMQ messages, the concept of load balancing is involved in both the production and consumption of messages. The generation of messages involves the selection of load balancing brokers, and the consumption of messages involves the responsible balancing of multiple consumers and brokers.

The information production process is being produced

The production process:

  • Producer first accesses NamesVR to obtain routing information, and NamesVR stores all routing information at the Topic level (including the queue distribution for each Topic at each Broker).

  • The producer parses the routing information to generate local routing information, parses the Topic on the Broker queue, and translates the routing information into local message production.

  • Producer sends messages to brokers based on the local route information, and selects the specific brokers in the local route to send messages.

3.1 Route synchronization Process

public class MQClientInstance {
 
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false.null);
    }
 
 
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if(isDefault && defaultMQProducer ! =null) {
                        // omit the corresponding code
                    } else {
                        // 1. Query routing information corresponding to the specified Topic
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
 
                    if(topicRouteData ! =null) {
                        // 2. Compare whether topicRouteData is changed
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if(! changed) { changed =this.isNeedUpdateTopicRouteInfo(topic);
                        }
                        // 3. Parses the routing information into producer routing information and consumer routing information
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
 
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
 
                            // Generate Topic information corresponding to the producer
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if(impl ! =null) { impl.updateTopicPublishInfo(topic, publishInfo); }}}// Save to the local producer routing table
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true; }}}finally {
                    this.lockNamesrv.unlock(); }}else{}}catch (InterruptedException e) {
        }
 
        return false; }}Copy the code

Route synchronization process:

  • Route synchronization is a prerequisite for message producers to send messages. Without route synchronization, they cannot sense which Broker node is sent to.

  • Route synchronization queries routing information corresponding to a specific Topic, compares whether topicRouteData is changed, and finally parses the routing information and converts it into routing information of producers and consumers.

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    // Queue information stored according to the broker dimension
    private List<QueueData> queueDatas;
    // Broker information stored in the broker dimension
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
 
 
public class QueueData implements Comparable<QueueData> {
    // Name of the broker
    private String brokerName;
    // Read queue size
    private int readQueueNums;
    // Write queue size
    private int writeQueueNums;
    // Read and write permissions
    private int perm;
    private int topicSynFlag;
}
 
 
public class BrokerData implements Comparable<BrokerData> {
    // Information about the cluster to which the broker belongs
    private String cluster;
    // Name of the broker
    private String brokerName;
    // The IP address of the broker
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    private final Random random = newRandom(); } -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    // The finest-grained queue information
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}
 
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    / / the Topic information
    private String topic;
    // Owning brokerName information
    private String brokerName;
    // Queue information Id under Topic
    private int queueId;
}
Copy the code

Route resolution process:

  • The TopicRouteData core variable QueueData holds the queue information for each Broker, and BrokerData holds the address information for the Broker.

  • The TopicPublishInfo core variable MessageQueue holds the most fine-grained queue information.

  • The Producer is responsible for converting the TopicRouteData obtained from NamesVR into the Producer’s local TopicPublishInfo.

public class MQClientInstance {
 
    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
 
        TopicPublishInfo info = new TopicPublishInfo();
 
        info.setTopicRouteData(route);
        if(route.getOrderTopicConf() ! =null && route.getOrderTopicConf().length() > 0) {
          // omit the relevant code
        } else {
 
            List<QueueData> qds = route.getQueueDatas();
 
            // Sort according to brokerName
            Collections.sort(qds);
 
            // Traverses all brokers to generate queue dimension information
            for (QueueData qd : qds) {
                // QueueData with write capability can be used for queue generation
                if (PermName.isWriteable(qd.getPerm())) {
                    // Traverse to get the specified brokerData for exception condition filtering
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break; }}if (null == brokerData) {
                        continue;
                    }
                    if(! brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {continue;
                    }
 
                    // Iterate over the size of QueueData's write queue and generate MessageQueue to save TopicPublishInfo
                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }
 
            info.setOrderTopic(false);
        }
 
        returninfo; }}Copy the code

Route generation process:

  • The route generation process primarily generates the MessageQueue object from BrokerName and writeQueueNums for QueueData.

  • MessageQueue is the most fine-grained queue that can be sent during message delivery.

{
    "TBW102": [{
        "brokerName": "broker-a"."perm": 7."readQueueNums": 8."topicSynFlag": 0."writeQueueNums": 8
    }, {
        "brokerName": "broker-b"."perm": 7."readQueueNums": 8."topicSynFlag": 0."writeQueueNums": 8}}]Copy the code

Example for route resolution:

  • Topic (TBW102) has queue information on broker-A and broker-B, where the number of read and write queues is 8.

  • Sort broker information by broker-a and broker-b names.

  • Eight MessageQueue objects of topic TBW102 are generated for broker-A, with queueId 0-7.

  • Eight MessageQueue objects with topic TBW102 are generated for broker-B, with queueId 0-7.

  • TopicPublishInfo for topic (named TBW102) contains a total of 16 MessageQueue objects, including 8 MessageQueue for Broker-A and 8 MessageQueue for Broker-B.

  • Routing during message sending is to obtain one of the 16 MessageQueue objects for message sending.

3.2 Load Balancing Process

public class DefaultMQProducerImpl implements MQProducerInner {
 
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        
        TopicPublishInfo = TopicPublishInfo = TopicPublishInfo
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
 
        if(topicPublishInfo ! =null && topicPublishInfo.ok()) {
             
            String[] brokersSent = new String[timesTotal];
            // The message is sent based on the number of retries
            for (; times < timesTotal; times++) {
                // Record the last brokerName that failed to be sent
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                TopicPublishInfo = TopicPublishInfo ()
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if(mqSelected ! =null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // 3. Perform sending and determine the sending result. If the sending fails, select the message queue for resending the message based on the retry times
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        switch (communicationMode) {
                            case SYNC:
                                if(sendResult.getSendStatus() ! = SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue; }}return sendResult;
                            default:
                                break; }}catch (MQBrokerException e) {
                        // omit the relevant code
                    } catch (InterruptedException e) {
                        // omit the relevant code}}else {
                    break; }}if(sendResult ! =null) {
                returnsendResult; }}}}Copy the code

Message sending process:

  • Query TopicPublishInfo, the routing information object for Topic.

  • Get the queue that sends the message from TopicPublishInfo via selectOneMessageQueue, which represents the queue of the specific Broker.

  • Perform the sending and determine the sending result. If the sending fails, select the message queue based on the number of retries. Selecting the message queue again will avoid the queue of the Broker that failed to send the message last time.

public class TopicPublishInfo {
 
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            // Select MessageQueue to send according to polling
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // Avoid sending MessageQueue that failed last time
                if(! mq.getBrokerName().equals(lastBrokerName)) {returnmq; }}returnselectOneMessageQueue(); }}}Copy the code

Routing process:

  • MessageQueue is selected according to polling, and the sending queue is selected by summing up the global maintenance index.

  • The selection of MessageQueue will avoid the MessageQueue corresponding to the last failed Broker.

Schematic diagram of sending messages from Producer:

  • The queue distribution for a Topic is Broker_A_Queue1, Broker_A_Queue2, Broker_B_Queue1, Broker_B_Queue2, Broker_C_Queue1, Broker_C_Queue2, Select them in turn according to the polling policy.

  • Broker_A_Queue1 fails to send and Broker_A is skipped and Broker_B_Queue1 is selected to send.

The process of consumption

The process of consumption:

  • The consumer accesses the routing information corresponding to the NamesVR synchronization topic.

  • The consumer parses the remote routing information locally and saves it locally.

  • The consumer performs Reblance load balancing locally to determine the MessageQueue responsible for consumption of this node.

  • The consumer accesses the Broker to consume messages for the specified MessageQueue.

4.1 Route Synchronization Process

public class MQClientInstance {
 
    // 1. Start a scheduled task to synchronize routing information from NamesVR
    private void startScheduledTask(a) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run(a) {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); }}},10.this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    }
 
    public void updateTopicRouteInfoFromNameServer(a) {
        Set<String> topicList = new HashSet<String>();
 
        // Iterate over all consumer subscribed topics and get routing information from NamesVR
        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if(impl ! =null) {
                    Set<SubscriptionData> subList = impl.subscriptions();
                    if(subList ! =null) {
                        for (SubscriptionData subData : subList) {
                            topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }
 
        for (String topic : topicList) {
            this.updateTopicRouteInfoFromNameServer(topic); }}public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
 
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if(isDefault && defaultMQProducer ! =null) {
                        // omit the code
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
 
                    if(topicRouteData ! =null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if(! changed) { changed =this.isNeedUpdateTopicRouteInfo(topic);
                        }
 
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
 
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
 
                            // Build routing information for consumer
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if(impl ! =null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); }}}this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true; }}}finally {
                    this.lockNamesrv.unlock(); }}}catch (InterruptedException e) {
        }
 
        return false; }}Copy the code

Route synchronization process:

  • The routing synchronization process is a prerequisite for message consumers to consume messages. Without routing synchronization, Broker nodes cannot be aware of specific messages to be consumed.

  • The consumer node periodically synchronizes routing information from NamesVR for topics subscribed to by the consumer node through a scheduled task.

  • The Consumer uses updateTopicSubscribeInfo to build the synchronized routing information into the local routing information and use it for subsequent responsible balancing.

4.2 Load Balancing Process

public class RebalanceService extends ServiceThread {
 
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval"."20000"));
 
    private final MQClientInstance mqClientFactory;
 
    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }
 
    @Override
    public void run(a) {
 
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance(); }}}Copy the code

Load balancing process:

  • The consumer uses the RebalanceService to periodically rebalance the load.

  • The core of RebalanceService is to complete the assignment relationship between MessageQueue and consumer.

public abstract class RebalanceImpl {
 
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // omit the relevant code
                break;
            }
            case CLUSTERING: { // Load balancing in cluster mode
                < span style = "max-width: 100%
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
 
                Get all the consumer objects under the topic consumerGroup
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                 
                Rebalance. Rebalance
                if(mqSet ! =null&& cidAll ! =null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
 
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
 
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
 
                    List<MessageQueue> allocateResult = null;
                    try {
                        // 4. Reassign using the allocation policy
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
 
                        return;
                    }
 
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if(allocateResult ! =null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    Rebalance based on the assignment
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, allocateResultSet); }}break;
            }
            default:
                break; }}Copy the code

Redistribution process:

  • Get all MessageQueue under topic.

  • Gets the CID of all consumers under this consumerGroup under topic (for example, 192.168.0.8@15958).

  • Sort for mqAll and cidAll, with mqAll sorted by BrokerName first and then BrokerId, and cidAll sorted by string.

  • By allocation strategy

  • AllocateMessageQueueStrategy redistribution.

  • Perform the real rebalance action based on the assignment.

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();
 
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List
       
         mqAll, List
        
          cidAll)
        
        {
         
        List<MessageQueue> result = new ArrayList<MessageQueue>();
         
        // The core logic calculation begins
 
        // Calculate the subscript of the current CID
        int index = cidAll.indexOf(currentCID);
         
        // Calculate the extra modules
        int mod = mqAll.size() % cidAll.size();
 
        // Calculate the average size
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        // Calculate the starting subscript
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // Calculate the size of the range
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        // Result of assembly
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
    // The core logic calculation is complete
 
    @Override
    public String getName(a) {
        return "AVG"; }} -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- rocketMq cluster exists3Broker_a, Broker_B, and broker_C. RocketMq has a topic named topic_demo with writeQueue number of write queues3, located in3A broker. The sorted mqAll has a size of9For [broker_A_0 broker_A_1 broker_A_2 broker_B_0 broker_B_1 broker_C_0 broker_C_1 broker_C_2] rocketMq contains4The cidAll of the consumer group is [192.168. 06.@15956  192.168. 07.@15957  192.168. 08.@15958  192.168. 09.@15959]
 
192.168. 06.@15956Allocate MessageQueue settlement process index:0Mod:9%4=1AverageSize:9 / 4 + 1 = 3StartIndex:0The range:3MessageQueue: [broker_A_0, broker_A_1, broker_a_2]192.168. 06.@15957Allocate MessageQueue settlement process index:1Mod:9%4=1AverageSize:9 / 4 = 2StartIndex:3The range:2MessageQueue: [broker_b_0, broker_B_1]192.168. 06.@15958Allocate MessageQueue settlement process index:2Mod:9%4=1AverageSize:9 / 4 = 2StartIndex:5The range:2MessageQueue: [broker_b_2, broker_C_0]192.168. 06.@15959Allocate MessageQueue settlement process index:3Mod:9%4=1AverageSize:9 / 4 = 2StartIndex:7The range:2MessageQueue: [broker_C_1, broker_C_2]Copy the code

Allocation strategy analysis:

  • The overall allocation strategy can be better understood by referring to the specific example in the figure above.

Distribution of consumers:

  • Consumer objects under the same consumerGroup are assigned to different MessageQueue under the same Topic.

  • Each MessageQueue is eventually assigned to a specific consumer.

RocketMQ specifies machine consumption design ideas

In the daily test environment, there will be multiple consumers to consume, but in the actual development, a consumer with a new function hopes that the message will only be consumed by this machine for logical coverage. In this case, the cluster mode of consumerGroup will cause us trouble. It is uncertain which consumer consumes the message because of consumption load balancing. Of course, we can implement targeted machine consumption by introducing consumer load balancing.

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();
 
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List
       
         mqAll, List
        
          cidAll)
        
        {
        
 
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // By rewriting this part of the logic, add to determine whether the machine is the specified IP, if not directly returned empty list indicates that the machine is not responsible for consumption
        if(! cidAll.contains(currentCID)) {return result;
        }
 
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        returnresult; }}Copy the code

Consumer load balancing strategy rewrite:

  • By rewriting the load balancing strategy AllocateMessageQueueAveragely the allocate mechanism to ensure that only the specified IP machines capable of consumption.

  • The CID format of RocketMQ based on IP is 192.168.0.6@15956, where the IP address in front is the IP address of the consumer machine. The whole scheme is feasible and can actually be implemented.

Six, the summary

This article mainly introduces the RocketMQ load balancing mechanism in the production and consumption process, combined with the source code and practical cases to give readers an easy to understand the popularization of technology, hope to provide readers with reference and reference value. Limited by the length of the article, some aspects are not involved, there are also a lot of technical details are not elaborated, if you have any questions, welcome to continue to exchange.

Author: Wang Zhi, Vivo Internet Server Team