2.2 NameServer

2.2.1 Architecture design

The design idea of message-oriented middleware is generally based on the topic subscription and publishing mechanism. Producer sends a topic to the message server, which is responsible for the persistent storage of messages, and consumers subscribe to the topic of interest. The message server decouples message producers from message consumers by pushing messages to consumers (Push pattern) or consumers actively pulling messages to the message server (Pull pattern) based on subscription information (routing information). In order to avoid a single point of failure of the message server, multiple message servers are usually deployed to share the storage of messages. How does the message producer know which message server to send the message to? If one of the message servers is down, how can message producers sense it without restarting the service?

NameServer is designed to solve these problems.

The Broker message server registers with all NameserVers when it starts. Producer gets a list of Broker server addresses from NameServer before sending a message, and then selects a server from the list to send the message according to the load balancing algorithm.

NameServer keeps a long connection to each Broker, checks for the Broker to be alive at 30 seconds interval, and removes it from the routing registry if it detects that the Broker is down. However, route changes do not immediately notify message producers. The purpose of this design is to reduce the complexity of NameServer implementation and provide a fault tolerant mechanism on the message sender to ensure the availability of message sending.

The high availability of NameServer itself is achieved by deploying multiple NameServer servers, but they do not communicate with each other. That is, the data between NameServer servers is not exactly the same at a certain moment, but this does not have any impact on message sending, which is also a highlight of NameServer design. In short, RocketMQ is designed to be simple and efficient.

2.2.2 Startup process

Start the class: org. Apache. Rocketmq. Namesrv. NamesrvStartup

Step one

Parse the configuration file, populate the NameServerConfig, NettyServerConfig property values, and create the NamesrvController

Code: NamesrvController# createNamesrvController

/ / create NamesrvConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
/ / create NettyServerConfig
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// Set the boot port number
nettyServerConfig.setListenPort(9876);
// Parse the startup -c argument
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if(file ! =null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);

        namesrvConfig.setConfigStorePath(file);

        System.out.printf("load config properties file OK, %s%n", file); in.close(); }}// Parse the startup -p argument
if (commandLine.hasOption('p')) {
    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
    MixAll.printObjectProperties(console, namesrvConfig);
    MixAll.printObjectProperties(console, nettyServerConfig);
    System.exit(0);
}
/ / will launch parameters filling to namesrvConfig, nettyServerConfig
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

/ / create NameServerController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
Copy the code

NamesrvConfig properties

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;
Copy the code

RocketmqHome: RocketMQ home directory

KvConfig: NameServer stores the persistent path of KV configuration attributes

ConfigStorePath: nameServer Default configuration file path

OrderMessageEnable: Whether sequential messages are supported

NettyServerConfig properties

private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
private boolean useEpollNativeSelector = false;
Copy the code

ListenPort: NameServer Listening port, which is initialized to 9876 by default

ServerWorkerThreads: Specifies the number of threads in the Netty service thread pool

ServerCallbackExecutorThreads: Netty public task thread pool threads number of Netty network design, can create a thread pool of different depending on the type of business, such as processing messages, message consumption, heartbeat detection, etc. If the business type does not register a thread pool, it is executed by a public thread pool.

ServerSelectorThreads: The number of IO threads pools. NameServer and Broker end parses the request and returns the corresponding number of threads. These threads are used to process network requests, parse the request packet, forward it to the business thread pools to complete the specific operation, and then return the result to the caller.

ServerOnewaySemaphoreValue: send oneway message request concurrent read (Broker end parameters);

ServerAsyncSemaphoreValue: asynchronous messages maximum concurrent degree;

ServerChannelMaxIdleTimeSeconds: network connection’s largest leisure time, the default of 120 s.

ServerSocketSndBufSize: Indicates the size of the network socket send buffer.

ServerSocketRcvBufSize: Indicates the cache size of the network receiver.

Whether serverPooledByteBufAllocatorEnable: ByteBuffer open cache;

UseEpollNativeSelector: Specifies whether to enable the Epoll IO model.

Step 2

