NamerServer

NameServer is a very simple Topic routing registry that acts like ZooKeeper in Dubbo and supports dynamic registration and discovery of brokers. There are two main functions: Broker management. NameServer accepts the registration information of Broker clusters and stores it as the basic data of routing information. It then provides a heartbeat detection mechanism to check whether the Broker is still alive. Routing information management, where each NameServer holds the entire routing information about the Broker cluster and the queue information for client queries. The Producer and Conumser can then use NameServer to know the routing information of the entire Broker cluster and deliver and consume messages. NameServer is also typically deployed in clusters, where instances do not communicate with each other. The Broker registers its routing information with each NameServer, so each NameServer instance has a complete routing information stored on it. When a NameServer goes offline for some reason, the Broker can still synchronize its routing information with other Nameservers. Producers and consumers can still dynamically perceive the routing information of the Broker.

The source code parsing

NameServer Startup process

Populate startup parameters

First, create NameServerConfig (NameServer service parameters) and NettyServerConfig (NameServer network parameters).

Final NamesrvConfig NamesrvConfig = new NamesrvConfig(); Final NettyServerConfig NettyServerConfig = new NettyServerConfig(); . / / the default listener port nettyServerConfig setListenPort (9876); // Configure parameters in a fileif (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if(file ! = null) { InputStreamin = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);

        namesrvConfig.setConfigStorePath(file);

        System.out.printf("load config properties file OK, %s%n", file); in.close(); }} // Through command line argumentsif (commandLine.hasOption('p')) {
    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
    MixAll.printObjectProperties(console, namesrvConfig);
    MixAll.printObjectProperties(console, nettyServerConfig);
    System.exit(0);
}

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
Copy the code
// rocketmqHome private String rocketmqHome = system.getProperty (mixall.rocketMQ_home_property, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); Private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; // Default configuration file path, invalid. private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false; // Whether sequential messages are supported. private boolean orderMessageEnable =false;
Copy the code
Private int listenPort = listenPort; Private int serverWorkerThreads = 8; //Netty public Number of task threads In Netty network design, different thread pools are created for different service types, such as message sending, message consumption, and heartbeat detection. // If the business type does not register a thread pool, a Public thread pool will execute. private int serverCallbackExecutorThreads = 0; Private int serverSelectorThreads = 3; / / send oneway message request concurrency private int serverOnewaySemaphoreValue = 256; / / asynchronous messages concurrency private int serverAsyncSemaphoreValue = 64; / / private network connection Max idle time int serverChannelMaxIdleTimeSeconds = 120; / / the Socket send buffer size private int serverSocketSndBufSize = NettySystemConfig. SocketSndbufSize; / / Socket accept buffer size private int serverSocketRcvBufSize = NettySystemConfig. SocketRcvbufSize; / / whether the ByteBuffer open cache private Boolean serverPooledByteBufAllocatorEnable =true; Private Boolean useEpollNativeSelector = Specifies whether to enable the Epoll IO modelfalse;
Copy the code

Create and initialize an instance of NamesrvController based on the startup property.

public static NamesrvController start(final NamesrvController controller) throws Exception {

    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null"); } // Initialize controller instance Boolean initResult = controller.initialize();if(! initResult) { controller.shutdown(); System.exit(-3); } // Register the JVM hook function to close the thread pool before the JVM shuts down. Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            returnnull; }})); // Start controller controller.start();return controller;
}
Copy the code
public boolean initialize() {

    this.kvConfigManager.load();

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); // For network requests this.registerProcessor(); / / 10 seconds scanning a BrokerLiveTable, removing is not active Broker enclosing scheduledExecutorService. ScheduleAtFixedRate (newRunnable() {

        @Override
        public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); / / nameServer every 10 minutes to print a KV configuration enclosing scheduledExecutorService. ScheduleAtFixedRate (newRunnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    return true;
}
Copy the code

As you can see, NameServer starts up by loading two configurations and then starting two heartbeat detection threads to scan brokers for heartbeat detection and print KV configurations.

Routing meta information

NameServer’s main role is to provide routing information about a Topic for producers and consumers, and to manage Broker nodes. Let’s start by looking at the data structure of the routing metadata.

// Topic message queue routing information. Messages are sent according to the routing table for load balancing. private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; Private Final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; Private Final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; Private Final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // Class message filtering Private Final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;Copy the code

