This article focuses on RocketMQ route management, service registration, and service discovery mechanisms.

1. NameServer architecture design

Message-oriented middleware is generally designed around a topic-based publish/subscribe mechanism. A producer sends messages on a topic to the message server, which stores them persistently, and consumers subscribe to the topics they are interested in. Based on the subscription (routing) information, either the message server pushes messages to the consumer (push mode) or the consumer actively pulls messages from the message server (pull mode). This decouples producers from consumers. To keep a single failed message server from bringing down the whole system, multiple message servers are usually deployed to share message storage. This raises two questions. How does a producer know which message server to send a message to? And if one of the message servers goes down, how can producers detect it without restarting?

To solve these problems, RocketMQ uses NameServer, which provides routing management, service registration, and service discovery, and can itself be deployed as a cluster. The architecture is shown in the following figure:

Each Broker message server registers with all NameServers at startup. Before sending a message, the producer obtains the list of Broker server addresses from NameServer and then uses a load-balancing algorithm to select one of them as the target. NameServer maintains a long connection with each Broker server and checks every 10 seconds whether each Broker is still alive. If it detects that a Broker is down, it removes that Broker from the routing registry. Route changes, however, are not pushed to message producers immediately. This keeps the NameServer implementation simple, so the message sender must provide a fault-tolerance mechanism to keep message delivery highly available. NameServer itself is made highly available by deploying multiple NameServer servers that do not communicate with each other. Although the data on different NameServer servers may differ at any given moment, this has no significant impact on message delivery beyond temporarily uneven distribution of messages. This is one of the highlights of the RocketMQ NameServer design.
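The sender-side fault tolerance mentioned above can be pictured with a minimal sketch. It assumes the sender already holds a list of Broker names fetched from NameServer and picks one by round-robin, skipping Brokers it has marked unavailable after a failed send. BrokerSelector and its methods are hypothetical; RocketMQ's real producer selects message queues rather than whole Brokers and uses its own fault-tolerance strategy.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sender-side selector: round-robin over the Broker list fetched from
// NameServer, skipping Brokers that the sender has marked as unavailable.
class BrokerSelector {
    private int counter = 0;
    private final Set<String> unavailable = new HashSet<>();

    // Called after a send to this Broker fails (for example, it went down but
    // NameServer has not removed it from the route yet).
    synchronized void markUnavailable(String brokerName) {
        unavailable.add(brokerName);
    }

    synchronized void markAvailable(String brokerName) {
        unavailable.remove(brokerName);
    }

    // Returns the next available Broker in round-robin order; if every Broker is
    // marked unavailable, falls back to plain round-robin instead of failing.
    synchronized String select(List<String> brokers) {
        if (brokers.isEmpty()) {
            throw new IllegalStateException("no route information for this topic");
        }
        for (int i = 0; i < brokers.size(); i++) {
            // floorMod keeps the index valid even after the counter overflows
            String candidate = brokers.get(Math.floorMod(counter++, brokers.size()));
            if (!unavailable.contains(candidate)) {
                return candidate;
            }
        }
        return brokers.get(Math.floorMod(counter++, brokers.size()));
    }
}
```

Because NameServer does not push route changes, a selector along these lines is what keeps a producer working during the window in which a dead Broker is still listed.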

Interactions among message clients, NameServer, and Brokers:

  • The Broker sends heartbeat packets to each machine in the NameServer cluster every 30 seconds, containing information such as the topic routes it has created.
  • The message client fetches the latest routing information for its topics from NameServer every 30 seconds.
  • NameServer records a timestamp each time it receives a heartbeat packet from a Broker.
  • NameServer scans brokerLiveTable every 10 seconds. If it does not receive a heartbeat packet within 120s, the Broker is deemed to have failed and the topic routing information is updated to remove the failed Broker.
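The heartbeat bookkeeping in the list above can be modelled as a map from Broker address to last-heartbeat time. The sketch below is a simplified assumption, not RocketMQ code: LivenessTable is a hypothetical class, and the current time is passed in explicitly so the 120-second expiry rule (a Broker is dropped once last + 120s < now) can be checked deterministically.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of brokerLiveTable: Broker address -> last heartbeat time.
class LivenessTable {
    // Mirrors the 120 s expiry described above.
    static final long EXPIRED_MILLIS = 120_000L;

    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // Record a heartbeat; the caller supplies the current time so the logic is testable.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Periodic scan (every 10 s in NameServer): drop Brokers whose last heartbeat is too old
    // and return the removed addresses.
    List<String> scan(long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + EXPIRED_MILLIS < nowMillis) {
                it.remove();
                removed.add(e.getKey());
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```

With heartbeats every 30 seconds and scans every 10 seconds, a Broker that stops sending heartbeats can remain in the table for up to roughly 130 seconds after its last heartbeat (the 120-second expiry plus up to one scan interval).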

2. NameServer startup process source code analysis

In the namesrv module, locate the NameServer startup class NamesrvStartup.java and look at how it handles the NameServer startup parameters.

  • Step 1: parse the configuration file and populate the NamesrvConfig and NettyServerConfig properties.

    Method flow: main0->createNamesrvController

    It creates NamesrvConfig (NameServer service parameters) and NettyServerConfig (NameServer network parameters), then populates both objects with the option values from the specified configuration file or the startup command line. The parameters come from the following sources:

    • -c configFile: use the -c option to specify the path of the configuration file.

    • --attributeName attributeValue: set an attribute directly on the command line, for example --listenPort 9876.

    Part of the code is as follows:

final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// NameServer listens on 9876 unless the configuration file overrides it
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        // Copy matching keys from the file onto both configuration objects
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);

        namesrvConfig.setConfigStorePath(file);

        System.out.printf("load config properties file OK, %s%n", file);
        in.close();
    }
}
if (commandLine.hasOption('p')) {
    // -p: print the effective configuration and exit without starting NameServer
    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
    MixAll.printObjectProperties(console, namesrvConfig);
    MixAll.printObjectProperties(console, nettyServerConfig);
    System.exit(0);
}

// Command-line options are applied last, so they override values from the file
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
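MixAll.properties2Object, called above, copies configuration values onto the config objects through their setters. The following sketch illustrates that idea with plain reflection; PropertyBinder and DemoServerConfig are hypothetical names, only int, long, boolean, and String setters are handled, and this is not MixAll's actual implementation.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical helper (not MixAll's real code): for every public one-argument setter
// on the target, look up a property with the matching name (setListenPort -> listenPort)
// and, if present, convert the string value and call the setter.
final class PropertyBinder {
    private PropertyBinder() {}

    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // property not configured: keep the field's default
            }
            Class<?> type = m.getParameterTypes()[0];
            Object arg;
            if (type == int.class || type == Integer.class) {
                arg = Integer.parseInt(value.trim());
            } else if (type == long.class || type == Long.class) {
                arg = Long.parseLong(value.trim());
            } else if (type == boolean.class || type == Boolean.class) {
                arg = Boolean.parseBoolean(value.trim());
            } else if (type == String.class) {
                arg = value;
            } else {
                continue; // other types are out of scope for this sketch
            }
            try {
                m.invoke(target, arg);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set property " + key, e);
            }
        }
    }
}

// Stand-in for NettyServerConfig with only two of its fields.
class DemoServerConfig {
    private int listenPort = 8888;
    private boolean useEpollNativeSelector = false;

    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public boolean isUseEpollNativeSelector() { return useEpollNativeSelector; }
    public void setUseEpollNativeSelector(boolean value) { this.useEpollNativeSelector = value; }
}
```

Keys with no matching setter (such as a typo in the file) are silently ignored in this sketch, which is worth keeping in mind when a configured value seems to have no effect.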

Default values in the NamesrvConfig object:

/** RocketMQ home directory; set it with -Drocketmq.home.dir=path or the ROCKETMQ_HOME environment variable */
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
/** Path where NameServer persists its KV configuration */
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
/** Default NameServer configuration file path; use the -c option to load NameServer startup attributes from a configuration file */
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
/** Whether ordered (sequential) messages are supported; false by default */
private boolean orderMessageEnable = false;

Default values in the NettyServerConfig object:

/** Listening port. The field defaults to 8888; NamesrvStartup sets it to 9876 before loading the configuration */
private int listenPort = 8888;
/** * Number of threads in the Netty service thread pool */
private int serverWorkerThreads = 8;
/** Number of threads in Netty's public task thread pool. Separate thread pools are created per business type, such as sending messages, consuming messages, and heartbeat detection; a business type (RequestCode) with no registered thread pool is executed by this public pool */
private int serverCallbackExecutorThreads = 0;
/** Number of threads in the I/O thread pool. On both NameServer and Broker, these threads handle network requests: they parse request packets, forward them to the business thread pools that perform the actual operations, and return the results to the caller */
private int serverSelectorThreads = 3;
/** Maximum concurrency of oneway message requests */
private int serverOnewaySemaphoreValue = 256;
/** Maximum concurrency of asynchronous message requests (a Broker-side parameter) */
private int serverAsyncSemaphoreValue = 64;
/** Maximum idle time of a network connection, 120s by default; a connection idle for longer than this is closed */
private int serverChannelMaxIdleTimeSeconds = 120;

/** Size of the network socket send buffer; 64KB by default */
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
/** Size of the network socket receive buffer; 64KB by default */
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
private int serverSocketBacklog = NettySystemConfig.socketBacklog;
/** Whether to use a pooled ByteBuf allocator; enabling it is recommended */
private boolean serverPooledByteBufAllocatorEnable = true;

/**
 * make make install
 *
 * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
 * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
 */
/** Whether to use the native epoll I/O model; recommended on Linux */
private boolean useEpollNativeSelector = false;

Note: before starting NameServer, you can run ./mqnamesrv -c configFile -p to print the configuration properties that will be loaded. Because -p calls System.exit(0) after printing, NameServer is not actually started.

  • Step 2: create a NamesrvController instance and initialize it with the parsed startup properties. NamesrvController is the core controller of NameServer.

Method flow: main0->createNamesrvController->start->initialize

initialize loads the KV configuration, creates the NettyServer network processing object, and then starts two scheduled tasks, which RocketMQ collectively refers to as heartbeat detection:

1) Scheduled task 1: NameServer scans brokers every 10 seconds to remove inactive brokers.

2) Scheduled task 2: NameServer prints the KV configuration every 10 minutes.

public boolean initialize() {

    // Load the KV configuration
    this.kvConfigManager.load();
    /** Create the NettyServer network handling object */
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    /** Scheduled task 1: NameServer scans brokers every 10 seconds to remove inactive brokers */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    /** Scheduled task 2: NameServer prints the KV configuration every 10 minutes */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}

Method flow: main0->createNamesrvController->start

start registers a JVM shutdown hook and then starts the server to listen for network requests from Brokers and message producers.

A common programming trick: if your code uses thread pools, a graceful way to stop them is to register a JVM shutdown hook that closes the pools before the JVM process exits, so resources are released promptly.

public static NamesrvController start(final NamesrvController controller) throws Exception {

    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    /** Register a JVM shutdown hook, then start the server to listen for network requests from Brokers and message producers */
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
        controller.shutdown();
        return null;
    }));

    controller.start();

    return controller;
}
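The shutdown-hook trick described before the start method can be reproduced with the JDK alone. ShutdownHookThread in the code above is RocketMQ's own class; this sketch instead registers a plain Thread with Runtime.addShutdownHook. GracefulShutdown is a hypothetical helper, and the termination timeout is an assumption for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only helper: registers a shutdown hook that closes a thread pool
// before the JVM exits. The hook is returned so callers (or tests) can remove it.
final class GracefulShutdown {
    private GracefulShutdown() {}

    static Thread register(ExecutorService pool, long timeoutMillis) {
        Thread hook = new Thread(() -> {
            pool.shutdown(); // stop accepting new tasks, let queued tasks run
            try {
                if (!pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    pool.shutdownNow(); // interrupt tasks still running after the timeout
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }, "pool-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

Shutting down with a bounded wait, then falling back to shutdownNow, avoids a hook that hangs the JVM exit forever on a stuck task.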

3. NameServer route registration and fault elimination

3.1 Routing metadata

The NameServer routing implementation class is org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager. Before looking at route registration, let's see what information NameServer stores.

RocketMQ is subscription-based: each topic has multiple message queues, and by default a Broker creates four read queues and four write queues per topic. Brokers that share the same brokerName form a master-slave group; brokerId=0 denotes the master node and brokerId>0 a slave node. The lastUpdateTimestamp field of BrokerLiveInfo stores the time the last heartbeat packet was received from a Broker.

/** Topic message queue routing information; producers load-balance message sending according to this table */
private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
/** Basic Broker information: brokerName, owning cluster name, and master/slave Broker addresses */
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
/** Broker cluster information: the names of all Brokers in each cluster */
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
/** Broker status information; NameServer replaces the entry each time it receives a heartbeat packet */
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
/** FilterServer list on each Broker, used for class-mode message filtering */
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
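To see how these tables relate, the sketch below keeps only topicQueueTable and brokerAddrTable and collects the master addresses of the Brokers that host a topic with write queues, roughly the information a producer needs in order to send to it. SimpleQueueData, SimpleRouteTables, and masterAddrsForTopic are hypothetical simplifications; in RocketMQ, clients obtain route data through the GET_ROUTEINFO_BY_TOPIC request, and the real QueueData and BrokerData carry more fields (such as permissions).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down QueueData.
class SimpleQueueData {
    final String brokerName;
    final int readQueueNums;
    final int writeQueueNums;

    SimpleQueueData(String brokerName, int readQueueNums, int writeQueueNums) {
        this.brokerName = brokerName;
        this.readQueueNums = readQueueNums;
        this.writeQueueNums = writeQueueNums;
    }
}

class SimpleRouteTables {
    // topic -> (brokerName -> queue data)
    final Map<String, Map<String, SimpleQueueData>> topicQueueTable = new HashMap<>();
    // brokerName -> (brokerId -> address); brokerId 0 is the master
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    // Join the two tables on brokerName and keep masters that have write queues for the topic.
    List<String> masterAddrsForTopic(String topic) {
        List<String> result = new ArrayList<>();
        Map<String, SimpleQueueData> queues = topicQueueTable.get(topic);
        if (queues == null) {
            return result; // unknown topic: no route
        }
        for (SimpleQueueData qd : queues.values()) {
            Map<Long, String> addrs = brokerAddrTable.get(qd.brokerName);
            if (addrs != null && addrs.containsKey(0L) && qd.writeQueueNums > 0) {
                result.add(addrs.get(0L));
            }
        }
        return result;
    }
}
```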

3.2 Route registration

RocketMQ implements route registration through heartbeats between Brokers and NameServer. After startup, each Broker sends a heartbeat to every NameServer in the cluster every 30 seconds. When NameServer receives a heartbeat, it updates the lastUpdateTimestamp of that Broker's BrokerLiveInfo in brokerLiveTable. Separately, NameServer scans brokerLiveTable every 10 seconds; if it has received no heartbeat from a Broker for 120 seconds, it removes that Broker's routing information and closes the socket connection.

  • The Broker sends heartbeat packets

In the broker module, find the BrokerStartup.java startup class.

Method flow: main->start->BrokerController#start

The core code for sending heartbeat packets:

/** Register with NameServer by sending a heartbeat: first run after 10s, then every registerNameServerPeriod (30s by default), clamped to 10-60s */
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
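The period passed to scheduleAtFixedRate above is clamped, so whatever registerNameServerPeriod is set to, the effective heartbeat interval stays between 10 and 60 seconds. The hypothetical helper below isolates that expression so its effect is easy to check.

```java
// Hypothetical helper isolating the period expression used in the scheduling call:
// Math.max(10000, Math.min(registerNameServerPeriod, 60000)).
final class RegisterPeriod {
    private RegisterPeriod() {}

    // Clamp the configured heartbeat period into the 10s..60s range.
    static int effectivePeriodMillis(int configuredMillis) {
        return Math.max(10_000, Math.min(configuredMillis, 60_000));
    }
}
```

The lower bound keeps a misconfigured Broker from flooding NameServer with registrations; the upper bound keeps the interval well under the 120-second expiry, so a healthy Broker is never dropped between heartbeats.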

Method flow: main->start->BrokerController#start->BrokerController#registerBrokerAll->BrokerController#doRegisterBrokerAll->BrokerOuterAPI#registerBrokerAll

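This method iterates over the NameServer address list and sends the heartbeat to every NameServer. Each registration runs as a separate task on the brokerOuterExecutor thread pool, and a CountDownLatch tracks when all of them have finished.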

/** Iterate over all NameServer addresses */
for (final String namesrvAddr : nameServerAddressList) {
    brokerOuterExecutor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                // Register with this NameServer
                RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);

                if (result != null) {
                    registerBrokerResultList.add(result);
                }

                log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
            } catch (Exception e) {
                log.warn("registerBroker Exception, {}", namesrvAddr, e);
            } finally {
                countDownLatch.countDown();
            }
        }
    });
}
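The loop above submits one registration task per NameServer to brokerOuterExecutor and counts down the latch in finally, so one slow or failing NameServer does not block registration with the others. A minimal JDK-only sketch of the same pattern, where FanOutRegistrar and the registerOne callback are hypothetical stand-ins for BrokerOuterAPI and registerBroker, and the bounded wait is an assumption of the sketch:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical fan-out helper: run one registration task per NameServer on the
// executor and wait, up to timeoutMillis, until every task has finished.
final class FanOutRegistrar {
    private FanOutRegistrar() {}

    static List<String> registerAll(List<String> namesrvAddrs,
                                    Function<String, String> registerOne,
                                    ExecutorService executor,
                                    long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>(); // appended to by several threads
        CountDownLatch latch = new CountDownLatch(namesrvAddrs.size());
        for (String addr : namesrvAddrs) {
            executor.execute(() -> {
                try {
                    String result = registerOne.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // A failing NameServer is skipped; the others are unaffected
                } finally {
                    latch.countDown(); // count down on success and on failure
                }
            });
        }
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```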

To send a heartbeat packet, the Broker first builds the request header (items 1 to 5) and the request body (item 6):

1) brokerAddr: Broker address.

2) brokerId: brokerId=0 for the master node and brokerId>0 for a slave node.

3) brokerName: Broker name.

4) clusterName: cluster name.

5) haServerAddr: address of the master node. This value is null on the first request and is returned after a slave node registers with NameServer.

6) requestBody:

  • topicConfigWrapper: wraps the topicConfigTable of TopicConfigManager, which includes the default topics the Broker creates at startup: MixAll.SELF_TEST_TOPIC, MixAll.DEFAULT_TOPIC (when autoCreateTopicEnable=true), MixAll.BENCHMARK_TOPIC, MixAll.OFFSET_MOVED_EVENT, and topics named after BrokerConfig#brokerClusterName and BrokerConfig#brokerName. By default, the Broker stores its topics in ${Rocket_Home}/store/config/topics.json.
  • filterServerList: the list of message filter servers.

private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);

    if (oneway) {
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
    }

    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
            RegisterBrokerResult result = new RegisterBrokerResult();
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());
            if (response.getBody() != null) {
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
  • NameServer handles heartbeat packets

In the namesrv module, find org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor. This network processor parses the request type; if the request type is RequestCode.REGISTER_BROKER, the request is eventually forwarded to RouteInfoManager#registerBroker.

switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:
        return this.putKVConfig(ctx, request);
    case RequestCode.GET_KV_CONFIG:
        return this.getKVConfig(ctx, request);
    case RequestCode.DELETE_KV_CONFIG:
        return this.deleteKVConfig(ctx, request);
    case RequestCode.QUERY_DATA_VERSION:
        return queryBrokerTopicConfig(ctx, request);
    /** If the request type is RequestCode.REGISTER_BROKER, the request is eventually forwarded to RouteInfoManager#registerBroker */
    case RequestCode.REGISTER_BROKER:
        Version brokerVersion = MQVersion.value2Version(request.getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            return this.registerBroker(ctx, request);
        }
    case RequestCode.UNREGISTER_BROKER:
        return this.unregisterBroker(ctx, request);
    case RequestCode.GET_ROUTEINFO_BY_TOPIC:
        return this.getRouteInfoByTopic(ctx, request);
    case RequestCode.GET_BROKER_CLUSTER_INFO:
        return this.getBrokerClusterInfo(ctx, request);
    case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
        return this.wipeWritePermOfBroker(ctx, request);
    case RequestCode.ADD_WRITE_PERM_OF_BROKER:
        return this.addWritePermOfBroker(ctx, request);
    case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
        return getAllTopicListFromNameserver(ctx, request);
    case RequestCode.DELETE_TOPIC_IN_NAMESRV:
        return deleteTopicInNamesrv(ctx, request);
    case RequestCode.GET_KVLIST_BY_NAMESPACE:
        return this.getKVListByNamespace(ctx, request);
    case RequestCode.GET_TOPICS_BY_CLUSTER:
        return this.getTopicsByCluster(ctx, request);
    case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
        return this.getSystemTopicListFromNs(ctx, request);
    case RequestCode.GET_UNIT_TOPIC_LIST:
        return this.getUnitTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
        return this.getHasUnitSubTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
        return this.getHasUnitSubUnUnitTopicList(ctx, request);
    case RequestCode.UPDATE_NAMESRV_CONFIG:
        return this.updateConfig(ctx, request);
    case RequestCode.GET_NAMESRV_CONFIG:
        return this.getConfig(ctx, request);
    default:
        break;
}

Route registration acquires the write lock to prevent concurrent modification of the routing tables in RouteInfoManager. It first checks whether the cluster the Broker belongs to exists, creating it if not, and then adds the brokerName to that cluster's Broker set.

this.lock.writeLock().lockInterruptibly();

Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
    brokerNames = new HashSet<String>();
    this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
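The excerpt above shows only the lock acquisition. Below is a sketch of the same step with the lock released in a finally block and computeIfAbsent creating the cluster entry on first sight. ClusterTable is a hypothetical class, and it uses lock() rather than lockInterruptibly() to keep the example free of checked exceptions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of the clusterAddrTable step: clusterName -> set of brokerNames.
class ClusterTable {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Set<String>> clusterAddrTable = new HashMap<>();

    void addBroker(String clusterName, String brokerName) {
        lock.writeLock().lock();
        try {
            // Create the cluster's set on first sight, then add the brokerName (no-op if present)
            clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>()).add(brokerName);
        } finally {
            lock.writeLock().unlock(); // release even if the update throws
        }
    }

    Set<String> brokersOf(String clusterName) {
        lock.readLock().lock();
        try {
            // Return a copy so callers never observe later changes made under the write lock
            return new HashSet<>(clusterAddrTable.getOrDefault(clusterName, Collections.emptySet()));
        } finally {
            lock.readLock().unlock();
        }
    }
}
```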

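Next, NameServer maintains BrokerData. It looks up the BrokerData for the brokerName in brokerAddrTable. If none exists, it creates a new BrokerData, puts it into brokerAddrTable, and sets registerFirst to true to mark a first registration. If one exists, it reuses it and replaces the address stored for this brokerId; registerFirst then stays false, indicating a repeat registration, unless no address was previously stored for this brokerId. Before storing the address, it removes any entry that maps the same IP:PORT to a different brokerId, which handles a slave being promoted to master.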

boolean registerFirst = false;

BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
    registerFirst = true;
    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
    this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
    Entry<Long, String> item = it.next();
    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
        log.debug("remove entry {} from brokerData", item);
        it.remove();
    }
}

String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
if (MixAll.MASTER_ID == brokerId) {
    log.info("cluster [{}] brokerName [{}] master address change from {} to {}",
            brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);
}

registerFirst = registerFirst || (null == oldAddr);
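The key rule in this step is that one IP:PORT may appear under only one brokerId, so when a slave is promoted to master its old entry is removed before the new one is stored. A hypothetical, lock-free sketch of just that rule, which also reports whether this counts as a first registration for the brokerId (the null == oldAddr check in the code above):

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of the brokerAddrs update rule.
final class BrokerAddrUpdater {
    private BrokerAddrUpdater() {}

    // Returns true if no address was previously stored for brokerId.
    static boolean register(Map<Long, String> brokerAddrs, long brokerId, String brokerAddr) {
        Iterator<Map.Entry<Long, String>> it = brokerAddrs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> item = it.next();
            // Same address under a different brokerId: a role switch, so drop the stale entry
            if (brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                it.remove();
            }
        }
        String oldAddr = brokerAddrs.put(brokerId, brokerAddr);
        return oldAddr == null;
    }
}
```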

If the Broker is a master and either its topic configuration has changed or this is its first registration, NameServer creates or updates the topic routing metadata in topicQueueTable. This also registers routing information for the default topics, including MixAll.DEFAULT_TOPIC: when a producer sends to a topic that has not been created and BrokerConfig's autoCreateTopicEnable is true, the routing information of MixAll.DEFAULT_TOPIC is returned.

if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
            || registerFirst) {
        ConcurrentMap<String, TopicConfig> tcTable =
                topicConfigWrapper.getTopicConfigTable();
        if (tcTable != null) {
            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                this.createAndUpdateQueueData(brokerName, entry.getValue());
            }
        }
    }
}

Create the QueueData data structure from topicConfig and update the topicQueueTable.

private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());

    Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topicConfig.getTopicName());
    if (null == queueDataMap) {
        queueDataMap = new HashMap<>();
        queueDataMap.put(queueData.getBrokerName(), queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataMap);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        QueueData old = queueDataMap.put(queueData.getBrokerName(), queueData);
        if (old != null && !old.equals(queueData)) {
            log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), old, queueData);
        }
    }
}

Next, NameServer updates brokerLiveTable, the table of live Brokers. BrokerLiveInfo is the key input for route deletion.

BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
        new BrokerLiveInfo(
                System.currentTimeMillis(),
                topicConfigWrapper.getDataVersion(),
                channel,
                haServerAddr));
if (null == prevBrokerLiveInfo) {
    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}

NameServer then registers the Broker's FilterServer address list (one Broker can be associated with multiple FilterServer message filter servers). If the Broker is a slave, NameServer looks up the master's information and sets masterAddr (and haServerAddr) in the registration result.

if (filterServerList != null) {
    if (filterServerList.isEmpty()) {
        this.filterServerTable.remove(brokerAddr);
    } else {
        this.filterServerTable.put(brokerAddr, filterServerList);
    }
}

if (MixAll.MASTER_ID != brokerId) {
    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
    if (masterAddr != null) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
        if (brokerLiveInfo != null) {
            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
            result.setMasterAddr(masterAddr);
        }
    }
}

Design highlights:

NameServer maintains a long connection to each Broker and stores the Broker's status in brokerLiveTable. Each time NameServer receives a heartbeat packet, it updates the Broker's status and the routing tables (topicQueueTable, brokerAddrTable, brokerLiveTable, filterServerTable). These routing tables are plain HashMaps guarded by a single coarse-grained read-write lock. Multiple message senders can read routes concurrently, which keeps message sending highly concurrent, while NameServer processes only one Broker heartbeat packet at a time, so concurrent heartbeat packets are applied sequentially. This is a classic use case for a read-write lock.
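The behaviour this design relies on can be demonstrated with ReentrantReadWriteLock directly. In the hypothetical demo below, two reader threads (standing in for route lookups by message senders) hold the read lock at the same time, while a writer (standing in for heartbeat processing) cannot acquire the write lock until both readers have released it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo: shared readers, exclusive writer.
final class RwLockDemo {
    private RwLockDemo() {}

    // Returns {readers overlapped, writer excluded while reading, writer admitted afterwards}.
    static boolean[] run() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch bothReading = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        Runnable reader = () -> {
            lock.readLock().lock();
            try {
                bothReading.countDown();
                release.await(); // keep holding the read lock until told to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.readLock().unlock();
            }
        };
        Thread r1 = new Thread(reader);
        Thread r2 = new Thread(reader);
        r1.start();
        r2.start();
        try {
            bothReading.await(); // both readers are now inside the read lock
            boolean readersOverlapped = lock.getReadLockCount() == 2;
            boolean writerExcluded = !lock.writeLock().tryLock();
            release.countDown();
            r1.join();
            r2.join();
            boolean writerAdmitted = lock.writeLock().tryLock();
            if (writerAdmitted) {
                lock.writeLock().unlock();
            }
            return new boolean[] {readersOverlapped, writerExcluded, writerAdmitted};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
    }
}
```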

3.3 Route deletion

The Broker sends a heartbeat packet to NameServer every 30 seconds. The packet contains the Broker's brokerId, address, name, and cluster name. If a Broker goes down, NameServer stops receiving its heartbeat packets. NameServer scans brokerLiveTable every 10 seconds; if a Broker's BrokerLiveInfo lastUpdateTimestamp is more than 120s older than the current time, NameServer considers the Broker invalid, removes it, closes the connection to it, and updates topicQueueTable, brokerAddrTable, brokerLiveTable, and filterServerTable.

Route deletion is triggered in two ways:

1) NameServer periodically scans brokerLiveTable and compares each Broker's last heartbeat timestamp with the current system time. If the gap exceeds 120s, the Broker is removed.

2) When a Broker shuts down normally, it sends an unregisterBroker request.

Whichever way route deletion is triggered, the deletion itself is the same: the Broker's information is removed from topicQueueTable, brokerAddrTable, brokerLiveTable, and filterServerTable. RocketMQ therefore shares the same code between the two paths. This section uses the first path as an example. Method flow: RouteInfoManager#scanNotActiveBroker

NameServer executes scanNotActiveBroker every 10 seconds. The logic is simple: iterate over brokerLiveTable (a HashMap) and check each BrokerLiveInfo's lastUpdateTimestamp, the time the last heartbeat packet was received. If more than 120s have passed, the Broker is considered unavailable: it is removed from the table, its connection is closed, and onChannelDestroy deletes the routing information associated with it.

public int scanNotActiveBroker() {
    int removeCount = 0;
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // BROKER_CHANNEL_EXPIRED_TIME is 120s: no heartbeat for that long means the Broker is treated as dead
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            // Remove the Broker's entries from the other routing tables
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            removeCount++;
        }
    }
    return removeCount;
}

Inside onChannelDestroy, NameServer acquires the write lock and removes the Broker's entries from brokerLiveTable and filterServerTable by Broker address:

this.lock.writeLock().lockInterruptibly();
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);

Next, NameServer maintains brokerAddrTable: it iterates over brokerAddrTable, finds the Broker's address in each BrokerData's brokerAddrs map, and removes it. If the BrokerData contains no other Broker after the removal, the entry for that brokerName is removed from brokerAddrTable.

String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
        this.brokerAddrTable.entrySet().iterator();
// Stop scanning once the brokerName owning this address has been found
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
    BrokerData brokerData = itBrokerAddrTable.next().getValue();

    // brokerAddrs maps brokerId (0 = master) to the broker's address
    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
    while (it.hasNext()) {
        Entry<Long, String> entry = it.next();
        Long brokerId = entry.getKey();
        String brokerAddr = entry.getValue();
        if (brokerAddr.equals(brokerAddrFound)) {
            brokerNameFound = brokerData.getBrokerName();
            it.remove();
            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                    brokerId, brokerAddr);
            break;
        }
    }

    // No addresses left under this brokerName: drop the whole entry
    if (brokerData.getBrokerAddrs().isEmpty()) {
        removeBrokerName = true;
        itBrokerAddrTable.remove();
        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                brokerData.getBrokerName());
    }
}
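The nested-iterator removal above can be tried in isolation. The sketch below is a hypothetical simplification, not RocketMQ's BrokerData: brokerAddrTable is modeled as brokerName mapped to a brokerId-to-address map, and the helper removeAddr returns the same two flags the original computes.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical simplification of brokerAddrTable: brokerName -> (brokerId -> brokerAddr).
// Only the removal logic is modeled.
class BrokerAddrCleanup {
    static final class Result {
        final String brokerNameFound;    // brokerName that owned the address, or null
        final boolean removeBrokerName;  // true if a brokerName entry was dropped

        Result(String brokerNameFound, boolean removeBrokerName) {
            this.brokerNameFound = brokerNameFound;
            this.removeBrokerName = removeBrokerName;
        }
    }

    static Result removeAddr(Map<String, Map<Long, String>> brokerAddrTable, String addrToRemove) {
        String brokerNameFound = null;
        boolean removeBrokerName = false;
        Iterator<Map.Entry<String, Map<Long, String>>> outer = brokerAddrTable.entrySet().iterator();
        // Same loop condition as the original: stop once the owner has been found.
        while (outer.hasNext() && brokerNameFound == null) {
            Map.Entry<String, Map<Long, String>> brokerData = outer.next();
            Iterator<Map.Entry<Long, String>> inner = brokerData.getValue().entrySet().iterator();
            while (inner.hasNext()) {
                if (inner.next().getValue().equals(addrToRemove)) {
                    brokerNameFound = brokerData.getKey();
                    inner.remove(); // safe removal while iterating
                    break;
                }
            }
            // A brokerName with no addresses left is removed entirely.
            if (brokerData.getValue().isEmpty()) {
                removeBrokerName = true;
                outer.remove();
            }
        }
        return new Result(brokerNameFound, removeBrokerName);
    }
}
```

Removing the slave address of a master/slave pair leaves the brokerName in place; removing the last remaining address drops the brokerName, which is what later triggers the clusterAddrTable and topicQueueTable cleanup.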

If the brokerName itself was removed, look it up in clusterAddrTable and remove it from its cluster. If the cluster then contains no brokers, remove the cluster from clusterAddrTable.

if (brokerNameFound != null && removeBrokerName) {
    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, Set<String>> entry = it.next();
        String clusterName = entry.getKey();
        Set<String> brokerNames = entry.getValue();
        boolean removed = brokerNames.remove(brokerNameFound);
        if (removed) {
            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                    brokerNameFound, clusterName);

            // Drop the cluster itself once it has no brokers left
            if (brokerNames.isEmpty()) {
                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                        clusterName);
                it.remove();
            }

            // A brokerName belongs to a single cluster, so stop after the first match
            break;
        }
    }
}
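The same cluster cleanup, reduced to plain collections, looks like this. ClusterCleanup and removeBrokerName are hypothetical names for illustration; clusterAddrTable is modeled, as in the original, as clusterName mapped to a set of brokerNames.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification of clusterAddrTable: clusterName -> set of brokerNames.
class ClusterCleanup {
    // Removes brokerName from the first cluster that contains it and drops that cluster
    // if it becomes empty. Returns the affected clusterName, or null if none matched.
    static String removeBrokerName(Map<String, Set<String>> clusterAddrTable, String brokerName) {
        Iterator<Map.Entry<String, Set<String>>> it = clusterAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<String>> entry = it.next();
            String clusterName = entry.getKey();
            if (entry.getValue().remove(brokerName)) {
                if (entry.getValue().isEmpty()) {
                    it.remove(); // no brokers left, so the cluster itself goes
                }
                return clusterName; // a brokerName belongs to one cluster, so stop here
            }
        }
        return null;
    }
}
```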

If the brokerName was removed, traverse the queues of every topic and remove the queue data that belongs to this Broker. If a topic's only queues were on the removed Broker, the topic is removed from the routing table as well.

if (removeBrokerName) {
    // The lambda below needs an effectively final copy of brokerNameFound
    String finalBrokerNameFound = brokerNameFound;
    Set<String> needRemoveTopic = new HashSet<>();

    topicQueueTable.forEach((topic, queueDataMap) -> {
        // Drop this Broker's queue data from every topic
        QueueData old = queueDataMap.remove(finalBrokerNameFound);
        log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                topic, old);

        if (queueDataMap.size() == 0) {
            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic);
            needRemoveTopic.add(topic);
        }
    });

    // Remove empty topics only after the traversal of topicQueueTable has finished
    needRemoveTopic.forEach(topicQueueTable::remove);
}
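The reason for collecting needRemoveTopic first is that HashMap.forEach throws ConcurrentModificationException if the map being traversed is structurally modified during the traversal. A minimal sketch of the same two-phase pattern follows; TopicQueueCleanup is a hypothetical name, and queue data is modeled as a plain String rather than RocketMQ's QueueData.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification of topicQueueTable: topic -> (brokerName -> queue data).
class TopicQueueCleanup {
    // Removes brokerName's queues from every topic; returns the topics that became empty.
    static Set<String> removeBroker(Map<String, Map<String, String>> topicQueueTable, String brokerName) {
        Set<String> needRemoveTopic = new HashSet<>();
        topicQueueTable.forEach((topic, queueDataMap) -> {
            // Modifying the inner map is fine; only the outer map is being traversed.
            queueDataMap.remove(brokerName);
            if (queueDataMap.isEmpty()) {
                needRemoveTopic.add(topic);
            }
        });
        // Removing from topicQueueTable inside its own forEach would make HashMap throw
        // ConcurrentModificationException, so empty topics are dropped afterwards.
        needRemoveTopic.forEach(topicQueueTable::remove);
        return needRemoveTopic;
    }
}
```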

Finally, the write lock is released and the route deletion is complete.

finally {
    this.lock.writeLock().unlock();
}
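The write-lock handling can be illustrated with java.util.concurrent.locks.ReentrantReadWriteLock. The class GuardedRoutes below is hypothetical and only mimics the shape of a route table guarded by a read-write lock. One design point worth noting: this sketch acquires the lock before entering try. If lockInterruptibly() were called inside try and the thread were interrupted while waiting, the finally block would call unlock() on a lock the thread does not hold, which raises IllegalMonitorStateException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical route store guarded by a read-write lock (not RocketMQ's RouteInfoManager).
class GuardedRoutes {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, String> brokerLiveTable = new HashMap<>();

    void register(String addr, String info) throws InterruptedException {
        lock.writeLock().lockInterruptibly(); // acquired before try, see the note above
        try {
            brokerLiveTable.put(addr, info);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Writers take the exclusive lock; returns true if an entry was removed.
    boolean destroy(String addr) throws InterruptedException {
        lock.writeLock().lockInterruptibly();
        try {
            return brokerLiveTable.remove(addr) != null;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock, so lookups do not block each other.
    int size() {
        lock.readLock().lock();
        try {
            return brokerLiveTable.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```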

3.4. Route Discovery

RocketMQ route discovery is not real-time. When a topic's route changes, NameServer does not push the change to clients; instead, each client periodically pulls the latest route for the topic. The request code for fetching route information by topic name is GET_ROUTEINTO_BY_TOPIC. The route result is represented by TopicRouteData:

public class TopicRouteData extends RemotingSerializable {
    /** Ordered (sequential) message configuration, taken from kvConfig */
    private String orderTopicConf;
    /** Queue metadata of the topic */
    private List<QueueData> queueDatas;
    /** Metadata of the Brokers the topic is distributed across */
    private List<BrokerData> brokerDatas;
    /** Address list of the filter servers on each Broker */
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
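Because NameServer never pushes, a client only notices a route change on its next poll. The following minimal sketch shows that pull model under stated assumptions: RouteCache is hypothetical, the fetch function stands in for a GET_ROUTEINTO_BY_TOPIC request, and a route is reduced to a String instead of a full TopicRouteData.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical client-side route cache. "fetch" stands in for a route request to
// NameServer; it returns null when NameServer has no route for the topic.
class RouteCache {
    private final Function<String, String> fetch;
    private final Map<String, String> routes = new HashMap<>();

    RouteCache(Function<String, String> fetch) {
        this.fetch = fetch;
    }

    // Called periodically by the client. Returns true if the cached route changed.
    boolean poll(String topic) {
        String latest = fetch.apply(topic);
        if (latest == null) {
            return false; // topic unknown to NameServer; keep whatever was cached
        }
        String old = routes.put(topic, latest);
        return !Objects.equals(old, latest);
    }

    // Whatever the client last pulled; may be stale until the next poll.
    String cached(String topic) {
        return routes.get(topic);
    }
}
```

A real client would call poll on a timer; the window between a NameServer-side change and the next poll is exactly why the sender side needs the fault-tolerance mechanism mentioned at the start of the article.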

NameServer's route discovery is implemented by DefaultRequestProcessor#getRouteInfoByTopic.

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    // Assemble the route from topicQueueTable, brokerAddrTable and filterServerTable
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        // With ordered messaging enabled, attach the topic's ordered-message config from KVConfig
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    // No route found: reply TOPIC_NOT_EXIST with a pointer to the FAQ
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}
  • Call RouteInfoManager#pickupTopicRouteData, which populates TopicRouteData's List<QueueData>, List<BrokerData>, and filter server address table from the routing tables topicQueueTable, brokerAddrTable, and filterServerTable, respectively.

  • If route information for the topic is found and ordered messaging is enabled on the NameServer (isOrderMessageEnable), the ordered-message configuration for the topic is read from NameServer's KVConfig and added to the route data. If no route is found, the response code TOPIC_NOT_EXIST indicates that the topic has no route.
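The two steps above can be modeled end to end with plain collections. This is a hypothetical, heavily simplified sketch, not RocketMQ's API: NameServerModel, RouteResult, and Code are invented for illustration, each brokerName maps to a single address instead of a full BrokerData, and unlike the real pickupTopicRouteData it does not fail the lookup when a broker's address is missing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical response codes, named after the ones used above.
enum Code { SUCCESS, TOPIC_NOT_EXIST }

final class RouteResult {
    final Code code;
    final Map<String, Integer> queueDatas; // brokerName -> number of queues
    final List<String> brokerAddrs;        // addresses of the brokers hosting the topic
    final String orderTopicConf;           // only set when ordered messaging is enabled

    RouteResult(Code code, Map<String, Integer> queueDatas, List<String> brokerAddrs, String orderTopicConf) {
        this.code = code;
        this.queueDatas = queueDatas;
        this.brokerAddrs = brokerAddrs;
        this.orderTopicConf = orderTopicConf;
    }
}

class NameServerModel {
    final Map<String, Map<String, Integer>> topicQueueTable = new HashMap<>();
    final Map<String, String> brokerAddrTable = new HashMap<>(); // brokerName -> address
    final Map<String, String> orderTopicKv = new HashMap<>();    // stands in for KVConfig
    boolean orderMessageEnable;

    RouteResult getRouteInfoByTopic(String topic) {
        Map<String, Integer> queues = topicQueueTable.get(topic);
        if (queues == null || queues.isEmpty()) {
            return new RouteResult(Code.TOPIC_NOT_EXIST, Map.of(), List.of(), null);
        }
        // Follow each queue's brokerName into brokerAddrTable to find broker addresses.
        List<String> addrs = new ArrayList<>();
        for (String brokerName : queues.keySet()) {
            String addr = brokerAddrTable.get(brokerName);
            if (addr != null) {
                addrs.add(addr);
            }
        }
        // Ordered-message config is attached only when the feature is switched on.
        String conf = orderMessageEnable ? orderTopicKv.get(topic) : null;
        return new RouteResult(Code.SUCCESS, new HashMap<>(queues), addrs, conf);
    }
}
```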