1. Introduction

NameServer is the brain of RocketMQ. It manages the routing information of multiple message servers (Brokers) and is designed to avoid a single point of failure.

2. NameServer Startup process

The NameServer startup class is org.apache.rocketmq.namesrv.NamesrvStartup. From it we can see that the first step of startup is loading the NamesrvConfig and NettyServerConfig configuration.

    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876);
    // -c <file>: load both configs from a properties file
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
    
            namesrvConfig.setConfigStorePath(file);
    
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    
    // -p: print the effective configuration and exit
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }
    
    // Command-line options are applied last, on top of the file values
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
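To make the idea of copying properties onto a config object concrete, here is a minimal sketch. It is not RocketMQ's MixAll.properties2Object; the class names PropertiesToObjectSketch and DemoConfig are hypothetical, and the sketch assumes that a property named xxx maps to a public setter setXxx taking an int, long, boolean, or String.

```java
import java.lang.reflect.Method;
import java.util.Properties;

class PropertiesToObjectSketch {
    // Hypothetical config class for illustration only (not NamesrvConfig).
    public static class DemoConfig {
        private int listenPort = 9876;
        private String storePath;

        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
        public void setStorePath(String storePath) { this.storePath = storePath; }
        public int getListenPort() { return listenPort; }
        public String getStorePath() { return storePath; }
    }

    // For every public one-argument setter setXxx, look up property "xxx"
    // and invoke the setter with the converted value. Unknown types are skipped.
    static void apply(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // property not supplied: keep the default
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == int.class) {
                    m.invoke(target, Integer.parseInt(value));
                } else if (type == long.class) {
                    m.invoke(target, Long.parseLong(value));
                } else if (type == boolean.class) {
                    m.invoke(target, Boolean.parseBoolean(value));
                } else if (type == String.class) {
                    m.invoke(target, value);
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("listenPort", "9877");
        DemoConfig config = new DemoConfig();
        apply(props, config);
        System.out.println(config.getListenPort());
    }
}
```

Applying a file-based Properties object first and a command-line one second gives the same precedence as the startup code above: later values overwrite earlier ones.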

The second step creates a NamesrvController instance and starts two scheduled tasks:

  1. Scan the Brokers every 10 seconds and remove inactive ones.
  2. Print the KV configuration every 10 minutes.

    // Task 1: first run after 5 seconds, then every 10 seconds
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    
    // Task 2: first run after 1 minute, then every 10 minutes
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

The third step registers a shutdown hook, starts the server, and begins listening for Broker connections.

3. Register and delete routes

RocketMQ implements route registration and deletion through the heartbeat between Broker and NameServer. After a Broker starts, it sends a heartbeat packet to each NameServer every 30 seconds. On every heartbeat, NameServer refreshes the lastUpdateTimestamp of that Broker's entry in brokerLiveTable, and it scans brokerLiveTable every 10 seconds. If no heartbeat has been received from a Broker for 120s, NameServer removes the Broker's routing information and closes the socket connection.
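The heartbeat-and-scan mechanism can be reduced to a small sketch. This is not RocketMQ code: the class BrokerLivenessSketch and its methods are hypothetical, only the 120-second expiry comes from the text, and the current time is passed in explicitly so the behavior is easy to check.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class BrokerLivenessSketch {
    // 120 s without a heartbeat means the broker is treated as dead.
    static final long EXPIRE_MILLIS = 120_000L;

    // broker address -> timestamp of the last heartbeat (milliseconds)
    private final Map<String, Long> lastUpdate = new ConcurrentHashMap<>();

    // Called for every heartbeat: refresh the broker's timestamp.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastUpdate.put(brokerAddr, nowMillis);
    }

    // Called periodically: drop brokers whose last heartbeat is too old.
    // Returns how many brokers were removed in this pass.
    int scan(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRE_MILLIS < nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastUpdate.containsKey(brokerAddr);
    }

    public static void main(String[] args) {
        BrokerLivenessSketch table = new BrokerLivenessSketch();
        table.onHeartbeat("10.0.0.1:10911", 0L);
        table.onHeartbeat("10.0.0.2:10911", 90_000L);
        System.out.println(table.scan(130_000L));
    }
}
```

In a real server the scan would run from a scheduled task and would also close the connection and clean up the routing tables, as the following sections describe.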

3.1 Route Registration

Sending heartbeat packets

    //BrokerController::start
    this.registerBrokerAll(true, false, true);
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10,
    // Heartbeat period: the configured value clamped to 10-60 s (30 s by default)
    Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),
    TimeUnit.MILLISECONDS);
    
    
    //BrokerOuterAPI::registerBrokerAll
    // The Broker walks the NameServer list and sends a heartbeat packet to each NameServer
    public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {
    
        final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
            // Encapsulate the request header
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);     // broker address
            requestHeader.setBrokerId(brokerId);         // 0: master, >0: slave
            requestHeader.setBrokerName(brokerName);     // broker name
            requestHeader.setClusterName(clusterName);   // cluster name
            requestHeader.setHaServerAddr(haServerAddr); // HA server address
            requestHeader.setCompressed(compressed);
    
            // Encapsulate the request body: topic configuration and filter server list
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            // The latch is counted down in finally, so the caller can wait
            // until every NameServer has been tried (or the timeout expires)
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Register with this NameServer
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }
    
                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            // Count down whether registration succeeded or failed
                            countDownLatch.countDown();
                        }
                    }
                });
            }
    
            try {
                // Wait for all registrations, at most timeoutMills
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }
    
        return registerBrokerResultList;
    }
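The fan-out pattern used by registerBrokerAll (one task per NameServer, a CountDownLatch decremented in finally, and a bounded wait) can be tried in isolation with the sketch below. Everything in it is an assumption made for illustration: the class FanOutRegisterSketch, the registerOne callback standing in for the remote call, the fixed pool size, and the use of CopyOnWriteArrayList as a thread-safe result list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class FanOutRegisterSketch {
    // Calls registerOne for every address in parallel and waits up to
    // timeoutMillis for all calls to finish. Failed calls contribute no result.
    static List<String> registerAll(List<String> addrs,
                                    Function<String, String> registerOne,
                                    long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        if (addrs.isEmpty()) {
            return results;
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(4, addrs.size()));
        CountDownLatch latch = new CountDownLatch(addrs.size());
        for (String addr : addrs) {
            pool.execute(() -> {
                try {
                    String result = registerOne.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // One unreachable server must not stop the others.
                } finally {
                    latch.countDown(); // always count down, success or failure
                }
            });
        }
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> addrs = Arrays.asList("ns1:9876", "ns2:9876");
        System.out.println(registerAll(addrs, addr -> "ok@" + addr, 3000).size());
    }
}
```

Counting down in finally is the important detail: if a failing task skipped the countdown, the caller would always block for the full timeout.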

NameServer handles heartbeat packets

Heartbeat packet processing steps:

  1. Acquire the route registry write lock (RouteInfoManager::registerBroker)
  2. Maintain the BrokerData information (RouteInfoManager::registerBroker)
  3. Register or update the topic routing information (RouteInfoManager::createAndUpdateQueueData)
  4. Update brokerLiveTable, the table of live Brokers (RouteInfoManager::registerBroker)
  5. Register the Broker's list of filter servers

    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                // Step 1: acquire the write lock
                this.lock.writeLock().lockInterruptibly();
    
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);
    
                boolean registerFirst = false;
    
                // Step 2: maintain BrokerData
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }
    
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);
    
                // Step 3: only the master's topic configuration updates the topic routes
                if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
    
                // Step 4: refresh the live-broker entry (this resets lastUpdateTimestamp)
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }
    
                // Step 5: register the filter server list
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }
    
                // A slave gets its master's address and HA address in the result
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }
    
        return result;
    }
    
    
    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName);
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());
        queueData.setPerm(topicConfig.getPerm());
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
    
        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
        if (null == queueDataList) {
            // First time this topic is seen: create its queue list
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
        } else {
            boolean addNewOne = true;
    
            Iterator<QueueData> it = queueDataList.iterator();
            while (it.hasNext()) {
                QueueData qd = it.next();
                if (qd.getBrokerName().equals(brokerName)) {
                    if (qd.equals(queueData)) {
                        // Unchanged: keep the existing entry
                        addNewOne = false;
                    } else {
                        // Changed: drop the old entry, the new one is added below
                        log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData);
                        it.remove();
                    }
                }
            }
    
            if (addNewOne) {
                queueDataList.add(queueData);
            }
        }
    }
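The slave-to-master switch inside registerBroker is easy to miss, so here is that one rule on its own. The sketch keeps only a brokerId-to-address map; the class BrokerAddrSwitchSketch and its register method are hypothetical names, and the map stands in for BrokerData.getBrokerAddrs().

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class BrokerAddrSwitchSketch {
    static final long MASTER_ID = 0L;

    // Registers brokerAddr under brokerId. If the same address is already
    // present under a different id (for example a slave that was promoted
    // to master), the stale entry is removed first so that each address
    // appears only once. Returns true if brokerId had no address before.
    static boolean register(Map<Long, String> brokerAddrs, long brokerId, String brokerAddr) {
        Iterator<Map.Entry<Long, String>> it = brokerAddrs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> item = it.next();
            if (brokerAddr != null
                && brokerAddr.equals(item.getValue())
                && brokerId != item.getKey()) {
                it.remove();
            }
        }
        String oldAddr = brokerAddrs.put(brokerId, brokerAddr);
        return oldAddr == null;
    }

    public static void main(String[] args) {
        Map<Long, String> addrs = new HashMap<>();
        register(addrs, MASTER_ID, "10.0.0.1:10911"); // master
        register(addrs, 1L, "10.0.0.2:10911");        // slave
        register(addrs, MASTER_ID, "10.0.0.2:10911"); // slave promoted to master
        System.out.println(addrs);
    }
}
```

After the third call the map holds a single entry for id 0, which matches the comment in the original code that the same IP:PORT must have only one record.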

3.2 Route Deletion

As mentioned above, NameServer scans brokerLiveTable every 10 seconds. If the lastUpdateTimestamp of a BrokerLiveInfo entry is more than 120s older than the current time, the Broker is considered dead and is removed: NameServer closes the socket connection and updates topicQueueTable, brokerAddrTable, brokerLiveTable, and filterServerTable.

    // RouteInfoManager::scanNotActiveBroker
    public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // Expired: no heartbeat within BROKER_CHANNEL_EXPIRED_TIME
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                // Clean up the remaining routing tables for this Broker
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

Besides the periodic scan, route deletion is also triggered when a Broker shuts down normally, in which case unregisterBroker is executed.

3.3 Route Discovery

Route discovery is not real-time. When a topic's route changes, NameServer does not push the change to clients; instead, each client periodically pulls the latest route for the topic. This design favors speed and simplicity on the NameServer side, and relies on clients tolerating a route that may be briefly out of date.
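The pull model can be illustrated with a small client-side cache. This is a sketch under assumptions, not the RocketMQ client: the class RoutePullSketch, the nameServerLookup function standing in for the remote query, and the polling period parameter are all hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class RoutePullSketch {
    // topic -> broker names, as last seen by this client
    private final Map<String, List<String>> localRoutes = new ConcurrentHashMap<>();
    // Hypothetical stand-in for "ask a NameServer for the route of a topic".
    private final Function<String, List<String>> nameServerLookup;

    RoutePullSketch(Function<String, List<String>> nameServerLookup) {
        this.nameServerLookup = nameServerLookup;
    }

    // Start tracking a topic; the route stays empty until the next pull.
    void subscribe(String topic) {
        localRoutes.putIfAbsent(topic, Collections.<String>emptyList());
    }

    // One pull cycle: fetch the latest route of every tracked topic.
    void refreshAll() {
        for (String topic : localRoutes.keySet()) {
            List<String> latest = nameServerLookup.apply(topic);
            if (latest != null) {
                localRoutes.put(topic, latest);
            }
        }
    }

    List<String> route(String topic) {
        return localRoutes.getOrDefault(topic, Collections.<String>emptyList());
    }

    // Runs refreshAll periodically. Exceptions are caught because an
    // uncaught exception would cancel all later runs of the task.
    ScheduledExecutorService startPolling(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                refreshAll();
            } catch (RuntimeException e) {
                // keep the previous routes and try again on the next cycle
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        Map<String, List<String>> server = new ConcurrentHashMap<>();
        server.put("TopicA", Collections.singletonList("broker-a"));
        RoutePullSketch client = new RoutePullSketch(topic -> server.get(topic));
        client.subscribe("TopicA");
        client.refreshAll();
        System.out.println(client.route("TopicA"));
    }
}
```

Between two pulls the client keeps using its cached route, which is exactly the window in which it may still see a Broker that NameServer has already removed.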

Summary & References

Summary

Reading the source code really shows how deep and well thought out the design is. Once you have read the source and understood the principles behind it, what you learn goes much deeper.

References

  • Inside RocketMQ technology