Routing registered

Routes are registered through the heartbeat between Broker and NameServer. The Broker starts to maintain long connections to all nameservers in the cluster and sends heartbeat packets to all Nameservers every 30 seconds. When NameServer receives a heartbeat packet it updates the BrokerLiveInfo lastUpdateTimestamp in the brokerLiveTable cache, then NameServer scans brokerLiveTable every 10 seconds, If no heartbeat packet is received for 120 seconds, NameServer removes routing information from the Broker and closes Socket connections.

Broker Heartbeat sending

/ / every 10 s sends a heartbeat packets the NameServer enclosing scheduledExecutorService. ScheduleAtFixedRate (newRunnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true.false, brokerConfig.isForceRegister());
        }   catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

Copy the code
/ / get nameServerAddress List < String > nameServerAddressList = this. RemotingClient. GetNameServerAddressList ();if(nameServerAddressList ! = null && nameServerAddressList.size() > 0) { final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); // Block the current thread with countDownLatch, Waiting for a heartbeat thread synchronization final CountDownLatch CountDownLatch = new CountDownLatch (nameServerAddressList. The size ());for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new Runnable() {
            @Override
            public void run() {try {// Send heartbeat synchronization message RegisterBrokerResult Result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);if(result ! = null) { registerBrokerResultList.add(result); } log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); }}}); } try {countdownlatch.await (timeoutMills, timeunit.milliseconds); } catch (InterruptedException e) { } }Copy the code
/ / packaging request header RemotingCommand request. = RemotingCommand createRequestCommand (RequestCode. REGISTER_BROKER requestHeader); request.setBody(body); // Send a messageif (oneway) {
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        returnnull; } messaging RemotingCommand response = this. RemotingClient. InvokeSync (namesrvAddr, request, timeoutMills);Copy the code

Send a request of type requestcode. REGISTER_BROKER to the NameServer server.

NameServer handles the heartbeat

NameServer processing network requests in the org. Apache. Rocketmq. Namesrv. Processor. DefaultRequestProcessor# the processRequest, . If the request type for RequestCode REGISTER_BROKER, request the forwarding to org. The apache. Rocketmq. Namesrv. Routeinfo. RouteInfoManager# registerBroker.

/ / write this lock. The lock. WriteLock () lockInterruptibly (); / / to determine whether a cluster is Set < String > brokerNames = this. ClusterAddrTable. Get (clusterName);if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);
Copy the code
                boolean registerFirst = false; BrokerData brokerData = this.brokerAddrTable.get(brokerName); // If it is newif (null == brokerData) {
                    registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator(); // Delete obsoletewhile (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if(null ! = brokerAddr && brokerAddr.equals(item.getValue()) && brokerId ! = item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr);Copy the code
// If it is masterif(null ! = topicConfigWrapper && Mixall. MASTER_ID == brokerId) {// If config has changed or is registered for the first timeif (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
            || registerFirst) {
        ConcurrentMap<String, TopicConfig> tcTable =
        topicConfigWrapper.getTopicConfigTable();
        if(tcTable ! = null) {for(Map.Entry<String, TopicConfig> entry : TcTable. EntrySet ()) {/ / routing update metadata enclosing createAndUpdateQueueData (brokerName, entry getValue ()); }}}}Copy the code

If the Broker is Master and BrokerTopic configuration information changes or is first registered, the Topic routing metadata needs to be created or updated, populated with TopicQueueTable, which automatically registers routing information for the default Topic. This contains routing information for mixall.default_topic. If the subject is created and BrokerConfig’s autoCreateTopicEnable is true, the routing information for mixall.default_topic is returned.

private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName);
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());
        queueData.setPerm(topicConfig.getPerm());
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
        if (null == queueDataList) {
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
        } else {
            boolean addNewOne = true;

            Iterator<QueueData> it = queueDataList.iterator();
            while (it.hasNext()) {
                QueueData qd = it.next();
                if (qd.getBrokerName().equals(brokerName)) {
                    if (qd.equals(queueData)) {
                        addNewOne = false;
                    } else {
                        log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData); it.remove(); }}}if(addNewOne) { queueDataList.add(queueData); }}}Copy the code

Create the QueueData data structure from TopicConfig and update the topicQueueTable.

                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

Copy the code

