1. NameServer introduction

NameServer is a lightweight naming service designed for RocketMQ. It is simple, scalable, and stateless, and its nodes do not communicate with one another. A RocketMQ cluster works as shown below:

The RocketMQ architecture has four parts: Broker, Producer, Consumer, and NameServer. The other three all communicate with NameServer:

  • NameServer: a simple topic routing registry that plays a role similar to ZooKeeper in Dubbo and supports dynamic Broker registration and discovery.

It has two main functions:

  1. Broker management: NameServer accepts Broker registration requests and stores the request data as the basis for routing information. It also runs heartbeat detection to check whether each Broker is still alive (a Broker is considered dead after 120 s without a heartbeat).

  2. Topic routing information management: each NameServer holds the routing information for the entire Broker cluster, which Producers and Consumers use to send and consume messages.

  • Producer: the message publisher, which can be deployed as a cluster. It obtains a topic's routing information from the NameServer cluster, such as which queues belong to the topic and which Brokers host them. (A Producer sends messages only to Masters, so it only needs connections to Master Brokers.)

  • Consumer: the message consumer, which can also be deployed as a cluster. It obtains a topic's routing information from the NameServer cluster, then connects to the corresponding Brokers to pull and consume messages. (Messages can be pulled from both Master and Slave, so a Consumer connects to both.)

  • Broker: Stores, delivers, and queries messages and ensures high availability of services.

2. Why NameServer?

There are many service discovery components available, such as etcd, Consul, ZooKeeper, and Nacos.

So why did RocketMQ develop its own NameServer instead of using one of these open-source components? There are two reasons:

  • RocketMQ's architecture only needs a lightweight metadata server that guarantees eventual consistency. It does not need ZooKeeper's strong-consistency guarantees, and avoiding a dependency on another piece of middleware lowers the overall maintenance cost.

  • Each Broker registers its routing information with every NameServer, so every NameServer holds a complete copy of the routing information. If one NameServer fails, Brokers keep registering with the remaining NameServers, which are unaffected, so Producers and Consumers can still discover Broker routes dynamically.
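The availability argument above can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not RocketMQ code: ToyNameServer and ToyCluster are hypothetical classes, and real registration goes over the network. The Broker writes its route to every NameServer it can reach, and a client simply asks the next NameServer when one is down.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one NameServer node: it keeps its own full copy
// of the routes and never talks to the other NameServer nodes.
class ToyNameServer {
    boolean alive = true;
    final Map<String, String> routes = new HashMap<>(); // topic -> broker address

    void register(String topic, String brokerAddr) {
        if (!alive) {
            throw new IllegalStateException("NameServer down");
        }
        routes.put(topic, brokerAddr);
    }

    String query(String topic) {
        if (!alive) {
            throw new IllegalStateException("NameServer down");
        }
        return routes.get(topic);
    }
}

class ToyCluster {
    final List<ToyNameServer> nameServers = new ArrayList<>();

    // Broker side: register with every NameServer; a dead node is skipped, not fatal.
    int registerToAll(String topic, String brokerAddr) {
        int ok = 0;
        for (ToyNameServer ns : nameServers) {
            try {
                ns.register(topic, brokerAddr);
                ok++;
            } catch (IllegalStateException e) {
                // This node is down; the others still receive the complete route.
            }
        }
        return ok;
    }

    // Client side: ask each NameServer in turn until one answers.
    String lookup(String topic) {
        for (ToyNameServer ns : nameServers) {
            try {
                return ns.query(topic);
            } catch (IllegalStateException e) {
                // Try the next node.
            }
        }
        return null;
    }
}
```

Because no NameServer depends on another, losing one node only removes one copy of the data; consistency between copies comes from every Broker re-registering with all of them.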

3. NameServer internals

NameServer's routing data is registered by Brokers, processed internally, and consumed by Producers and Consumers. The rest of this section analyzes the source for NameServer's routing data structures, route registration and query, and dynamic Broker detection.

3.1 Routing Data Structure

RouteInfoManager is NameServer's core class: it maintains the routing information and implements core functions such as route registration and query. Because the routing information lives in the NameServer process's memory, it is guarded by a ReentrantReadWriteLock against concurrent modification.

```java
public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    // If NameServer does not receive a heartbeat packet from a Broker within 2 minutes, the connection is closed.
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // Topic -> queue information; messages are load-balanced across queues based on this routing table.
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // Broker base information: brokerName, cluster name, and Master/Slave Broker addresses.
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // Cluster -> names of the Brokers in that cluster (look up the brokerName list by cluster name).
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // Live Broker addresses (the entry is replaced each time NameServer receives a heartbeat packet).
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // Broker address -> Filter Server list
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    // ... omitted ...
}
```
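The locking discipline can be reduced to a small sketch. LockedRouteTable is a hypothetical class that keeps only brokerAddrTable-style data (brokerName -> brokerId -> address); it shows the pattern: writers take the exclusive write lock, readers share the read lock, and readers receive a copy rather than the live map. RouteInfoManager itself calls lockInterruptibly(); plain lock() is used here for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, heavily reduced route table in the style of RouteInfoManager.
class LockedRouteTable {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // brokerName -> (brokerId -> broker address)
    private final HashMap<String, HashMap<Long, String>> brokerAddrTable = new HashMap<>();

    // Writers (registration, deletion) hold the write lock exclusively.
    void putBrokerAddr(String brokerName, long brokerId, String addr) {
        lock.writeLock().lock();
        try {
            HashMap<Long, String> addrs = brokerAddrTable.get(brokerName);
            if (addrs == null) {
                addrs = new HashMap<>();
                brokerAddrTable.put(brokerName, addrs);
            }
            addrs.put(brokerId, addr);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers (route queries) can hold the read lock concurrently.
    // A copy is returned so callers never observe later writes mid-use.
    Map<Long, String> getBrokerAddrs(String brokerName) {
        lock.readLock().lock();
        try {
            HashMap<Long, String> addrs = brokerAddrTable.get(brokerName);
            if (addrs == null) {
                return new HashMap<>();
            }
            return new HashMap<>(addrs);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

A read/write lock fits this workload because route queries from many clients vastly outnumber registrations, and reads never block one another.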

You can see the relationship more clearly in the following class diagram:

QueueData attributes:

```java
public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    // Number of read queues, default: 16
    private int readQueueNums;
    // Number of write queues, default: 16
    private int writeQueueNums;
    // Read/write permission bit mask, see PermName
    private int perm;
    /**
     * Topic system flag, corresponding to TopicConfig's topicSysFlag
     * {@link org.apache.rocketmq.common.sysflag.TopicSysFlag}
     */
    private int topicSynFlag;
    // ... omitted ...
}
```

topicQueueTable map data format demo (JSON):

```json
{
  "TopicTest": [
    {
      "brokerName": "broker-a",
      "perm": 6,
      "readQueueNums": 4,
      "topicSynFlag": 0,
      "writeQueueNums": 4
    }
  ]
}
```
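The perm value 6 in the demo is a bit mask. The sketch below decodes it; PermSketch is hypothetical, and its constant values are assumed to mirror those in RocketMQ's org.apache.rocketmq.common.constant.PermName (PERM_READ = 4, PERM_WRITE = 2, PERM_INHERIT = 1), which makes 6 mean "readable and writable".

```java
// Hypothetical helper; the bit values are assumed to match RocketMQ's PermName constants.
final class PermSketch {
    static final int PERM_READ = 1 << 2;   // 4
    static final int PERM_WRITE = 1 << 1;  // 2
    static final int PERM_INHERIT = 1;     // 1

    private PermSketch() {
    }

    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    // Renders a perm value as three flags (R, W, X for inherit) for logging.
    static String describe(int perm) {
        return (isReadable(perm) ? "R" : "-")
            + (isWriteable(perm) ? "W" : "-")
            + ((perm & PERM_INHERIT) == PERM_INHERIT ? "X" : "-");
    }
}
```

For example, PermSketch.describe(6) yields "RW-", while a topic made read-only (perm 4) yields "R--" and would stop receiving new messages.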

BrokerData attributes:

```java
/**
 * Broker data: a Master/Slave pair is defined by the same brokerName with different brokerIds;
 * brokerId 0 means Master, non-zero means Slave.
 */
public class BrokerData implements Comparable<BrokerData> {
    // Cluster name
    private String cluster;
    // Broker name
    private String brokerName;
    // brokerId -> Broker address; brokerId = 0 means Master, greater than 0 means Slave
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    private final Random random = new Random();
    // ... omitted ...
}
```

brokerAddrTable map data format demo (JSON):

```json
{
  "broker-a": {
    "brokerAddrs": {
      "0": "172.16.62.75:10911"
    },
    "brokerName": "broker-a",
    "cluster": "DefaultCluster"
  }
}
```
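Because brokerId 0 marks the Master, choosing an address from brokerAddrs comes down to a small rule. A minimal sketch, assuming the rule "prefer the Master, otherwise pick a random Slave"; as far as I know, RocketMQ's BrokerData#selectBrokerAddr behaves this way, which would explain the Random field, but BrokerDataSketch below is a hypothetical class, not that source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;

// Hypothetical reduced BrokerData: brokerId 0 = Master, non-zero = Slave.
class BrokerDataSketch {
    static final long MASTER_ID = 0L;
    private final HashMap<Long, String> brokerAddrs;
    private final Random random;

    BrokerDataSketch(HashMap<Long, String> brokerAddrs, Random random) {
        this.brokerAddrs = brokerAddrs;
        this.random = random;
    }

    // Prefer the Master; if it is absent, pick any Slave at random; null if there is no address.
    String selectBrokerAddr() {
        String master = brokerAddrs.get(MASTER_ID);
        if (master != null) {
            return master;
        }
        if (brokerAddrs.isEmpty()) {
            return null;
        }
        List<String> slaves = new ArrayList<>(brokerAddrs.values());
        return slaves.get(random.nextInt(slaves.size()));
    }
}
```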

BrokerLiveInfo attributes:

```java
/**
 * This information is not real-time. NameServer scans all Brokers every 10 seconds and judges
 * each Broker's state from the time of its last heartbeat packet. Producers are not notified
 * immediately and may keep sending messages to a dead Broker, and those sends fail
 * (sending is not highly available during that window).
 */
class BrokerLiveInfo {
    // Time of the last update (last heartbeat)
    private long lastUpdateTimestamp;
    // Version information
    private DataVersion dataVersion;
    // Netty channel
    private Channel channel;
    // brokerIP2 + HA port
    private String haServerAddr;
    // ... omitted ...
}
```

brokerLiveTable map data format demo (JSON):

```json
{
  "172.16.62.75:10911": {
    "channel": {
      "active": true,
      "inputShutdown": false,
      "open": true,
      "outputShutdown": false,
      "registered": true,
      "writable": true
    },
    "dataVersion": {
      "counter": 2,
      "timestamp": 1630907813571
    },
    "haServerAddr": "172.16.62.75:10912",
    "lastUpdateTimestamp": 1630907814074
  }
}
```

clusterAddrTable map data format demo (JSON):

```json
{"DefaultCluster": ["broker-a"]}
```

As the HashMaps maintained by RouteInfoManager and the attributes of QueueData, BrokerData, and BrokerLiveInfo show, the information NameServer maintains is simple but extremely important.

3.2 Route Registration

A Broker actively registers its routing information in the following cases:

  1. At startup, it registers with all NameServers in the cluster.

  2. Every 30 seconds, it sends a heartbeat packet to all NameServers in the cluster, which re-registers it.

  3. When the Broker's topic configuration changes (a topic is added, modified, or deleted), it immediately sends a heartbeat packet to register.
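The three triggers above can be sketched on the Broker side with a ScheduledExecutorService. This is a minimal, hypothetical sketch: HeartbeatSketch is not a RocketMQ class, the Runnable stands in for the Broker's "register with all NameServers" call, and the period is passed in by the caller.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical Broker-side registration loop covering the three triggers.
class HeartbeatSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable registerWithAllNameServers;

    HeartbeatSketch(Runnable registerWithAllNameServers) {
        this.registerWithAllNameServers = registerWithAllNameServers;
    }

    // Triggers 1 and 2: register at startup (initial delay 0), then once per period.
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(registerWithAllNameServers, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Trigger 3: topic configuration changed, so register right away
    // instead of waiting for the next periodic heartbeat.
    void onTopicConfigChanged() {
        scheduler.execute(registerWithAllNameServers);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

With start(30_000) this gives the 30 s heartbeat described above; a much shorter period is only useful for testing. Running all triggers on one single-threaded executor keeps registrations from the same Broker strictly ordered.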

On the NameServer side, all of these requests are handled by RouteInfoManager#registerBroker. The annotated source is as follows:

```java
// RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    // TopicConfigSerializeWrapper is a more complex structure that mainly holds all topic information on the Broker
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // Acquire the write lock
            this.lock.writeLock().lockInterruptibly();

            // 1: maintain clusterAddrTable
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            // 2: maintain brokerAddrTable
            boolean registerFirst = false;
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            // 3: maintain topicQueueTable; the update itself is done by createAndUpdateQueueData.
            // Only Master requests are processed, because Slaves synchronize their topic configuration from the Master.
            if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                // The topic configuration changed, or this is the Broker's first registration
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // 4: maintain brokerLiveTable.
            // Key point: the first BrokerLiveInfo constructor argument is System.currentTimeMillis()
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            // 5: maintain filterServerTable
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            // 6: if this is a Slave, return the Master's address and HA server address
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }
    return result;
}
```

Remark:

The createAndUpdateQueueData method is used to maintain topicQueueTable data.
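As a rough guide to what createAndUpdateQueueData does: each topic keeps at most one queue entry per brokerName; an identical entry is left alone and a changed one is replaced. The sketch below is a simplified, hypothetical version (QueueInfo stands in for QueueData and keeps only four fields), not the RocketMQ source.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for QueueData with only a few fields.
class QueueInfo {
    final String brokerName;
    final int readQueueNums;
    final int writeQueueNums;
    final int perm;

    QueueInfo(String brokerName, int readQueueNums, int writeQueueNums, int perm) {
        this.brokerName = brokerName;
        this.readQueueNums = readQueueNums;
        this.writeQueueNums = writeQueueNums;
        this.perm = perm;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof QueueInfo)) {
            return false;
        }
        QueueInfo q = (QueueInfo) o;
        return brokerName.equals(q.brokerName) && readQueueNums == q.readQueueNums
            && writeQueueNums == q.writeQueueNums && perm == q.perm;
    }

    @Override
    public int hashCode() {
        return Objects.hash(brokerName, readQueueNums, writeQueueNums, perm);
    }
}

class TopicQueueTableSketch {
    final HashMap<String, List<QueueInfo>> topicQueueTable = new HashMap<>();

    // Upsert one Broker's queue configuration for a topic.
    void createAndUpdate(String topic, QueueInfo queueInfo) {
        List<QueueInfo> list = topicQueueTable.get(topic);
        if (list == null) {
            list = new LinkedList<>();
            list.add(queueInfo);
            topicQueueTable.put(topic, list);
            return;
        }
        boolean addNewOne = true;
        Iterator<QueueInfo> it = list.iterator();
        while (it.hasNext()) {
            QueueInfo existing = it.next();
            if (existing.brokerName.equals(queueInfo.brokerName)) {
                if (existing.equals(queueInfo)) {
                    addNewOne = false; // unchanged: keep the existing entry
                } else {
                    it.remove();       // changed: drop it so the new one replaces it
                }
            }
        }
        if (addNewOne) {
            list.add(queueInfo);
        }
    }
}
```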

As the source shows, Broker route registration on NameServer simply maintains five maps: clusterAddrTable, brokerAddrTable, topicQueueTable, brokerLiveTable, and filterServerTable. The source really is that simple.
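To make that summary concrete, here is a reduced, hypothetical model of the clusterAddrTable, brokerAddrTable, and brokerLiveTable updates in registerBroker. topicQueueTable, filterServerTable, Slave/HA handling, and locking are left out, and a plain timestamp stands in for BrokerLiveInfo.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

// Hypothetical reduction of registerBroker's map maintenance (single-threaded, no lock).
class RegistrationSketch {
    final HashMap<String, Set<String>> clusterAddrTable = new HashMap<>();
    final HashMap<String, HashMap<Long, String>> brokerAddrTable = new HashMap<>();
    // brokerAddr -> last heartbeat time (stand-in for BrokerLiveInfo.lastUpdateTimestamp)
    final HashMap<String, Long> brokerLiveTable = new HashMap<>();

    // Returns true on the first registration of this brokerId, mirroring registerFirst.
    boolean register(String clusterName, String brokerName, long brokerId, String brokerAddr, long nowMillis) {
        // cluster -> broker names
        Set<String> names = clusterAddrTable.get(clusterName);
        if (names == null) {
            names = new HashSet<>();
            clusterAddrTable.put(clusterName, names);
        }
        names.add(brokerName);

        // broker name -> (brokerId -> address)
        boolean registerFirst = false;
        HashMap<Long, String> addrs = brokerAddrTable.get(brokerName);
        if (addrs == null) {
            registerFirst = true;
            addrs = new HashMap<>();
            brokerAddrTable.put(brokerName, addrs);
        }
        String oldAddr = addrs.put(brokerId, brokerAddr);
        registerFirst = registerFirst || oldAddr == null;

        // Every registration, including each 30 s heartbeat, refreshes the liveness timestamp.
        brokerLiveTable.put(brokerAddr, nowMillis);
        return registerFirst;
    }
}
```

The last step is the important one for liveness: a heartbeat is nothing more than a repeated registration that overwrites the timestamp the expiry scan later checks.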

3.3 Route Deletion

A route can be deleted in two ways: the Broker reports its own removal, or NameServer removes it proactively. NameServer's processing logic differs slightly between the two, but both are easy to follow.

1. Broker-initiated deletion: when a Broker shuts down normally, it sends an unregister request to NameServer, which handles it in RouteInfoManager#unregisterBroker:

```java
// RouteInfoManager#unregisterBroker
public void unregisterBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId) {
    try {
        try {
            this.lock.writeLock().lockInterruptibly();
            // 1: remove the brokerLiveTable entry directly, no timestamp check needed
            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
            log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                brokerLiveInfo != null ? "OK" : "Failed",
                brokerAddr
            );
            // 2: remove the filterServerTable entry
            this.filterServerTable.remove(brokerAddr);

            // 3: remove the address from brokerAddrTable
            boolean removeBrokerName = false;
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null != brokerData) {
                String addr = brokerData.getBrokerAddrs().remove(brokerId);
                log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                    addr != null ? "OK" : "Failed",
                    brokerAddr
                );
                if (brokerData.getBrokerAddrs().isEmpty()) {
                    this.brokerAddrTable.remove(brokerName);
                    log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                        brokerName
                    );
                    removeBrokerName = true;
                }
            }

            // 4: remove the brokerName from clusterAddrTable
            if (removeBrokerName) {
                Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                if (nameSet != null) {
                    boolean removed = nameSet.remove(brokerName);
                    log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                        removed ? "OK" : "Failed",
                        brokerName);
                    if (nameSet.isEmpty()) {
                        this.clusterAddrTable.remove(clusterName);
                        log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                            clusterName
                        );
                    }
                }
                // 5: remove this Broker's entries from topicQueueTable
                this.removeTopicByBrokerName(brokerName);
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("unregisterBroker Exception", e);
    }
}
```

Remark:

The removeTopicByBrokerName method simply removes this Broker's QueueData entries from topicQueueTable, and removes any topic left with no queues.
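The loop in removeTopicByBrokerName can be sketched as follows. This is a simplified, hypothetical version: each topic's List<QueueData> is reduced to a list of broker names, and RemoveTopicSketch is not a RocketMQ class.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical simplification: topic -> names of the Brokers hosting its queues,
// standing in for topic -> List<QueueData>.
final class RemoveTopicSketch {
    private RemoveTopicSketch() {
    }

    static void removeTopicByBrokerName(Map<String, List<String>> topicQueueTable, String brokerName) {
        Iterator<Map.Entry<String, List<String>>> itMap = topicQueueTable.entrySet().iterator();
        while (itMap.hasNext()) {
            List<String> brokerNames = itMap.next().getValue();
            // Drop every queue entry owned by the departing Broker.
            brokerNames.removeIf(name -> name.equals(brokerName));
            // A topic left with no queues has no route, so the topic itself is removed.
            if (brokerNames.isEmpty()) {
                itMap.remove();
            }
        }
    }
}
```

Removing through the Iterator rather than the map itself is what makes it safe to delete entries while walking the table.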

2. NameServer-initiated deletion: every 10 s, NameServer scans brokerLiveTable and compares each Broker's last heartbeat timestamp with the current system time. If the difference exceeds 120 s, the Broker's information is removed.

```java
// RouteInfoManager#scanNotActiveBroker
public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // BROKER_CHANNEL_EXPIRED_TIME defaults to 1000 * 60 * 2 (120 s);
        // a Broker whose last heartbeat is older than that is removed
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                // 1: look up the address of the Broker to delete
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                // 2: remove the brokerLiveTable entry
                this.brokerLiveTable.remove(brokerAddrFound);
                // 3: remove the filterServerTable entry
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                // 4: remove the address from brokerAddrTable
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();
                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }

                // 5: remove the brokerName from clusterAddrTable
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);
                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                it.remove();
                            }
                            break;
                        }
                    }
                }

                // 6: remove this Broker's queues from topicQueueTable
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();
                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                    topic, queueData);
                            }
                        }
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                topic);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
```
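The expiry rule in scanNotActiveBroker is a plain timestamp comparison. The sketch below isolates it with an injected clock so it can be tested; ExpiryScanSketch is hypothetical, its 120 s constant matches BROKER_CHANNEL_EXPIRED_TIME above, and closing the channel plus the onChannelDestroy cleanup are reduced to collecting the expired addresses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical isolation of the expiry check, with the current time passed in.
class ExpiryScanSketch {
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000L * 60 * 2; // 120 s

    // brokerAddr -> lastUpdateTimestamp
    final Map<String, Long> brokerLiveTable = new HashMap<>();

    // Removes and returns every Broker whose last heartbeat is more than 120 s older than nowMillis.
    List<String> scanNotActiveBroker(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> next = it.next();
            if (next.getValue() + BROKER_CHANNEL_EXPIRED_TIME < nowMillis) {
                it.remove();
                // The real code closes the channel and calls onChannelDestroy here.
                expired.add(next.getKey());
            }
        }
        return expired;
    }
}
```

Because the scan itself runs only every 10 s, a Broker that dies right after a heartbeat can remain in the routing table for up to roughly 120 + 10 = 130 s.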

As the source shows, both deletion paths remove the Broker's entries from the same five maps: clusterAddrTable, brokerAddrTable, topicQueueTable, brokerLiveTable, and filterServerTable.

3.4 Route Discovery

RocketMQ route discovery is not real-time. When a topic's route changes, NameServer does not push the change to clients; instead, producers and consumers periodically pull the latest topic route. The core source is as follows:

```java
// RouteInfoManager#pickupTopicRouteData
public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        try {
            this.lock.readLock().lockInterruptibly();
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;

                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }

                // Build the BrokerData list
                for (String brokerName : brokerNameSet) {
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(),
                            (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}
```

Remark:

This code barely needs comments: it simply reads topicQueueTable, brokerAddrTable, and filterServerTable, assembles the results into a TopicRouteData, and returns it to the client.
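The assembly step can be sketched in isolation. PickupSketch is a hypothetical reduction: QueueData is reduced to a broker name, TopicRouteData to a map of brokerName -> (brokerId -> address), and filter servers are ignored. Like the real method, it returns null unless both queue data and Broker data are found, and it copies address maps so callers cannot mutate NameServer state.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class PickupSketch {
    private PickupSketch() {
    }

    // Returns brokerName -> (brokerId -> addr) for the Brokers hosting the topic,
    // or null when the topic has no queues or none of its Brokers are known.
    static Map<String, Map<Long, String>> pickup(
            String topic,
            Map<String, List<String>> topicQueueTable,          // topic -> broker names (simplified QueueData)
            Map<String, Map<Long, String>> brokerAddrTable) {   // brokerName -> (brokerId -> addr)
        List<String> queueBrokers = topicQueueTable.get(topic);
        if (queueBrokers == null) {
            return null;
        }
        // De-duplicate broker names, as brokerNameSet does in the real code.
        Set<String> brokerNames = new LinkedHashSet<>(queueBrokers);
        Map<String, Map<Long, String>> result = new HashMap<>();
        for (String brokerName : brokerNames) {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            if (addrs != null) {
                // Copy, like the clone() in the real code.
                result.put(brokerName, new HashMap<>(addrs));
            }
        }
        return result.isEmpty() ? null : result;
    }
}
```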

Here is the list of TopicRouteData properties; you can see how simple they are:

```java
public class TopicRouteData extends RemotingSerializable {
    // Ordered-topic configuration, read from the "ORDER_TOPIC_CONFIG" namespace
    // (see DefaultRequestProcessor#getRouteInfoByTopic)
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    // ... omitted ...
}
```
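On the client side, the route returned by NameServer is cached locally and refreshed by polling. A minimal sketch of that pull model, under stated assumptions: RouteCacheSketch is hypothetical, a String stands in for TopicRouteData, and a Function stands in for the NameServer query. In the real client the poll is scheduled, by default every 30 s via ClientConfig's pollNameServerInterval.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical client-side route cache refreshed by periodic pulls.
class RouteCacheSketch {
    // topic -> route snapshot (a String stands in for TopicRouteData)
    private final Map<String, String> localRoutes = new HashMap<>();
    private final Function<String, String> queryNameServer;

    RouteCacheSketch(Function<String, String> queryNameServer) {
        this.queryNameServer = queryNameServer;
    }

    // Called periodically; returns true if the cached route changed.
    boolean refresh(String topic) {
        String latest = queryNameServer.apply(topic);
        if (latest == null) {
            return false; // no route from NameServer: keep the last known route
        }
        String old = localRoutes.put(topic, latest);
        return !Objects.equals(old, latest);
    }

    String route(String topic) {
        return localRoutes.get(topic);
    }
}
```

Between two polls the client keeps using its cached route, which is exactly why a dead Broker can still be chosen for a while after it fails.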

In fact, NameServer provides many other functions, such as getBrokerClusterInfo (get cluster information) and getAllTopicListFromNameserver (get all topics), but most of them revolve around the same few HashMaps:

  • clusterAddrTable

  • brokerAddrTable

  • topicQueueTable

  • brokerLiveTable

  • filterServerTable

4. Conclusion

Functionally, NameServer is the “brain” of RocketMQ: it stores the cluster's routing information. Specifically, it maintains Topic and Broker information, monitors the running state of Brokers, and provides routing to clients. Internally, NameServer maintains several HashMaps, and both Broker registration and client queries are operations on these maps, guarded by a ReentrantReadWriteLock to handle concurrency. This article covers only some of NameServer's key code; its startup process and other source code are also worth studying.

5. Open questions

Have you noticed that NameServer has the following defect?

Suppose a Broker goes down abnormally. NameServer waits at least 120 s before removing it from the routing information. While the Broker is down, the routing information a Producer obtains for a topic still includes the dead Broker, so message sending will fail for a short period. How is this handled? Does it mean message sending is not highly available? And does it affect message consumption on the Consumer side?

Keep these questions in mind when reading the producer-side and consumer-side source code.