Create and initialize an instance of NamesrvController based on the startup property. NameServerController instance is the NameServer core controller. Does it look like Kafka!! They all have the idea of a controller.

Code: NamesrvController# initialize

public boolean initialize(a) {
	// Load the KV configuration
    this.kvConfigManager.load();
	// Create a NettyServer network handling object
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
	// Enable scheduled tasks: scan brokers every 10 seconds to remove inactive brokers
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    this.registerProcessor();
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run(a) {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker(); }},5.10, TimeUnit.SECONDS);
	// Enable the scheduled task: Prints the KV configuration every 10 minutes
	this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run(a) {
            NamesrvController.this.kvConfigManager.printAllPeriodically(); }},1.10, TimeUnit.MINUTES);
    return true;
}
Copy the code

Step 3

Close the thread pool before shutting down the JVM process to free up resources in time

Code: NamesrvStartup# start

// Register the JVM hook function code
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call(a) throws Exception {
        // Release resources
        controller.shutdown();
        return null; }}));Copy the code

2.2.3 Route Management

NameServer’s main role is to provide message producers and message consumers with routing information about topics. Therefore, NameServer needs to store basic routing information and manage Broker nodes, including route registration and route deletion.

2.2.3.1 Route meta information

Code: RouteInfoManager

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
Copy the code

TopicQueueTable: Topic Indicates the routing information of the message queue. The load is balanced when messages are sent according to the routing table

BrokerAddrTable: Broker base information, including brokerName, cluster name, and primary and secondary Broker addresses

ClusterAddrTable: Broker cluster information that stores the names of all brokers in the cluster

BrokerLiveTable: Broker status message that NameServer replaces each time it receives a heartbeat packet

FilterServerTable: List of FilterServers on the Broker for class-pattern message filtering.

2.2.3.2 Route Registration

1) Send heartbeat packets

RocketMQ route registration is implemented through the heartbeat function between Broker and NameServer. The Broker sends heartbeat messages to all nameservers in the cluster every 30 seconds at startup. When NameServer receives a heartbeat packet it updates the BrokerLiveInfo lastUpdataTimeStamp information in the brokerLiveTable cache, then NameServer scans brokerLiveTable every 10 seconds, If no heartbeat packet is received for 120 seconds, NameServer removes the Broker’s routing information and closes the Socket connection.

Code: BrokerController# start

// Register Broker information
this.registerBrokerAll(true.false.true);
// Report Broker information to NameServer every 30 seconds
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run(a) {
        try {
            BrokerController.this.registerBrokerAll(true.false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e); }}},1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), 
                                                  TimeUnit.MILLISECONDS);
Copy the code

Code: BrokerOuterAPI# registerBrokerAll

// Get nameServer address information
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
// Iterate over all nameserver lists
if(nameServerAddressList ! =null && nameServerAddressList.size() > 0) {

    // Encapsulate the request header
    final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);
    requestHeader.setCompressed(compressed);
	// Wrap the request body
    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
    requestBody.setFilterServerList(filterServerList);
    final byte[] body = requestBody.encode(compressed);
    final int bodyCrc32 = UtilAll.crc32(body);
    requestHeader.setBodyCrc32(bodyCrc32);
    final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new Runnable() {
            @Override
            public void run(a) {
                try {
                    // Register with NameServer respectively
                    RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                    if(result ! =null) {
                        registerBrokerResultList.add(result);
                    }

                    log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                } finally{ countDownLatch.countDown(); }}}); }try {
        countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    }
}
Copy the code

Code: BrokerOutAPI# registerBroker

if (oneway) {
    try {
        this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
    } catch (RemotingTooMuchRequestException e) {
        // Ignore
    }
    return null;
}
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
Copy the code
2) Processing heartbeat packets

. Org. Apache. Rocketmq namesrv. Processor. DefaultRequestProcessor network processing class parse request type, if the request type for REGISTER_BROKER, The request is forwarded to RouteInfoManager#regiesterBroker

Code: DefaultRequestProcessor# the processRequest

