Preface

With microservices now so widespread, message queues are used more and more often. Alibaba's RocketMQ is one of the most widely used MQs, and I have long been interested in how it works internally.

Overview

Let's start with a high-level look at RocketMQ.

  1. NameServer stores information about all Brokers. To make NameServer highly available we can deploy multiple NameServer instances, but they do not synchronize information with one another. In other words, the Broker information held by each NameServer may differ.
  2. Brokers are divided into Master and Slave. A Master can have multiple Slaves, while a Slave belongs to exactly one Master. When a Broker starts, it registers its information with every NameServer, and afterwards it sends a heartbeat packet to NameServer every 30 seconds. The Broker itself is responsible for message management and is the heart of RocketMQ.
  3. The Producer randomly selects a node in the NameServer cluster, establishes a long-lived connection to it, and obtains the Topic's routing information in order to send messages.
  4. The Consumer randomly selects a node in the NameServer cluster, establishes a long-lived connection to it, and obtains the Topic's routing information in order to consume messages.
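
To make the first point concrete, here is a minimal toy model (not RocketMQ code) of a Broker registering with two NameServers that never talk to each other. The class `ToyNameServer`, its `reachable` flag, and the address used are all hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: each NameServer keeps its own broker table and never syncs with its peers. */
class NameServerViewSketch {

    /** Hypothetical stand-in for one NameServer: brokerName -> brokerAddr. */
    static final class ToyNameServer {
        final Map<String, String> brokers = new HashMap<>();
        boolean reachable = true;

        /** Returns false when the registration request never arrives. */
        boolean register(String brokerName, String brokerAddr) {
            if (!reachable) {
                return false;
            }
            brokers.put(brokerName, brokerAddr);
            return true;
        }
    }

    /** The broker registers with every NameServer it knows; returns the number of successes. */
    static int registerWithAll(List<ToyNameServer> nameServers, String brokerName, String brokerAddr) {
        int ok = 0;
        for (ToyNameServer ns : nameServers) {
            if (ns.register(brokerName, brokerAddr)) {
                ok++;
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        ToyNameServer ns1 = new ToyNameServer();
        ToyNameServer ns2 = new ToyNameServer();
        ns2.reachable = false; // simulate a network problem towards the second NameServer

        registerWithAll(Arrays.asList(ns1, ns2), "broker-a", "192.168.0.1:10911");

        // The two views now differ, and nothing reconciles them between NameServers.
        System.out.println("ns1 knows broker-a: " + ns1.brokers.containsKey("broker-a"));
        System.out.println("ns2 knows broker-a: " + ns2.brokers.containsKey("broker-a"));
    }
}
```

In this model the only thing that can bring the second NameServer up to date is a later registration from the Broker itself, which is why the periodic heartbeat matters.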

NameServer source code analysis

Routing management

NameServer acts as a registry that holds information about all brokers. How is this information stored?

The answer lies in these HashMaps:

    // Key: topic name, value: the topic's queue data on each broker
    private final HashMap<String, List<QueueData>> topicQueueTable;
    // Key: brokerName, value: broker data (cluster, name, addresses)
    private final HashMap<String, BrokerData> brokerAddrTable;
    // All brokerNames in the cluster. Key: clusterName, value: set of brokerNames
    private final HashMap<String, Set<String>> clusterAddrTable;
    // Key: brokerAddr, value: liveness information for that broker
    private final HashMap<String, BrokerLiveInfo> brokerLiveTable;
    // Key: brokerAddr, value: list of filter servers
    private final HashMap<String, List<String>> filterServerTable;

A diagram illustrates the relationship between these HashMaps.
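
In code form, the way three of these tables link together can be sketched roughly as follows. `ToyQueueData` and `ToyBrokerData` are simplified, hypothetical stand-ins for the real `QueueData` and `BrokerData` classes, and the broker name and addresses are made-up sample values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class RouteTablesSketch {

    /** Simplified stand-in for QueueData: which broker hosts how many queues of a topic. */
    static final class ToyQueueData {
        final String brokerName;
        final int queueNums;

        ToyQueueData(String brokerName, int queueNums) {
            this.brokerName = brokerName;
            this.queueNums = queueNums;
        }
    }

    /** Simplified stand-in for BrokerData: brokerId -> address, where id 0 is the master. */
    static final class ToyBrokerData {
        final String cluster;
        final Map<Long, String> brokerAddrs = new TreeMap<>();

        ToyBrokerData(String cluster) {
            this.cluster = cluster;
        }
    }

    final Map<String, List<ToyQueueData>> topicQueueTable = new HashMap<>(); // topic -> queues
    final Map<String, ToyBrokerData> brokerAddrTable = new HashMap<>();      // brokerName -> addresses
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();       // clusterName -> brokerNames

    /** Follows topic -> brokerName -> addresses, the chain a route lookup has to walk. */
    List<String> addressesForTopic(String topic) {
        List<String> result = new ArrayList<>();
        List<ToyQueueData> queues = topicQueueTable.getOrDefault(topic, Collections.emptyList());
        for (ToyQueueData qd : queues) {
            ToyBrokerData bd = brokerAddrTable.get(qd.brokerName);
            if (bd != null) {
                result.addAll(bd.brokerAddrs.values());
            }
        }
        return result;
    }

    /** Builds tables for one hypothetical master/slave pair serving TopicTest. */
    static RouteTablesSketch sample() {
        RouteTablesSketch t = new RouteTablesSketch();
        ToyBrokerData brokerA = new ToyBrokerData("DefaultCluster");
        brokerA.brokerAddrs.put(0L, "192.168.0.1:10911"); // master
        brokerA.brokerAddrs.put(1L, "192.168.0.2:10911"); // slave
        t.brokerAddrTable.put("broker-a", brokerA);
        t.clusterAddrTable.computeIfAbsent("DefaultCluster", k -> new HashSet<>()).add("broker-a");
        t.topicQueueTable.computeIfAbsent("TopicTest", k -> new ArrayList<>())
            .add(new ToyQueueData("broker-a", 4));
        return t;
    }

    public static void main(String[] args) {
        System.out.println(sample().addressesForTopic("TopicTest"));
    }
}
```

The point of the sketch is the join key: `topicQueueTable` and `clusterAddrTable` both refer to brokers by brokerName, and `brokerAddrTable` turns that name into concrete addresses.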

So how does this data get into the HashMaps?

As mentioned earlier, a Broker registers its information with every NameServer when it starts, so we should look at what the Broker does on startup. The Broker's startup flow is similar to NameServer's.

    public void start() throws Exception {
        ......
        // Register with NameServer periodically. The interval is computed:
        // the configured period is clamped to between 10 s and 60 s.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
        ......
    }
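
The period expression in that call is easy to misread, so here it is isolated as a small sketch. The method name `effectivePeriodMillis` is hypothetical; only the `Math.max(10000, Math.min(..., 60000))` expression comes from the code above.

```java
class RegisterPeriodSketch {

    /** Same expression as in the scheduled task: clamp the configured period to [10 s, 60 s]. */
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10000, Math.min(configuredMillis, 60000));
    }

    public static void main(String[] args) {
        long[] samples = {1000, 30000, 120000};
        for (long configured : samples) {
            System.out.println(configured + " ms -> " + effectivePeriodMillis(configured) + " ms");
        }
    }
}
```

A configured value of 30000 ms passes through unchanged, which matches the 30-second heartbeat described in the overview, while anything shorter than 10 s or longer than 60 s is pulled back into that range.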

Following the call chain all the way down, the key code is the following:

    public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {

        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        // Get all NameServer addresses
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
            // Build the request header
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);

            // Build the request body and protect it with a CRC32 checksum
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);

            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Register with this NameServer
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }
                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                // Wait until every NameServer has answered, or the timeout expires
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }
        return registerBrokerResultList;
    }
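
The concurrency pattern used here (one task per NameServer, a thread-safe result list, and a CountDownLatch with a timeout) can be tried in isolation. The following is a simplified sketch, not the Broker's actual code: `callAll`, the `Function` parameter, and the `ns-*` addresses are hypothetical, and unlike the original it restores the interrupt flag instead of swallowing the InterruptedException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class FanOutSketch {

    /**
     * Runs `call` once per address on the executor, collects non-null results,
     * and waits at most timeoutMillis for all calls to finish.
     */
    static List<String> callAll(List<String> addresses, Function<String, String> call,
                                ExecutorService executor, long timeoutMillis) {
        final List<String> results = new CopyOnWriteArrayList<>();
        final CountDownLatch latch = new CountDownLatch(addresses.size());
        for (final String addr : addresses) {
            executor.execute(() -> {
                try {
                    String result = call.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // A failure against one target must not affect the others.
                    System.err.println("call failed for " + addr + ": " + e.getMessage());
                } finally {
                    latch.countDown(); // always count down, success or failure
                }
            });
        }
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<String> results = callAll(
                Arrays.asList("ns-1:9876", "ns-2:9876", "ns-3:9876"),
                addr -> {
                    if (addr.startsWith("ns-2")) {
                        throw new IllegalStateException("unreachable");
                    }
                    return "registered@" + addr;
                },
                executor, 3000);
            System.out.println(results.size() + " of 3 registrations succeeded");
        } finally {
            executor.shutdown();
        }
    }
}
```

Counting down in `finally` is what keeps one unreachable NameServer from blocking the caller until the timeout: the latch reaches zero as soon as every task has either succeeded or failed.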

So how does NameServer handle this registration information?

The key code is in org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker:

    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                // Acquire the write lock
                this.lock.writeLock().lockInterruptibly();

                // Maintain clusterAddrTable. brokerNames is a Set, so a brokerName is never duplicated
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;

                // Maintain brokerAddrTable
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                // Not the first registration: update brokerAddrsMap
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    // The same brokerAddr must have only one record in brokerAddrTable
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }

                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);

                // Only a master (brokerId == MixAll.MASTER_ID) updates topic routing
                if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                    // If the topic configuration changed or this is the first registration
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                        // Create or update topicQueueTable
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }

                // Maintain brokerLiveTable
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

                // Maintain filterServerTable
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }

                // A slave gets its master's address and HA server address back in the result
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }
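
The least obvious step in this method is the loop that removes an address already registered under a different brokerId. A minimal sketch of just that step on a plain map, assuming a slave whose address later re-registers with id 0 (the helper `register` and the short addresses are hypothetical):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class BrokerAddrDedupSketch {

    /**
     * Mirrors the brokerAddrs update: drop any entry that holds the same address
     * under a different id, then store the address under the new id.
     * Returns true when the id had no previous address.
     */
    static boolean register(Map<Long, String> brokerAddrs, long brokerId, String brokerAddr) {
        Iterator<Map.Entry<Long, String>> it = brokerAddrs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> item = it.next();
            if (brokerAddr != null && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                it.remove();
            }
        }
        String oldAddr = brokerAddrs.put(brokerId, brokerAddr);
        return oldAddr == null;
    }

    public static void main(String[] args) {
        Map<Long, String> brokerAddrs = new HashMap<>();
        register(brokerAddrs, 0L, "10.0.0.1:10911"); // master (id 0)
        register(brokerAddrs, 1L, "10.0.0.2:10911"); // slave (id 1)

        // The former slave's address now registers with id 0.
        register(brokerAddrs, 0L, "10.0.0.2:10911");

        // Without the removal loop, 10.0.0.2:10911 would appear under both id 0 and id 1.
        System.out.println(brokerAddrs);
    }
}
```

After the third call the map holds a single entry for id 0, so one address can never be listed as both master and slave of the same brokerName.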

We can debug RocketMQ's example code to inspect the routing information in each table. The Topic is called TopicTest.

topicQueueTable

brokerAddrTable

brokerLiveTable

clusterAddrTable

Route deletion

Since routes are created, they must also be deleted at some point. Under what circumstances does that happen? The answer lies in NameServer's startup class, which has several important methods that we'll look at along the way.

The entry point for starting NameServer is the NamesrvStartup class:

    public static NamesrvController main0(String[] args) {
        try {
            // Create the NamesrvController
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

    public static NamesrvController start(final NamesrvController controller) throws Exception {
        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
        // Run initialization
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        // Shutdown hook: shut the controller down when the JVM exits
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();

        return controller;
    }

The initialize method is the important one:

    public boolean initialize() {
        this.kvConfigManager.load();
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        this.registerProcessor();

        // Scheduled task: first run after a 5 s delay, then once every 10 s
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            // Scan for brokers that are no longer active
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        // Print configuration information periodically, once every 10 minutes
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
        ......
        return true;
    }

    public void scanNotActiveBroker() {
        // Iterate over brokerLiveTable
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // This check is the key: if the last update for this brokerAddr
            // is more than 120 s old, remove the brokerAddr
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
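
The expiry check can be reproduced on a plain map to see exactly which entries are evicted. This is only a sketch: it stores a bare timestamp per address instead of a BrokerLiveInfo, passes the current time in as a parameter so the result is deterministic, and takes the 120-second threshold from the comment in the code above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class ExpiryScanSketch {

    /** 120 s, the threshold described for BROKER_CHANNEL_EXPIRED_TIME. */
    static final long EXPIRED_MILLIS = 1000 * 60 * 2;

    /** Removes every entry whose last update is older than EXPIRED_MILLIS; returns the removed keys. */
    static List<String> scan(Map<String, Long> lastUpdateByAddr, long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdateByAddr.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> next = it.next();
            if (next.getValue() + EXPIRED_MILLIS < nowMillis) {
                String addr = next.getKey();
                it.remove(); // safe removal while iterating
                removed.add(addr);
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        Map<String, Long> table = new HashMap<>();
        table.put("10.0.0.1:10911", now - 30_000L);  // heartbeat 30 s ago: kept
        table.put("10.0.0.2:10911", now - 130_000L); // heartbeat 130 s ago: evicted

        System.out.println("removed: " + scan(table, now));
        System.out.println("remaining: " + table.keySet());
    }
}
```

With the 30-second heartbeat described in the overview, a Broker would have to miss roughly four consecutive heartbeats before a scan removes it, and because the scan itself only runs every 10 seconds the actual removal can lag the 120-second mark slightly.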

References

RocketMQ video course by Heima (itheima) on Bilibili