RocketMQ routing center NameServer

I have been researching message middleware lately and have read a lot of material on it. I suggest taking a close look at the topic yourself, and I recommend RocketMQ, which was originally released by Alibaba. Why? Mainly because message-oriented middleware shows up everywhere, whatever your project or business: it gives you decoupling, asynchronous processing, peak shaving and rate limiting, and a host of other benefits. Just as importantly, RocketMQ is a top-level Apache open source project that originated in China, and its source code is written in Java. I won't go into its advantages and disadvantages compared with other middleware here. I plan to keep writing about RocketMQ as a column, provided I can find some slack time; I have been busy lately and have had little of it. Enough preamble, let's begin!


Preface

RocketMQ has three kinds of roles. The first, NameServer, is known as the routing center. Earlier RocketMQ releases used ZooKeeper (ZK) for this job, but it was dropped for architectural reasons. Note that a Topic is a logical concept, which differs from the concept of a Topic in AMQ. As for the producer, the same reasoning applies, so I won't dwell on it.

1. NameServer architecture design

Message-oriented middleware is generally designed around a topic-based publish/subscribe mechanism. A producer sends messages for a topic to a Broker, which is responsible for persisting them, and consumers subscribe to the topics they are interested in. Either the message server pushes messages to consumers based on the subscription (routing) information (Push mode), or consumers actively pull messages from the message server (Pull mode); in both cases producers and consumers are decoupled. In RocketMQ, Push mode is actually built on Pull mode using a long-polling mechanism. I will cover what long polling is, and how it differs from long-lived and short-lived connections, when I have time.

From this description the message flow is easy to follow, but the structure is fragile: under today's high concurrency, protection against a single point of failure is essential. So multiple message servers are deployed to share message storage, which provides high availability, zero message loss, and the other guarantees expected of message-oriented middleware. But with that many servers, how is their state tracked, and how is that information shared? How do producers and consumers become aware of it? If one message server goes down, how do they find out? NameServer is designed to solve these problems.

The logical deployment diagram for RocketMQ is shown in the figure above. When a Broker starts, it registers with every NameServer. Before sending messages, a Producer obtains the list of Broker addresses from a NameServer and then selects one Broker from the list according to a load-balancing algorithm. NameServer keeps a long-lived connection to each Broker and checks every 10 seconds whether the Broker is still alive; a Broker found to be down is removed from the routing registry. Route changes, however, are not pushed to producers immediately. Why? Mainly to reduce complexity and favor high availability, that is, AP in CAP terms. This is also why ZK is not used: ZK favors consistency. NameServers are stateless with respect to one another and do not communicate with each other, so every Broker has to register its node information with each NameServer in the cluster in turn. Each NameServer holds broker and routing information only in its own memory, so to avoid thread-safety problems it guards that data with a read-write lock, which fits the read-heavy, write-light workload and serializes write requests. The routing view can of course be temporarily inconsistent, but RocketMQ guarantees that a message is consumed at least once, so if the routing information for a message is not found, a retry is triggered automatically.
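The client-side selection step described above can be sketched roughly as follows. This is only an illustration, not RocketMQ's actual code: `BrokerSelectorSketch`, `select`, and the broker names are hypothetical, and plain round-robin stands in for whatever load algorithm the real client uses.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: round-robin choice over a broker list fetched from NameServer.
class BrokerSelectorSketch {
    private final AtomicInteger counter = new AtomicInteger();

    // Returns the next broker address, cycling through the cached list.
    String select(List<String> brokerAddrs) {
        if (brokerAddrs == null || brokerAddrs.isEmpty()) {
            // With no route available the caller would refresh its routes and retry.
            throw new IllegalStateException("no broker available");
        }
        // floorMod keeps the index non-negative even if the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), brokerAddrs.size());
        return brokerAddrs.get(index);
    }

    public static void main(String[] args) {
        BrokerSelectorSketch selector = new BrokerSelectorSketch();
        List<String> brokers = Arrays.asList("broker-a", "broker-b");
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.select(brokers));
        }
    }
}
```

Because the list is only a cached copy, a broker that has just gone down may still be selected; that is the case where the automatic retry mentioned above comes into play.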

2. NameServer startup process

1. Let's look at the NameServer startup process in the source code, starting with NamesrvStartup.



As the code above shows, NameServer startup roughly works as follows: the parameters of NamesrvConfig and NettyServerConfig are filled in from the configuration file or from command-line arguments, and then a NamesrvController is initialized.
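The configuration step can be pictured with a small sketch like the one below. Everything in it is an assumption made for illustration: `StartupConfigSketch`, its two fields, their default values, and the rule that command-line values override file values are hypothetical stand-ins, not the real NamesrvConfig or NettyServerConfig code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch: defaults first, then config-file values, then command-line overrides.
class StartupConfigSketch {
    int listenPort = 9876;        // illustrative default
    int serverWorkerThreads = 8;  // illustrative default

    static StartupConfigSketch load(String fileContents, Properties commandLine) {
        Properties merged = new Properties();
        try {
            merged.load(new StringReader(fileContents)); // values from the config file
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        merged.putAll(commandLine); // assumed precedence: command line wins

        StartupConfigSketch config = new StartupConfigSketch();
        config.listenPort = Integer.parseInt(
            merged.getProperty("listenPort", String.valueOf(config.listenPort)));
        config.serverWorkerThreads = Integer.parseInt(
            merged.getProperty("serverWorkerThreads", String.valueOf(config.serverWorkerThreads)));
        return config;
    }

    public static void main(String[] args) {
        Properties cli = new Properties();
        cli.setProperty("listenPort", "9877");
        StartupConfigSketch config = load("listenPort=9000\nserverWorkerThreads=4\n", cli);
        System.out.println(config.listenPort + " " + config.serverWorkerThreads);
    }
}
```

The config file is passed as a string here only to keep the sketch self-contained; a real startup class would read it from disk.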

2. The core method initialize(), which initializes the NamesrvController instance

After the configuration is loaded, initialize() creates the NettyRemotingServer object and then starts two scheduled tasks. Scheduled task 1 (heartbeat detection): every 10 s, NameServer scans the Brokers and checks whether the time since each Broker's last heartbeat has exceeded the expiry threshold; if it has, NameServer removes that Broker. Scheduled task 2: NameServer prints its configuration information every 10 minutes.

    public boolean initialize() {
        // Load the KV configuration.
        this.kvConfigManager.load();

        // Create the Netty server that handles requests from brokers and clients.
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        // Thread pool that executes the request processors.
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        // Scheduled task 1: after a 5 s delay, scan for inactive brokers every 10 s.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        // Scheduled task 2: after a 1 min delay, print the KV configuration every 10 min.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }

3. Route registration and fault elimination of NameServer

The main job of NameServer is to provide topic routing information to message producers and consumers. NameServer therefore has to store the basic routing information and manage the Broker nodes, which covers route registration and route deletion.

1. Route metadata: RouteInfoManager

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // Message queue routing information for each Topic
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // Basic Broker information: BrokerName, cluster name, and master/slave Broker addresses
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // Broker cluster information: the names of the Brokers in each cluster
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // Broker status information; NameServer replaces it each time it receives a heartbeat packet
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // List of FilterServers on each Broker, used for message filtering
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }

    // ... remaining methods omitted
}

Related concepts: RocketMQ is based on a publish/subscribe model. A Topic can have multiple message queues, and by default a Broker creates four read queues and four write queues for each Topic. Brokers that share the same BrokerName form a master-slave group: the Broker with brokerId 0 is the master, and Brokers with a brokerId greater than 0 are slaves.
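The master/slave convention can be illustrated with a minimal sketch. `BrokerGroupSketch` and its methods are hypothetical names for this article, loosely modeled on the brokerName and brokerAddrs fields described above; the addresses are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: brokers sharing one brokerName, keyed by brokerId.
class BrokerGroupSketch {
    static final long MASTER_ID = 0L; // brokerId 0 marks the master

    final String brokerName;
    final Map<Long, String> brokerAddrs = new HashMap<>(); // brokerId -> address

    BrokerGroupSketch(String brokerName) {
        this.brokerName = brokerName;
    }

    void add(long brokerId, String address) {
        brokerAddrs.put(brokerId, address);
    }

    // Returns the master address, or null if no master is registered.
    String masterAddr() {
        return brokerAddrs.get(MASTER_ID);
    }

    static boolean isSlave(long brokerId) {
        return brokerId > MASTER_ID;
    }

    public static void main(String[] args) {
        BrokerGroupSketch group = new BrokerGroupSketch("broker-a");
        group.add(0L, "10.0.0.1:10911"); // made-up addresses
        group.add(1L, "10.0.0.2:10911");
        System.out.println(group.masterAddr());
    }
}
```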

In-memory data structures

topicQueueTable: {
	"topic": [
		{
			"brokerName": "broker-a",
			"readQueueNums": "4",
			"writeQueueNums": "4",
			"perm": 6, // Read and write permissions
			"topicSynFlag": 0
		},
		{
			"brokerName": "broker-b",
			"readQueueNums": "4",
			"writeQueueNums": "4",
			"perm": 6, // Read and write permissions
			"topicSynFlag": 0
		}
	]
}

brokerAddrTable: {
	"broker-a": {
		"cluster": "c1",
		"brokerName": "broker-a",
		"brokerAddrs": {
			0: ip,
			1: ip
		}
	},
	"broker-b": {
		"cluster": "c2",
		"brokerName": "broker-b",
		"brokerAddrs": {
			0: ip,
			1: ip
		}
	}
}

brokerLiveTable: {
	"192.168.1.2": {
		"lastUpdateTimestamp": "151827031980",
		"dataVersion": "version-a",
		"channel": obj,
		"haServerAddr": ip
	}
	// ... one entry of this shape per Broker address
}

Route registration

RocketMQ implements route registration through the heartbeat between Broker and NameServer. On startup, a Broker sends a heartbeat packet to every NameServer in the cluster, and afterwards it sends one to each NameServer every 30 s. When NameServer receives a heartbeat packet, it updates the lastUpdateTimestamp of that Broker's entry in brokerLiveTable. NameServer scans brokerLiveTable every 10 s; if no heartbeat has been received from a Broker for 120 s, NameServer removes that Broker's routing information and closes the socket connection.

Core code

Sending heartbeat packets and registering broker data

NameServer logic for handling heartbeat packets

Route registration takes a write lock to prevent concurrent modification of the routing tables in RouteInfoManager. It first checks whether the cluster that the Broker belongs to already exists, creates it if it does not, and then adds the Broker name to that cluster's set.
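The locking pattern just described might look roughly like this. It is a simplified sketch under my own assumptions, not the real RouteInfoManager: `RegisterSketch`, `registerBroker`, and `brokersIn` are hypothetical, and only the cluster table and a last-heartbeat map are modeled.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: registration under a write lock, lookups under a read lock.
class RegisterSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Set<String>> clusterAddrTable = new HashMap<>(); // cluster -> broker names
    private final Map<String, Long> lastHeartbeat = new HashMap<>();           // brokerAddr -> timestamp

    void registerBroker(String clusterName, String brokerName, String brokerAddr, long nowMillis) {
        lock.writeLock().lock(); // writers are serialized
        try {
            // Create the cluster entry if it does not exist, then add the broker name.
            clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>()).add(brokerName);
            // Each heartbeat replaces the previous liveness record.
            lastHeartbeat.put(brokerAddr, nowMillis);
        } finally {
            lock.writeLock().unlock();
        }
    }

    Set<String> brokersIn(String clusterName) {
        lock.readLock().lock(); // many readers may hold this at once
        try {
            Set<String> names = clusterAddrTable.get(clusterName);
            // Return a copy so callers never see later modifications.
            return names == null ? new HashSet<>() : new HashSet<>(names);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        RegisterSketch routes = new RegisterSketch();
        routes.registerBroker("c1", "broker-a", "10.0.0.1:10911", System.currentTimeMillis());
        System.out.println(routes.brokersIn("c1"));
    }
}
```

Plain HashMap is safe here only because every access goes through the lock; that is the trade-off the read-write lock makes for a read-heavy, write-light table.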

Route deletion

From the discussion above you probably already have a good idea of what route deletion is, so I will describe it briefly without listing the code. A Broker sends a heartbeat packet to NameServer every 30 seconds. The packet contains the brokerId, Broker address, Broker name, cluster name, and the list of FilterServers associated with the Broker. If the Broker goes down, NameServer stops receiving its heartbeat packets and eventually removes the failed Broker. NameServer scans the brokerLiveTable status table every 10 seconds; if a Broker's lastUpdateTimestamp is more than 120 seconds older than the current time, the Broker is considered dead and is removed, and the related data in topicQueueTable, brokerAddrTable, brokerLiveTable, and filterServerTable is updated.
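Although the real code is not listed here, the expiry check can be sketched in a few lines. This is a simplified illustration under my own assumptions: `LivenessScanSketch` is hypothetical, it keeps only a timestamp per broker address, and it skips the cleanup of the other routing tables and the closing of the channel.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch: evict brokers whose last heartbeat is older than 120 s.
class LivenessScanSketch {
    static final long EXPIRED_MILLIS = 1000L * 60 * 2; // 120 s, as described above

    final Map<String, Long> brokerLiveTable = new HashMap<>(); // brokerAddr -> last heartbeat

    void onHeartbeat(String brokerAddr, long nowMillis) {
        brokerLiveTable.put(brokerAddr, nowMillis);
    }

    // Removes expired entries and returns how many were removed.
    int scanNotActiveBroker(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRED_MILLIS < nowMillis) {
                it.remove(); // safe removal while iterating
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        LivenessScanSketch scanner = new LivenessScanSketch();
        scanner.onHeartbeat("10.0.0.1:10911", 0L);
        scanner.onHeartbeat("10.0.0.2:10911", 100_000L);
        System.out.println(scanner.scanNotActiveBroker(120_001L));
    }
}
```

Passing the current time in as a parameter is a choice made for this sketch so the behavior can be checked without waiting two minutes.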

Route discovery

RocketMQ route discovery is non-real-time. When the Topic route changes, NameServer does not actively push it to the client. Instead, the client periodically pulls the latest route of the Topic.
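The pull-based approach can be sketched like this. The names are all hypothetical: `RoutePollerSketch` is not a RocketMQ class, the `Function` stands in for a remote call to NameServer, and the polling period is whatever the caller passes in.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch: a client that caches a topic's route and refreshes it periodically.
class RoutePollerSketch {
    private final Function<String, List<String>> nameServerLookup; // stand-in for the remote call
    private volatile List<String> cachedBrokers = Collections.emptyList();

    RoutePollerSketch(Function<String, List<String>> nameServerLookup) {
        this.nameServerLookup = nameServerLookup;
    }

    // Pulls the latest route; until this runs, the client keeps using the old one.
    void refresh(String topic) {
        cachedBrokers = nameServerLookup.apply(topic);
    }

    List<String> cached() {
        return cachedBrokers;
    }

    // Starts periodic pulling; the caller should shut the returned executor down when done.
    ScheduledExecutorService start(String topic, long periodSeconds) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                refresh(topic);
            } catch (RuntimeException e) {
                // Swallow the failure so later runs still happen; the stale cache stays in use.
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
        return executor;
    }

    public static void main(String[] args) {
        RoutePollerSketch poller = new RoutePollerSketch(topic -> Arrays.asList("broker-a", "broker-b"));
        poller.refresh("TopicTest");
        System.out.println(poller.cached());
    }
}
```

The gap between a route change and the next refresh is exactly the window in which a client may still hold a stale route.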

Conclusion

This article mainly covered NameServer's routing functions: route registration, route discovery, and route deletion. It is already very late and I have to sleep, or I will be late tomorrow. If you spot mistakes in the article or have questions or additions, you are welcome to leave a comment. To be continued. QAQ

Final words

You are welcome to follow my WeChat public account [Village of apes], where we talk about Java interviews, and to add me on WeChat for further discussion and learning: search for [codeyuanzhicunup] in the WeChat mobile app. If you have related technical questions, feel free to leave a message. The public account is mainly used for technical sharing, including analysis of common interview questions, source code walkthroughs, microservice frameworks, and hot technical topics.