// Decide to register Broker information
case RequestCode.REGISTER_BROKER:
	Version brokerVersion = MQVersion.value2Version(request.getVersion());
	if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
	    return this.registerBrokerWithFilterServer(ctx, request);
	} else {
            // Register Broker information
	    return this.registerBroker(ctx, request);
	}
Copy the code

Code: DefaultRequestProcessor# registerBroker

RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
    requestHeader.getClusterName(),
    requestHeader.getBrokerAddr(),
    requestHeader.getBrokerName(),
    requestHeader.getBrokerId(),
    requestHeader.getHaServerAddr(),
    topicConfigWrapper,
    null,
    ctx.channel()
);
Copy the code

RouteInfoManager#registerBroker maintains routing information

/ / lock
this.lock.writeLock().lockInterruptibly();
/ / maintenance clusterAddrTable
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
    brokerNames = new HashSet<String>();
    this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
Copy the code
/ / maintenance brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
// brokerData is created for the first time
if (null == brokerData) {
    registerFirst = true;
    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
    this.brokerAddrTable.put(brokerName, brokerData);
}
// Update the Broker without first registering
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
    Entry<Long, String> item = it.next();
    if (null! = brokerAddr && brokerAddr.equals(item.getValue()) && brokerId ! = item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr);
Copy the code
/ / maintenance topicQueueTable
if (null! = topicConfigWrapper && MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || 
        registerFirst) {
        ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
        if(tcTable ! =null) {
            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                this.createAndUpdateQueueData(brokerName, entry.getValue()); }}}}Copy the code

Code: RouteInfoManager# createAndUpdateQueueData

private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    / / create the QueueData
	QueueData queueData = new QueueData();
	queueData.setBrokerName(brokerName);
	queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
	queueData.setReadQueueNums(topicConfig.getReadQueueNums());
	queueData.setPerm(topicConfig.getPerm());
	queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
	// Get the set of queues in topicQueueTable
	List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
    // If topicQueueTable is empty, queueData is added directly to the queue collection
	if (null == queueDataList) {
	    queueDataList = new LinkedList<QueueData>();
	    queueDataList.add(queueData);
	    this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
	    log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
	} else {
        // Check if it is a new queue
	    boolean addNewOne = true;
	    Iterator<QueueData> it = queueDataList.iterator();
	    while (it.hasNext()) {
	        QueueData qd = it.next();
            // If brokerNames are the same, the representatives are not new queues
	        if (qd.getBrokerName().equals(brokerName)) {
	            if (qd.equals(queueData)) {
	                addNewOne = false;
	        } else {
	                    log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData); it.remove(); }}}// If it is a new queue, add the queue to queueDataList
        if(addNewOne) { queueDataList.add(queueData); }}}Copy the code
/ / maintenance brokerLiveTable
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(
    System.currentTimeMillis(),
    topicConfigWrapper.getDataVersion(),
    channel,
    haServerAddr));
Copy the code
/ / maintenance filterServerList
if(filterServerList ! =null) {
    if (filterServerList.isEmpty()) {
        this.filterServerTable.remove(brokerAddr);
    } else {
        this.filterServerTable.put(brokerAddr, filterServerList); }}if(MixAll.MASTER_ID ! = brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if(masterAddr ! =null) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
        if(brokerLiveInfo ! =null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); }}}Copy the code

2.2.3.3 Deleting routes

The Broker sends a heartbeat packet to NameServer every 30 seconds. The heartbeat packet contains a BrokerId, Broker address, Broker name, cluster name, and a list of FilterServers associated with the Broker. But if brokers go down and NameServer cannot receive heartbeat packets, how does NameServer weed out failed brokers? NameServer scans the brokerLiveTable status table every 10 seconds. If a BrokerLive lastUpdateTimestamp is more than 120s from the current time, the Broker is considered invalid, removed, and connected to. Update topicQueueTable, brokerAddrTable, brokerLiveTable, filterServerTable at the same time.

RocketMQ has two triggers to delete routing information:

  • NameServer periodically scans brokerLiveTable for the time difference between the last heartbeat packet and the current system and removes the broker if the time exceeds 120s.
  • The unregisterBroker directive is executed when the Broker is normally closed