BrokerLiveTable is an important basis for performing route deletions.

if(filterServerList ! = null) {if (filterServerList.isEmpty()) {
        this.filterServerTable.remove(brokerAddr);
    } else{ this.filterServerTable.put(brokerAddr, filterServerList); }}if(MixAll.MASTER_ID ! = brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if(masterAddr ! = null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if(brokerLiveInfo ! = null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); }}}Copy the code

Route Registration Summary

Route registration is the process by which a Broker sends a heartbeat to all Nameservers every 30 seconds, along with its own information. The NameServer then processes this information and updates the routing metadata. As you can see, the routing table uses HashMap data structure, and uses read/write locks with smaller granularity to control concurrency. Multiple clients are allowed to read concurrently, but only one heartbeat packet is allowed to be processed at the same time.

Routing to delete

As described above, the NameServer maintains a long connection to the Broker, the Broker state is stored in BrokerLiveTable, and the NameServer scans the NameServer every 10 seconds to discover that it has not received a heartbeat message from the Broker for 120s. Remove the Broker and close the connection to it, while updating the routing meta information. There are also cases where the Broker is normally closed and the unRegisterBroker directive is executed.

Scanning brokerLiveTable

As mentioned above, a thread is started when NameServer is started and the scanNotActiveBroker method is called every 10 seconds.

    public void scanNotActiveBroker() { Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); // Loop through the lastUpdateTimestamp of brokerLiveTablewhile(it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); // Delete after 120 secondsif ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); // Then close Channel this.onChannelDestroy(Next-.getKey (), next-.getValue ().getChannel()); }}}Copy the code

Maintaining routing Tables

Try {// lock this.lock.writelock ().lockInterruptibly(); this.brokerLiveTable.remove(brokerAddrFound); this.filterServerTable.remove(brokerAddrFound); String brokerNameFound = null; boolean removeBrokerName =false;
                    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                        this.brokerAddrTable.entrySet().iterator();
                    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                        BrokerData brokerData = itBrokerAddrTable.next().getValue();

                        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<Long, String> entry = it.next();
                            Long brokerId = entry.getKey();
                            String brokerAddr = entry.getValue();
                            if (brokerAddr.equals(brokerAddrFound)) {
                                brokerNameFound = brokerData.getBrokerName();
                                it.remove();
                                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                    brokerId, brokerAddr);
                                break; }}if (brokerData.getBrokerAddrs().isEmpty()) {
                            removeBrokerName = true;
                            itBrokerAddrTable.remove();
                            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); }}if(brokerNameFound ! = null && removeBrokerName) { Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();while (it.hasNext()) {
                            Entry<String, Set<String>> entry = it.next();
                            String clusterName = entry.getKey();
                            Set<String> brokerNames = entry.getValue();
                            boolean removed = brokerNames.remove(brokerNameFound);
                            if (removed) {
                                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                    brokerNameFound, clusterName);

                                if (brokerNames.isEmpty()) {
                                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                        clusterName);
                                    it.remove();
                                }

                                break; }}}if (removeBrokerName) {
                        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                            this.topicQueueTable.entrySet().iterator();
                        while (itTopicQueueTable.hasNext()) {
                            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                            String topic = entry.getKey();
                            List<QueueData> queueDataList = entry.getValue();

                            Iterator<QueueData> itQueueData = queueDataList.iterator();
                            while (itQueueData.hasNext()) {
                                QueueData queueData = itQueueData.next();
                                if (queueData.getBrokerName().equals(brokerNameFound)) {
                                    itQueueData.remove();
                                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData); }}if (queueDataList.isEmpty()) {
                                itTopicQueueTable.remove();
                                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                    topic);
                            }
                        }
                    }
                } finally {
                    this.lock.writeLock().unlock();
                }
Copy the code

Summary of Route Deletion

The main thing is that the NameServer timer thread scans BrokerLiveTable every 10 seconds and if a Broker is found to be silent for 120 seconds, it removes information about that Broker and the maintenance of the routing table is done.

Routing discovery

Route discovery is not real-time. When Topic routing changes, NameServer does not actively push it to the client. The client periodically pulls the latest routing for a Topic.

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); TopicRouteData topicRouteData topicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());if(topicRouteData ! = null) {if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}
Copy the code

# summary

reference

  • RocketMQ official Doc

  • Inside RocketMQ technology