In this article we will focus on how NameServer stores information about topics, brokers, and clusters.

I. Basic introduction to the rocketmq-namesrv module

As you can see in its source code, the namesrv module contains only a handful of classes (it also relies on other modules of the RocketMQ source, of course). NamesrvController initializes the Netty server, DefaultRequestProcessor dispatches the clients' requests, and RouteInfoManager stores and serves the registration information.

II. Introduction to the important classes

1, DefaultRequestProcessor

This class dispatches the specific requests sent by clients, such as a broker registering itself or a producer fetching information about brokers.

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    ...
    switch (request.getCode()) {
        ..........
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINFO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

For example, with RequestCode.GET_TOPICS_BY_CLUSTER a client fetches the topic information saved in namesrv, with GET_BROKER_CLUSTER_INFO it fetches the information about registered brokers, and with GET_ROUTEINFO_BY_TOPIC it obtains the routing information of a topic.

1), registerBroker

This method handles broker registration requests.

public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    ...
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        topicConfigWrapper,
        null,
        ctx.channel()
    );

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

As you can see, the registration request is handed over to RouteInfoManager.

2), getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

Fetching the TopicRouteData routing information is likewise delegated to RouteInfoManager. Let’s look at RouteInfoManager in detail.

2, RouteInfoManager

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

To understand this class, we need to know what these member variables hold, because both broker registration information and topic routing information are read from them.

1), Preliminary setup

To study these variables, I used a three-machine environment and set up two broker clusters: DefaultCluster and DefaultCluster-2.

DefaultCluster consists of two master/slave pairs. The relevant part of each broker-xx.properties file is shown below.

192.168.127.128: broker-a

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0

192.168.127.128: broker-b-s

brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1

192.168.127.129: broker-a-s

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1

192.168.127.129: broker-b

brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0

Note that in this setup the messages I send are delivered to both broker clusters. To isolate messages between clusters, register each cluster with a different NameServer.

192.168.127.129: broker-c

brokerClusterName=DefaultCluster-2
brokerName=broker-c
brokerId=0

192.168.127.129: broker-c-s

brokerClusterName=DefaultCluster-2
brokerName=broker-c-s
brokerId=1

Let’s take a look at the specific information each variable stores.

2), topicQueueTable (HashMap<String, List<QueueData>>)

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSysFlag;

topicQueueTable stores the mapping between topics and brokers. Take Cluster_queue_topic as an example: by default it has four queues on each broker in our setup, and its messages are distributed across three brokers (broker-a, broker-b, and broker-c), so the topic has 3*4 = 12 MessageQueues. When a producer sends a message, one of these 12 MessageQueues is chosen.

Routing information for this topic:

We can use topicQueueTable to know which brokers the topic is published to and the distribution of queues for the entire topic.
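To make the 3*4 arithmetic concrete, here is a minimal sketch of how the entries stored for one topic can be expanded into individual queues. SimpleQueueData and expandWriteQueues are hypothetical, simplified stand-ins written for this illustration, not RocketMQ classes; the broker names and queue counts are the ones from the example cluster above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for QueueData (only the fields needed here).
class SimpleQueueData {
    final String brokerName;
    final int writeQueueNums;

    SimpleQueueData(String brokerName, int writeQueueNums) {
        this.brokerName = brokerName;
        this.writeQueueNums = writeQueueNums;
    }
}

class TopicQueueTableSketch {
    // Expand every (brokerName, writeQueueNums) entry into "brokerName:queueId" strings.
    static List<String> expandWriteQueues(List<SimpleQueueData> queueDataList) {
        List<String> queues = new ArrayList<String>();
        for (SimpleQueueData qd : queueDataList) {
            for (int queueId = 0; queueId < qd.writeQueueNums; queueId++) {
                queues.add(qd.brokerName + ":" + queueId);
            }
        }
        return queues;
    }

    // Build a map shaped like topicQueueTable for the example topic.
    static Map<String, List<SimpleQueueData>> exampleTable() {
        Map<String, List<SimpleQueueData>> table = new LinkedHashMap<String, List<SimpleQueueData>>();
        List<SimpleQueueData> list = new ArrayList<SimpleQueueData>();
        list.add(new SimpleQueueData("broker-a", 4));
        list.add(new SimpleQueueData("broker-b", 4));
        list.add(new SimpleQueueData("broker-c", 4));
        table.put("Cluster_queue_topic", list);
        return table;
    }

    public static void main(String[] args) {
        List<String> queues = expandWriteQueues(exampleTable().get("Cluster_queue_topic"));
        System.out.println(queues.size() + " queues: " + queues);
    }
}
```

Each broker contributes one QueueData entry, so the number of queues a producer can choose from is simply the sum of writeQueueNums over the list.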

3), brokerAddrTable (HashMap<String, BrokerData>)

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

This table records, for each brokerName, the master and slave instances that have been started under that name.

For example, broker-b belongs to DefaultCluster and has two instances: the master node (brokerId=0) is at 192.168.127.129 and the slave node (brokerId>0) is at 192.168.127.128.
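The broker-b example can be modeled with nested maps. The following is only a sketch: the nested Map is a simplified stand-in for BrokerData.brokerAddrs, findMasterAddr is a hypothetical helper, and the port 10911 is an assumption, since the article only gives the IP addresses.

```java
import java.util.HashMap;
import java.util.Map;

class BrokerAddrTableSketch {
    // brokerId 0 marks the master, matching the broker configs shown earlier.
    static final long MASTER_ID = 0L;

    // brokerName -> (brokerId -> address): a simplified model of brokerAddrTable.
    static Map<String, Map<Long, String>> exampleTable() {
        Map<String, Map<Long, String>> table = new HashMap<String, Map<Long, String>>();
        Map<Long, String> brokerB = new HashMap<Long, String>();
        brokerB.put(0L, "192.168.127.129:10911"); // master; the port is assumed
        brokerB.put(1L, "192.168.127.128:10911"); // slave; the port is assumed
        table.put("broker-b", brokerB);
        return table;
    }

    // Returns the master address for a brokerName, or null if none is registered.
    static String findMasterAddr(Map<String, Map<Long, String>> table, String brokerName) {
        Map<Long, String> addrs = table.get(brokerName);
        return addrs == null ? null : addrs.get(MASTER_ID);
    }

    public static void main(String[] args) {
        System.out.println("master of broker-b: " + findMasterAddr(exampleTable(), "broker-b"));
    }
}
```

Keying the inner map by brokerId is what lets one brokerName cover both the master and its slaves.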

4), clusterAddrTable(HashMap<String, Set<String>>)

This table records which brokerNames each cluster contains.

5), brokerLiveTable (HashMap<String, BrokerLiveInfo>)

This table is mainly used to track the time of each broker's last heartbeat.

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
class BrokerLiveInfo {
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;
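The code that consumes lastUpdateTimestamp is not shown in this article, so the following is only a sketch of how a heartbeat table like this could be used to drop brokers that have gone silent. The class and its methods are hypothetical; only the two-minute value is taken from the BROKER_CHANNEL_EXPIRED_TIME constant in RouteInfoManager above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class BrokerLiveTableSketch {
    // Same value as BROKER_CHANNEL_EXPIRED_TIME in RouteInfoManager: two minutes.
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

    // brokerAddr -> lastUpdateTimestamp: a simplified model of brokerLiveTable.
    final Map<String, Long> lastHeartbeat = new HashMap<String, Long>();

    // Record a heartbeat (in the real class this happens during registerBroker).
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Remove and return every broker whose last heartbeat is older than the expiry time.
    List<String> removeExpired(long nowMillis) {
        List<String> expired = new ArrayList<String>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + BROKER_CHANNEL_EXPIRED_TIME < nowMillis) {
                expired.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        BrokerLiveTableSketch table = new BrokerLiveTableSketch();
        table.onHeartbeat("192.168.127.128:10911", 0L);       // address and port are illustrative
        table.onHeartbeat("192.168.127.129:10911", 100_000L);
        System.out.println("expired: " + table.removeExpired(130_000L));
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real scanner would read the system clock instead.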

6), registerBroker (REGISTER_BROKER)

public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            this.lock.writeLock().lockInterruptibly();
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            ...
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            ...
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }
    return result;
}
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());

    List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
    if (null == queueDataList) {
        queueDataList = new LinkedList<QueueData>();
        queueDataList.add(queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        ...
    }
}

Registration first acquires the write lock and then stores the information registered by the broker instance in the variables described above.
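The locking pattern can be shown in isolation. This is a minimal sketch, assuming a single table and hypothetical method names: writers take the write lock as registerBroker does, and readers take the read lock as pickupTopicRouteData does. The original calls lockInterruptibly(); plain lock() is used here only to keep the sketch free of checked exceptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockedRegistrySketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // clusterName -> brokerNames: a simplified model of clusterAddrTable.
    private final Map<String, Set<String>> clusterAddrTable = new HashMap<String, Set<String>>();

    // Writers take the exclusive write lock before touching the plain HashMap.
    void register(String clusterName, String brokerName) {
        this.lock.writeLock().lock();
        try {
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    // Readers share the read lock and receive a copy, never the live set.
    Set<String> brokerNames(String clusterName) {
        this.lock.readLock().lock();
        try {
            Set<String> names = this.clusterAddrTable.get(clusterName);
            return names == null ? new HashSet<String>() : new HashSet<String>(names);
        } finally {
            this.lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        LockedRegistrySketch registry = new LockedRegistrySketch();
        registry.register("DefaultCluster", "broker-a");
        registry.register("DefaultCluster", "broker-b");
        registry.register("DefaultCluster-2", "broker-c");
        System.out.println(registry.brokerNames("DefaultCluster"));
    }
}
```

A read-write lock suits this workload because route lookups can run concurrently, while a registration needs exclusive access to keep the tables consistent with each other.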

7), pickupTopicRouteData (GET_ROUTEINFO_BY_TOPIC)

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);
    try {
        try {
            this.lock.readLock().lockInterruptibly();
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;
                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }

                for (String brokerName : brokerNameSet) {
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(),
                            (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        ...
                    }
                }
            }
            ...
    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

This method returns the topic's queue distribution (queueDatas) together with the brokers that the topic is located on (brokerDatas).
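The lookup joins two tables: the topic yields broker names, and each broker name yields addresses. Below is a stripped-down sketch of that join, with plain maps standing in for topicQueueTable and brokerAddrTable. The class, field, and method names are hypothetical, and locking and filter servers are left out.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RouteLookupSketch {
    // topic -> names of the brokers hosting it (simplified topicQueueTable).
    final Map<String, List<String>> topicBrokers = new HashMap<String, List<String>>();
    // brokerName -> (brokerId -> address) (simplified brokerAddrTable).
    final Map<String, Map<Long, String>> brokerAddrs = new HashMap<String, Map<Long, String>>();

    // Returns brokerName -> copied address map for the topic, or null when
    // either the topic or all of its brokers are unknown.
    Map<String, Map<Long, String>> pickup(String topic) {
        List<String> names = topicBrokers.get(topic);
        if (names == null) {
            return null;
        }
        Set<String> brokerNameSet = new LinkedHashSet<String>(names); // de-duplicate names
        Map<String, Map<Long, String>> route = new HashMap<String, Map<Long, String>>();
        for (String brokerName : brokerNameSet) {
            Map<Long, String> addrs = brokerAddrs.get(brokerName);
            if (addrs != null) {
                // Copy the map, like the clone() in the original, so callers
                // cannot modify the registry's own data.
                route.put(brokerName, new HashMap<Long, String>(addrs));
            }
        }
        return route.isEmpty() ? null : route;
    }

    public static void main(String[] args) {
        RouteLookupSketch sketch = new RouteLookupSketch();
        sketch.topicBrokers.put("Cluster_queue_topic", java.util.Arrays.asList("broker-a", "broker-b"));
        Map<Long, String> brokerA = new HashMap<Long, String>();
        brokerA.put(0L, "192.168.127.128:10911"); // the port is assumed
        sketch.brokerAddrs.put("broker-a", brokerA);
        System.out.println(sketch.pickup("Cluster_queue_topic"));
    }
}
```

Returning null when nothing is found mirrors the original method, which is what lets getRouteInfoByTopic answer with TOPIC_NOT_EXIST.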