What is a routing center?

In the common understanding of message queues, the simplest model is shown below:

At first glance, the producer only needs to know the network address of the message queue and send messages to it. In practice it is rarely that simple, because message queues usually run as a cluster. The following figure shows four instances forming a cluster:

Things still work if producers and consumers record the network addresses of all four instances directly. As the business grows, however, the queue needs to scale horizontally. If the cluster is expanded from 4 instances to 6, producers and consumers do not know the network addresses of the two new instances; the only way to pick them up is to add the two addresses to their configuration and restart. That is a lot of trouble, and it raises two questions:

  1. For clustered services, is there a better and more flexible way to record routing information?
  2. How can other services learn that the number of cluster instances has grown or shrunk, without any manual intervention?

Suppose we add a routing center that does two things:

  1. All services can report their network addresses to the routing center.
  2. All services can obtain the network addresses of other services from the routing center.

Then, no matter how many business systems there are, each of them only needs to know the address of the routing center.
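The two responsibilities above can be sketched as a small in-memory registry. This is only an illustration of the idea, not how any real product implements it; the class name `RoutingCenterSketch`, its methods, and the addresses are all hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory routing center; all names here are illustrative. */
final class RoutingCenterSketch {
    // service name -> addresses of the instances currently registered
    private final Map<String, Set<String>> addresses = new ConcurrentHashMap<>();

    /** Responsibility 1: an instance reports its own network address. */
    void register(String serviceName, String address) {
        addresses.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet()).add(address);
    }

    /** Removes an instance, for example after it has shut down. */
    void unregister(String serviceName, String address) {
        Set<String> instances = addresses.get(serviceName);
        if (instances != null) {
            instances.remove(address);
        }
    }

    /** Responsibility 2: a caller asks for the addresses of another service (sorted snapshot). */
    Set<String> lookup(String serviceName) {
        Set<String> instances = addresses.get(serviceName);
        if (instances == null) {
            return Collections.emptySet();
        }
        return new TreeSet<>(instances);
    }

    public static void main(String[] args) {
        RoutingCenterSketch center = new RoutingCenterSketch();
        center.register("queue", "10.0.0.1:9000");
        center.register("queue", "10.0.0.2:9000");
        // A producer only needs the routing center to find every queue instance.
        System.out.println(center.lookup("queue"));
    }
}
```

With this shape, scaling the queue from 4 to 6 instances only requires the two new instances to call `register`; producers and consumers see them on their next `lookup`.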

So how does the routing center sense a change in the number of instances? The simplest method is a heartbeat: the routing center periodically sends heartbeat packets to every service that has reported to it, and an instance that does not respond is considered to have stopped running.

These two steps have names:

  • Service registration: a service records its invocation information, such as its network address, in a common component.
  • Service discovery: when instances of a service are added or removed, other callers discover the change automatically.

Representative products on the market include ZooKeeper, Consul, and etcd.

Namesrv in RocketMQ

The figure shows the three core functions of Namesrv:

  • Registering brokers
  • Providing routing information
  • Periodic heartbeat detection

How does the RocketMQ source code implement these three functions?

Routing metadata

Namesrv is the manager of routing information. In essence it is a set of data structures that store routing information plus the algorithms that operate on them. Routing metadata is the first part, so let's first look at the data structures Namesrv uses to store it.

The RouteInfoManager class in namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java holds all of the routing information:


    /**
     * Topic and queue information, i.e. the actual routing information. The queues of a
     * topic may be spread across several brokers or live on a single broker.
     * key: topic. val: list of QueueData
     */
    private final HashMap<String, List<QueueData>> topicQueueTable;

    /** Broker information. key: broker name. val: BrokerData */
    private final HashMap<String, BrokerData> brokerAddrTable;

    /** Cluster information. key: cluster name. val: set of broker names */
    private final HashMap<String, Set<String>> clusterAddrTable;

    /** Broker liveness information. key: broker addr. val: BrokerLiveInfo */
    private final HashMap<String, BrokerLiveInfo> brokerLiveTable;

    /** Filter servers, used for class-mode message filtering. key: broker addr. val: filter server list */
    private final HashMap<String, List<String>> filterServerTable;

Now that you know the data structure of the routing meta-information, take a look at how the routing information is saved into these maps and how it is queried.
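Before reading the real code, it may help to see how two of these tables cooperate on a lookup. The sketch below uses simplified stand-ins: plain strings and maps replace QueueData and BrokerData, and the class and method names are hypothetical rather than taken from RocketMQ. It only illustrates the idea that a topic resolves to broker names, and broker names resolve to addresses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-ins for two RouteInfoManager tables; not the real RocketMQ classes. */
final class RouteTablesSketch {
    static final long MASTER_ID = 0L; // broker id 0 denotes the master

    // topic -> names of the brokers hosting its queues (the real table stores List<QueueData>)
    final Map<String, List<String>> topicQueueTable = new HashMap<>();
    // broker name -> (broker id -> address) (the real table stores BrokerData)
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    void addBroker(String brokerName, long brokerId, String addr) {
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
    }

    void addTopic(String topic, String brokerName) {
        List<String> names = topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>());
        if (!names.contains(brokerName)) {
            names.add(brokerName);
        }
    }

    /** Resolves a topic to the master addresses of its brokers by joining the two tables. */
    List<String> masterAddressesFor(String topic) {
        List<String> result = new ArrayList<>();
        List<String> names = topicQueueTable.getOrDefault(topic, Collections.emptyList());
        for (String brokerName : names) {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            if (addrs != null && addrs.containsKey(MASTER_ID)) {
                result.add(addrs.get(MASTER_ID));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        RouteTablesSketch tables = new RouteTablesSketch();
        tables.addBroker("broker-a", 0L, "10.0.0.1:10911");
        tables.addBroker("broker-a", 1L, "10.0.0.2:10911");
        tables.addBroker("broker-b", 0L, "10.0.0.3:10911");
        tables.addTopic("orders", "broker-a");
        tables.addTopic("orders", "broker-b");
        System.out.println(tables.masterAddressesFor("orders"));
    }
}
```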

Service registration

Registering brokers and fetching routing information are both APIs exposed by Namesrv. The entry point is processRequest() in namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java, which dispatches on the request code. The switch below lists the APIs that Namesrv exposes:

switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:
        // Add the configuration
        return this.putKVConfig(ctx, request);
    case RequestCode.GET_KV_CONFIG:
        // Get the configuration
        return this.getKVConfig(ctx, request);
    case RequestCode.DELETE_KV_CONFIG:
        // Delete the configuration
        return this.deleteKVConfig(ctx, request);
    case RequestCode.QUERY_DATA_VERSION:
        return queryBrokerTopicConfig(ctx, request);
    case RequestCode.REGISTER_BROKER:
        // Register the broker
        Version brokerVersion = MQVersion.value2Version(request.getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
            // Version 3.0.11 and later
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            // Before version 3.0.11
            return this.registerBroker(ctx, request);
        }
    case RequestCode.UNREGISTER_BROKER:
        // Unregister the broker
        return this.unregisterBroker(ctx, request);
    case RequestCode.GET_ROUTEINFO_BY_TOPIC:
        // Get routing information based on topic
        return this.getRouteInfoByTopic(ctx, request);
    case RequestCode.GET_BROKER_CLUSTER_INFO:
        // Get broker cluster information
        return this.getBrokerClusterInfo(ctx, request);
    case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
        // Delete write permissions for the Broker
        return this.wipeWritePermOfBroker(ctx, request);
    case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
        // Get all topic information
        return getAllTopicListFromNameserver(ctx, request);
    case RequestCode.DELETE_TOPIC_IN_NAMESRV:
        // Delete Topic information
        return deleteTopicInNamesrv(ctx, request);
    case RequestCode.GET_KVLIST_BY_NAMESPACE:
        // Get the kv list
        return this.getKVListByNamespace(ctx, request);
    case RequestCode.GET_TOPICS_BY_CLUSTER:
        // Get topic by cluster
        return this.getTopicsByCluster(ctx, request);
    // ... remaining request codes omitted
    default:
        break;
}

Service registration corresponds to the REGISTER_BROKER request code. Depending on the broker version, it is handled by registerBrokerWithFilterServer() or registerBroker(). I am using v4.9.1, so let's look at the implementation of registerBrokerWithFilterServer():

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    // ... omitted ...
    // The actual registration
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(...);
    // ... omitted ...
}

The core registration logic lives in RouteInfoManager.registerBroker(), so let's move on to it:

public RegisterBrokerResult registerBroker(final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // All routing tables are modified under the write lock
            this.lock.writeLock().lockInterruptibly();

            // step1
            // Get the set of broker names for this cluster name
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            // Add the broker name to the set
            brokerNames.add(brokerName);

            // Whether this is the first registration
            boolean registerFirst = false;

            // step2
            // If the broker table has no entry yet, the broker is registering for the first time
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            // Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            // The same IP:PORT must only have one record in brokerAddrTable
            brokerAddrsMap.entrySet().removeIf(item ->
                null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
            String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            // step3
            // Only when topic information is present and the broker is a master
            if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                // If the broker's topic configuration has changed, or this is the first registration
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
                    // Get the topic configuration table
                    ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // step4
            // Refresh the liveness record, including the last update timestamp
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            // step5
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            // If the broker is a slave
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        // Return the master's addresses, which the slave needs for master/slave synchronization
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

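The code comments mark five stages, step1 to step5. Their roles can be summarized as follows:

  1. step1 adds the broker name to clusterAddrTable under its cluster name.
  2. step2 records the broker address in brokerAddrTable and, when a slave is promoted to master, removes the stale entry for the same address.
  3. step3 creates or updates the queue data in topicQueueTable when the broker is a master and its topic configuration has changed or it is registering for the first time.
  4. step4 refreshes brokerLiveTable with the current timestamp, which serves as the heartbeat record.
  5. step5 updates filterServerTable; for a slave, the result also carries the master address and its HA server address.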
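The address handling marked step2 in registerBroker() is the least obvious stage, so here is a minimal standalone sketch of just that part. It reuses the removeIf and put logic from the code above on a plain map; the class name `BrokerAddrSwitchSketch`, the helper method, and the addresses are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Standalone illustration of the slave-to-master switch handled in step2. */
final class BrokerAddrSwitchSketch {
    static final long MASTER_ID = 0L; // broker id 0 denotes the master

    /**
     * Drops any entry that holds the same address under a different broker id,
     * then stores the address under the given id.
     * Returns true when the id had no address before, i.e. a first registration.
     */
    static boolean register(Map<Long, String> brokerAddrs, long brokerId, String brokerAddr) {
        brokerAddrs.entrySet().removeIf(item ->
            brokerAddr != null && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
        String oldAddr = brokerAddrs.put(brokerId, brokerAddr);
        return oldAddr == null;
    }

    public static void main(String[] args) {
        Map<Long, String> brokerAddrs = new HashMap<>();
        register(brokerAddrs, MASTER_ID, "10.0.0.1:10911"); // master
        register(brokerAddrs, 1L, "10.0.0.2:10911");        // slave

        // The old slave re-registers as master: its <1, addr> entry is removed
        // and <0, addr> replaces the previous master's address.
        register(brokerAddrs, MASTER_ID, "10.0.0.2:10911");
        System.out.println(brokerAddrs);
    }
}
```

Without the removeIf call, the promoted broker's address would appear twice in the map, once under its old slave id and once under the master id.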

Route removal

Namesrv deletes a broker's routing information in only one situation: the broker can no longer serve requests. This can be divided into two cases:

  1. The broker stops normally.
  2. The broker fails abnormally.

Broker stops normally

When a broker stops, it sends an unregisterBroker request to Namesrv. The sending code is the unregisterBrokerAll() method of broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java. The Namesrv entry point that handles this request (UNREGISTER_BROKER) was shown in the service registration section. The logic is straightforward: remove the information associated with this broker from topicQueueTable, brokerAddrTable, brokerLiveTable, and filterServerTable.

Broker fails abnormally

When a broker fails abnormally, it cannot actively send an unregister request to Namesrv, so Namesrv has to detect the failed broker on its own. There are two possible implementations:

Option 1: Namesrv starts a scheduled task that sends heartbeat requests to all brokers. A broker that does not respond is suspected of having failed.

Option 2: Namesrv records the time of the last heartbeat message each broker sent to it, and starts a scheduled task that checks whether the gap between that time and the current time exceeds a timeout. If it does, the broker has stopped sending heartbeats and is treated as failed.

RocketMQ's design is similar to Option 2.
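The second approach can be sketched in isolation as follows. This is a simplified illustration, not the RocketMQ implementation: the class `HeartbeatExpirySketch`, its methods, and the timeout value are hypothetical, and the current time is passed in as a parameter so the behavior is easy to reason about.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Passive failure detection: remember the last heartbeat and expire stale entries. */
final class HeartbeatExpirySketch {
    private final long timeoutMillis;
    // broker address -> time of the last heartbeat received from it
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    HeartbeatExpirySketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called whenever a broker sends a heartbeat (in RocketMQ, a registration request). */
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    /** Called periodically by a scheduled task; removes and returns the expired brokers. */
    List<String> scan(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + timeoutMillis < nowMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }

    public static void main(String[] args) {
        HeartbeatExpirySketch detector = new HeartbeatExpirySketch(100);
        detector.onHeartbeat("10.0.0.1:10911", 0);
        detector.onHeartbeat("10.0.0.2:10911", 50);
        // At time 120 only the first broker has been silent for longer than the timeout.
        System.out.println(detector.scan(120));
    }
}
```

Compared with Option 1, the routing center never opens connections to brokers for probing; it only reads timestamps it already has, at the cost of detecting a failure no sooner than the timeout plus the scan interval.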

Starting a Scheduled Task


public boolean initialize() {
    // ... omitted
    // Scan for inactive brokers every 10 seconds, after an initial delay of 5 seconds
    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
    // ... omitted
}


public void scanNotActiveBroker() {
    // Iterate over the brokers in the live table
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // Check whether the last update is older than the expiry time
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            // Close the channel and remove the broker from the live table
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            // Remove the broker's entries from the other routing tables
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}