The route deletion method in both methods is the same, that is, the information related to the broker is deleted from the related routing table.

Code: NamesrvController# initialize

// Scan for active brokers every 10s
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run(a) {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker(); }},5.10, TimeUnit.SECONDS);
Copy the code

Code: RouteInfoManager# scanNotActiveBroker

public void scanNotActiveBroker(a) {
    / / get brokerLiveTable
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    / / traverse brokerLiveTable
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // If the time after receiving the heartbeat packet is more than 120s
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            // Close the connection
            RemotingUtil.closeChannel(next.getValue().getChannel());
            / / remove the broker
            it.remove();
            // Maintain the routing table
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); }}}Copy the code

Code: RouteInfoManager# onChannelDestroy

// Request write locks removed from brokerLiveTable and filterServerTable according to brokerAddress
this.lock.writeLock().lockInterruptibly();
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
Copy the code
/ / maintenance brokerAddrTable
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this.brokerAddrTable.entrySet().iterator();
/ / traverse brokerAddrTable
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
    BrokerData brokerData = itBrokerAddrTable.next().getValue();
    // Traverses the broker address
    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
    while (it.hasNext()) {
        Entry<Long, String> entry = it.next();
        Long brokerId = entry.getKey();
        String brokerAddr = entry.getValue();
        // Remove brokerAddr according to broker address
        if (brokerAddr.equals(brokerAddrFound)) {
            brokerNameFound = brokerData.getBrokerName();
            it.remove();
            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                brokerId, brokerAddr);
            break; }}If the current topic contains only brokers to be removed, remove that topic
    if (brokerData.getBrokerAddrs().isEmpty()) {
        removeBrokerName = true;
        itBrokerAddrTable.remove();
        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); }}Copy the code
/ / maintenance clusterAddrTable
if(brokerNameFound ! =null && removeBrokerName) {
    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
    / / traverse clusterAddrTable
    while (it.hasNext()) {
        Entry<String, Set<String>> entry = it.next();
        // Get the cluster name
        String clusterName = entry.getKey();
        // Get the brokerName collection in the cluster
        Set<String> brokerNames = entry.getValue();
        // Remove brokerNameFound from brokerNames
        boolean removed = brokerNames.remove(brokerNameFound);
        if (removed) {
            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                brokerNameFound, clusterName);

            if (brokerNames.isEmpty()) {
                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                    clusterName);
                // If the cluster does not contain any brokers, remove the cluster
                it.remove();
            }

            break; }}}Copy the code
// Maintain the topicQueueTable queue
if (removeBrokerName) {
    / / traverse topicQueueTable
    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
        this.topicQueueTable.entrySet().iterator();
    while (itTopicQueueTable.hasNext()) {
        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
        // The topic name
        String topic = entry.getKey();
        // Set of queues
        List<QueueData> queueDataList = entry.getValue();
		// Walk through the topic queue
        Iterator<QueueData> itQueueData = queueDataList.iterator();
        while (itQueueData.hasNext()) {
            // Remove the active broker message from the queue
            QueueData queueData = itQueueData.next();
            if (queueData.getBrokerName().equals(brokerNameFound)) {
                itQueueData.remove();
                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData); }}// If the topic queue is empty, remove the topic
        if (queueDataList.isEmpty()) {
            itTopicQueueTable.remove();
            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic); }}}Copy the code
// Release the write lock
finally {
    this.lock.writeLock().unlock();
}
Copy the code

2.2.3.4 Route Discovery

RocketMQ route discovery is non-real-time. When the Topic route changes, NameServer does not actively push it to the client. Instead, the client periodically pulls the latest route of the Topic.

Code: DefaultRequestProcessor# getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
	// Call the RouteInfoManager method to fill TopicRouteData's List
      
       , List
       

, filterServer
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); // If the routing information corresponding to the topic is found and the topic is a sequential message, fill the routing information with the configuration related to the sequential message from NameServer KVConfig if(topicRouteData ! =null) { if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) { String orderTopicConf = this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic()); topicRouteData.setOrderTopicConf(orderTopicConf); } byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } Copy the code

2.2.4 summary