In the previous article, we used the RocketMQ source code to start a NameServer and a Broker in a single-node deployment and to send and receive messages. It was a simple QuickStart: after working through it, you can use RocketMQ for simple business messaging and for small features such as asynchronous log collection.

The RocketMQ website has many more examples covering different message types. Readers who want to learn RocketMQ are encouraged to write these demos by hand and to read the source code once they are familiar with the features of ordered messages, broadcast messages, scheduled messages, and batch messages.

Today we will talk about the startup process of NameServer and the source code analysis of the main features of NameServer.

NameServer review

NameServer is a lightweight service discovery and routing server with two main functions:

  • Broker management: NameServer accepts registrations from the Broker cluster and provides a heartbeat mechanism to check whether each Broker is alive.
  • Routing management: each NameServer holds the complete routing information for the Broker cluster, plus the queue information that clients query.

We can think of NameServer as a lightweight registry. In fact, RocketMQ originally used ZooKeeper as its registry, but because the RocketMQ architecture does not need ZooKeeper’s election mechanism to select a master node, the ZooKeeper dependency was removed and NameServer was used instead.

As the RocketMQ registry, NameServer receives registrations from all Brokers in the cluster and checks each Broker’s status every 10 seconds. If a Broker has not updated its registration for more than 120 seconds, it is considered invalid and removed from the cluster.

Now that you know what NameServer does, the natural question is how it does it. To answer that, we need to read the source code.


NameServer startup process analysis

As mentioned in the previous QuickStart article, NameServer is started from the NamesrvStartup class in the namesrv package, so exploring the startup process begins with the main method of that class.

Parsing configuration

The first step in starting NameServer is to construct an instance of NamesrvController, which is the core class of NameServer.

public static NamesrvController main0(String[] args) {
    try {
        // Construct the NamesrvController instance
        NamesrvController controller = createNamesrvController(args);
        // Initialize and start the NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}

The createNamesrvController method takes the command-line arguments and parses them into the configuration classes NamesrvConfig and NettyServerConfig.

final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// The NameServer's default listen port is 9876
nettyServerConfig.setListenPort(9876);
// Specify the configuration file with the -c option
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);

        namesrvConfig.setConfigStorePath(file);

        System.out.printf("load config properties file OK, %s%n", file);
        in.close();
    }
}
// With the -p option, print the current configuration and exit
if (commandLine.hasOption('p')) {
    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
    MixAll.printObjectProperties(console, namesrvConfig);
    MixAll.printObjectProperties(console, nettyServerConfig);
    System.exit(0);
}
// Set individual properties with "--attributeName attributeValue"
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

As the code above shows, NameServer accepts parameters on the command line at startup.

The -c option specifies a configuration file. Its contents are loaded into a java.util.Properties object, and the values are then assigned to the NamesrvConfig and NettyServerConfig instances, which completes the parsing and mapping of the configuration file.

If the -p option is specified, the configuration is printed to the console and the program exits immediately.

In addition, you can use the -n option to specify the namesrvAddr value. This option is defined in the org.apache.rocketmq.srvutil.ServerUtil#buildCommandlineOptions method, which is outside the scope of this section; interested readers can step through the source code in a debugger.
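The property-to-object mapping step can be illustrated with a small reflection-based sketch. This is not RocketMQ's MixAll.properties2Object implementation; the class PropertiesMappingSketch, the DemoConfig stand-in, and the applyProperties helper are all hypothetical names, and only String, int, long, and boolean setters are handled.

```java
import java.lang.reflect.Method;
import java.util.Properties;

class PropertiesMappingSketch {

    // Hypothetical stand-in for a config class such as NamesrvConfig.
    public static class DemoConfig {
        private int listenPort = 9876;
        private String kvConfigPath = "kvConfig.json";

        public int getListenPort() { return listenPort; }
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
        public String getKvConfigPath() { return kvConfigPath; }
        public void setKvConfigPath(String kvConfigPath) { this.kvConfigPath = kvConfigPath; }
    }

    // For each public single-argument setter "setXxx", look up the property "xxx";
    // if it is present, convert the string to the parameter type and call the setter.
    static void applyProperties(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // keep the default when the file does not mention this key
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == String.class) {
                    m.invoke(target, value);
                } else if (type == int.class || type == Integer.class) {
                    m.invoke(target, Integer.parseInt(value));
                } else if (type == long.class || type == Long.class) {
                    m.invoke(target, Long.parseLong(value));
                } else if (type == boolean.class || type == Boolean.class) {
                    m.invoke(target, Boolean.parseBoolean(value));
                }
                // Other parameter types are ignored in this sketch.
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot set property " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("listenPort", "9877");
        DemoConfig config = new DemoConfig();
        applyProperties(props, config);
        System.out.println(config.getListenPort() + " " + config.getKvConfigPath());
    }
}
```

Because keys that are absent from the Properties object are skipped, the defaults set in the config class survive, which is why the same Properties object can safely be applied to two different config classes in turn.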

When the configuration properties are mapped, an instance of NamesrvController is constructed from the configuration classes NamesrvConfig and NettyServerConfig.

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

Initialization and heartbeat mechanism

The second step in starting NameServer is initialization, performed by the NamesrvController#initialize method.

public boolean initialize() {
    // Load the KV configuration from the kvConfig.json file into KVConfigManager#configTable
    this.kvConfigManager.load();
    // Create the Netty server from NettyServerConfig
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Initialize the thread pool that processes Netty network requests
    this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    this.registerProcessor();

    // Schedule the heartbeat scan: first run after 5 seconds, then every 10 seconds
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // Schedule printing of the KV configuration: first run after 1 minute, then every 10 minutes
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    // omit the following code...

    return true;
}

During the initialization of NamesrvController, a heartbeat scan task is registered with the scheduled thread pool. It starts 5 seconds after startup and then scans for inactive brokers every 10 seconds.
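The "initial delay, then fixed period" behaviour comes from java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate. The following is a minimal standalone sketch of that call, using milliseconds instead of seconds so that it finishes quickly; the class FixedRateSketch and the helper runsAtLeast are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FixedRateSketch {

    // Schedules a task that first runs after initialDelayMs and then every periodMs.
    // Returns true if the task ran at least expectedRuns times before timeoutMs elapsed.
    static boolean runsAtLeast(int expectedRuns, long initialDelayMs, long periodMs, long timeoutMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(expectedRuns);
        // Same argument order as in NamesrvController#initialize: task, initial delay, period, unit.
        scheduler.scheduleAtFixedRate(latch::countDown, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Stop the scheduler so its worker thread does not keep the JVM alive.
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for "start after 5 seconds, repeat every 10 seconds", scaled down to milliseconds.
        boolean ok = runsAtLeast(3, 50, 100, 5000);
        System.out.println("task ran at least 3 times: " + ok);
    }
}
```

One detail worth knowing about scheduleAtFixedRate: if a run throws an exception, subsequent runs are suppressed, so a periodic task in a long-lived server should catch its own exceptions.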

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2, that is, 120 seconds
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            // Remove the broker from brokerLiveTable
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

As you can see, in the scanNotActiveBroker method, NameServer traverses the RouteInfoManager#brokerLiveTable property.

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

class BrokerLiveInfo {
    // Timestamp of the broker's last update
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;
}

The RouteInfoManager#brokerLiveTable attribute stores liveness information for all brokers in the cluster, most importantly the BrokerLiveInfo#lastUpdateTimestamp attribute, which records the broker’s last active timestamp. If lastUpdateTimestamp is not updated for more than 120 seconds, the broker is considered invalid and removed from brokerLiveTable.

In addition to the heartbeat scan, a second scheduled task is registered. It prints all KV configuration information every 10 minutes, starting 1 minute after startup.
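The expiry rule can be tried out in isolation with a simplified liveness table. This is a sketch under assumptions, not RocketMQ code: LivenessTableSketch, heartbeat, scan, and isAlive are hypothetical names, the table stores only a timestamp per broker address, and the current time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class LivenessTableSketch {

    // Same value as BROKER_CHANNEL_EXPIRED_TIME in the article: 120 seconds.
    static final long EXPIRE_MS = 1000L * 60 * 2;

    // brokerAddr -> timestamp of the last heartbeat (hypothetical simplification of BrokerLiveInfo)
    private final Map<String, Long> lastUpdate = new HashMap<>();

    // Records that a broker reported in at the given time.
    void heartbeat(String brokerAddr, long nowMs) {
        lastUpdate.put(brokerAddr, nowMs);
    }

    // Removes every broker whose last heartbeat is more than EXPIRE_MS old; returns how many were removed.
    int scan(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRE_MS < nowMs) {
                // Iterator#remove is the safe way to delete from a map while iterating over it.
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastUpdate.containsKey(brokerAddr);
    }

    public static void main(String[] args) {
        LivenessTableSketch table = new LivenessTableSketch();
        table.heartbeat("broker-a", 0L);
        table.heartbeat("broker-b", 60_000L);
        int removed = table.scan(121_000L);
        System.out.println("removed=" + removed + ", broker-b alive=" + table.isAlive("broker-b"));
    }
}
```

Note that the comparison is strict: a broker whose last heartbeat is exactly 120 seconds old survives the scan, and because scans run only every 10 seconds, a dead broker can remain in the table for up to roughly 130 seconds.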

Graceful shutdown

The final step in NameServer startup is to register a JVM shutdown hook, which runs before the JVM exits. The hook releases resources, for example by shutting down the Netty server and closing the thread pools.

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));
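The same pattern can be reproduced with the JDK alone. The sketch below registers a plain Thread as a shutdown hook that closes a thread pool; ShutdownHookSketch and newShutdownHook are hypothetical names, and the 5-second wait is an arbitrary choice for illustration rather than a value taken from RocketMQ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownHookSketch {

    // Builds a hook thread that stops accepting new tasks, waits briefly for
    // running tasks to finish, and then forces the pool down if necessary.
    static Thread newShutdownHook(ExecutorService pool) {
        return new Thread(() -> {
            pool.shutdown();
            try {
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    pool.shutdownNow();
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }, "shutdown-hook-sketch");
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));

        // The hook runs when the JVM begins shutting down (System.exit, Ctrl+C, SIGTERM).
        Runtime.getRuntime().addShutdownHook(newShutdownHook(pool));

        // The pool's worker threads are non-daemon, so the JVM would not exit on its
        // own when main returns; System.exit triggers the shutdown sequence and the hook.
        System.exit(0);
    }
}
```

Shutdown hooks are not run if the process is killed forcibly (for example with SIGKILL), so they are a best-effort cleanup mechanism rather than a guarantee.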

Summary

This article described what NameServer does and analyzed its startup process, beginning from the startup class NamesrvStartup, together with the heartbeat mechanism and the implementation of graceful shutdown.

I hope this helps.


Copyright notice: this article was written by Planeswalker23. Please include a link to the original when reprinting. Thank you.
