For a detailed analysis, see the NameServer parsing section of the author's Gitee project.

What is the NameServer?

The NameServer is a simple topic routing registry that supports Broker registration and discovery. Its role is similar to that of ZooKeeper in Dubbo.

Main functions

  1. Broker cluster management: handles Broker registration and heartbeats, and uses the heartbeats to detect whether each Broker is still alive
  2. Routing information management: stores the routing information of the Broker clusters; producers and consumers query the NameServer for this routing information in order to send and consume messages
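
The two functions above can be illustrated with a minimal sketch. This is not RocketMQ's actual routing manager: the class RouteRegistrySketch, its fields, and its method names are hypothetical and exist only to show the idea of a registry that brokers write to and clients read from.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified registry; not RocketMQ's real implementation.
class RouteRegistrySketch {
    // topic -> addresses of the brokers that serve the topic
    private final Map<String, List<String>> topicRoutes = new ConcurrentHashMap<>();
    // broker address -> timestamp (ms) of the last registration or heartbeat
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Called when a broker registers and again on every heartbeat.
    synchronized void registerBroker(String brokerAddr, List<String> topics, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
        for (String topic : topics) {
            List<String> addrs = topicRoutes.computeIfAbsent(topic, t -> new ArrayList<>());
            if (!addrs.contains(brokerAddr)) {
                addrs.add(brokerAddr);
            }
        }
    }

    // Called by producers and consumers to find the brokers for a topic.
    synchronized List<String> lookup(String topic) {
        List<String> addrs = topicRoutes.get(topic);
        return addrs == null ? new ArrayList<>() : new ArrayList<>(addrs);
    }

    public static void main(String[] args) {
        RouteRegistrySketch registry = new RouteRegistrySketch();
        registry.registerBroker("10.0.0.1:10911", Arrays.asList("orders"), System.currentTimeMillis());
        System.out.println(registry.lookup("orders"));
    }
}
```

The heartbeat timestamp is recorded here only so that a periodic scan could later decide which brokers are no longer alive.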

NameServer startup process

Steps

  1. Create the controller
    1. Parse the command-line and configuration file parameters
    2. Create the NamesrvController
  2. Start the controller
    1. Initialize: set up Netty by creating the remoting server and its worker threads, and start the scheduled task that removes inactive brokers
    2. Start: start() starts the Netty server

The startup entry point is NamesrvStartup#main, which delegates to main0. The code is as follows:

public class NamesrvStartup {
   public static NamesrvController main0(String[] args) {
      // ... other logic omitted
      // Step 1: Create the controller
      NamesrvController controller = createNamesrvController(args);
      // Step 2: Start the controller
      start(controller);
      // ... other logic omitted
   }
   // ... other logic omitted
}

Step 1: Create the controller

The steps are as follows:

  1. Parse the command-line and configuration file parameters
    1. The command-line options include -p, -n, -h, and -c
  2. Create the NamesrvController

public class NamesrvStartup {
   public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
      // Set the version information
      System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

      // Parse the command line
      // Build the options list used to parse the arguments (-h prints a description of each option and terminates execution)
      // e.g. -c D:\code\openSource\Rocketmq\conf\xxxx.conf
      Options options = ServerUtil.buildCommandlineOptions(new Options());
      commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
      if (null == commandLine) { // commandLine == null (for example after -h): terminate the process
         System.exit(-1);
         return null;
      }

      // nameServer configuration
      final NamesrvConfig namesrvConfig = new NamesrvConfig();
      // nettyServer configuration
      final NettyServerConfig nettyServerConfig = new NettyServerConfig();
      nettyServerConfig.setListenPort(9876); // The Netty server port defaults to 9876; a config file passed with -c can override it below

      // Handle option 'c': load the config file into properties and apply it to both config objects
      if (commandLine.hasOption('c')) {
         String file = commandLine.getOptionValue('c');
         if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
         }
      }

      // Handle option 'p': print the namesrvConfig and nettyServerConfig settings, then exit immediately
      if (commandLine.hasOption('p')) {
         InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
         MixAll.printObjectProperties(console, namesrvConfig);
         MixAll.printObjectProperties(console, nettyServerConfig);
         System.exit(0);
      }

      // Apply all command-line options to namesrvConfig (assign the parsed properties to namesrvConfig)
      /*
       * How MixAll.properties2Object assigns the values:
       *  - get all setXxx(...) methods from the object
       *  - for each one, obtain the value of the corresponding Xxx property from properties
       *  - call setXxx(...) via reflection to assign the value
       */
      MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

      // If ROCKETMQ_HOME is not set, an error will be reported here
      if (null == namesrvConfig.getRocketmqHome()) {
         System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
         System.exit(-2);
      }

      // Load the logging configuration ... (omitted)
      
      // Pass namesrvConfig, nettyServerConfig to create a Controller instance for later startup
      final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

      // Remember all configs to prevent discard
      controller.getConfiguration().registerConfig(properties);

      return controller;
   }
   // ... other logic omitted
}
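
The reflection-based assignment that the comments above attribute to MixAll.properties2Object can be sketched roughly as follows. This is an illustration under assumptions, not the RocketMQ source: PropertiesBinderSketch, DemoConfig, and bind are hypothetical names, and only a few parameter types are handled.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical sketch of binding Properties to setters via reflection.
class PropertiesBinderSketch {
    // Hypothetical config bean used only for this example.
    public static class DemoConfig {
        private int listenPort = 9876;
        private String rocketmqHome;
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
        public void setRocketmqHome(String rocketmqHome) { this.rocketmqHome = rocketmqHome; }
        public int getListenPort() { return listenPort; }
        public String getRocketmqHome() { return rocketmqHome; }
    }

    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            // Only consider public single-argument setXxx(...) methods.
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // no value supplied for this property
            }
            Class<?> type = m.getParameterTypes()[0];
            Object arg;
            if (type == String.class) {
                arg = value;
            } else if (type == int.class || type == Integer.class) {
                arg = Integer.parseInt(value);
            } else if (type == long.class || type == Long.class) {
                arg = Long.parseLong(value);
            } else if (type == boolean.class || type == Boolean.class) {
                arg = Boolean.parseBoolean(value);
            } else {
                continue; // type not supported by this sketch
            }
            try {
                m.invoke(target, arg);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("failed to call " + name, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("listenPort", "9999");
        DemoConfig config = new DemoConfig();
        bind(props, config);
        System.out.println(config.getListenPort());
    }
}
```

Because the same Properties object can be bound to several beans in turn, each bean simply ignores the keys it has no setter for, which is why the article's code can pass one file to both namesrvConfig and nettyServerConfig.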

Step 2: Start the controller

The steps are as follows:

  1. Initialize: set up Netty by creating the remoting server and its worker threads, and start the scheduled task that removes inactive brokers
  2. Start: start() starts the Netty server

public class NamesrvStartup {
   public static NamesrvController start(final NamesrvController controller) throws Exception {
       // ... other logic omitted
       /*
        * Initialize the controller, which does two things:
        *  1. Netty setup: create the remoting server and its worker threads
        *  2. Scheduled tasks: remove inactive brokers
        */
      boolean initResult = controller.initialize();
      if (!initResult) {
         controller.shutdown();
         System.exit(-3);
      }
      // Register a JVM shutdown hook that shuts the controller down
      Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
         controller.shutdown();
         return null;
      }));
      // The actual startup happens here
      controller.start();

      return controller;
   }
}
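
The order of operations in start (initialize, shut down on failure, register a shutdown hook, then start) can be isolated in a small sketch. The Controller interface and the boot method below are hypothetical stand-ins for illustration; unlike the code above, the sketch returns false instead of calling System.exit(-3), so that it can be exercised safely.

```java
// Hypothetical sketch of the initialize-then-start sequence.
class LifecycleSketch {
    // Hypothetical stand-in for the controller's lifecycle methods.
    interface Controller {
        boolean initialize();
        void start();
        void shutdown();
    }

    // Returns true if the controller was started, false if initialization failed.
    static boolean boot(Controller controller) {
        if (!controller.initialize()) {
            // Release whatever initialize() managed to create before giving up.
            controller.shutdown();
            return false;
        }
        // Make sure shutdown() also runs when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(controller::shutdown, "ShutdownHookSketch"));
        controller.start();
        return true;
    }

    public static void main(String[] args) {
        boolean started = boot(new Controller() {
            @Override public boolean initialize() { return true; }
            @Override public void start() { System.out.println("started"); }
            @Override public void shutdown() { System.out.println("shutdown"); }
        });
        System.out.println("boot result: " + started);
    }
}
```

Registering the hook only after a successful initialization avoids running shutdown twice on the failure path.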

Controller initialization

Initialization mainly sets up Netty by creating the remoting server and its worker threads, and then starts the scheduled task that removes inactive brokers. The annotated code is as follows:

public class NamesrvController {
   public boolean initialize() {
      // Load the KV configuration
      this.kvConfigManager.load();
      // Create the Netty remoting server
      /*
       * Creating NettyRemotingServer sets up the following:
       *  1. serverBootstrap: the Netty server bootstrap class
       *  2. publicExecutor: a thread pool named publicExecutor
       *  3. eventLoopGroupBoss and eventLoopGroupSelector: two thread groups that will look familiar
       *     if you know Netty
       *     -- they are the threads Netty uses to handle connection events and read/write events
       *     -- eventLoopGroupBoss is Netty's boss thread group; eventLoopGroupSelector is the worker thread group
       */
      this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
      // Thread pool used by the Netty remoting server
      this.remotingExecutor =
              Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
      // Register remotingExecutor with remotingServer
      // -- this ultimately binds the controller to remotingServer
      this.registerProcessor();
      // Scheduled task: after a 5-second delay, scan the brokers every 10 seconds and remove inactive ones
      // -- a broker is inactive if its lastUpdateTimestamp has not been updated for 2 minutes
      this.scheduledExecutorService.scheduleAtFixedRate((Runnable) () ->
              NamesrvController.this.routeInfoManager.scanNotActiveBroker(), 5, 10, TimeUnit.SECONDS);
      // Scheduled task: after a 1-minute delay, print the KV configuration every 10 minutes
      this.scheduledExecutorService.scheduleAtFixedRate((Runnable) () ->
              NamesrvController.this.kvConfigManager.printAllPeriodically(), 1, 10, TimeUnit.MINUTES);
      // TLS setup omitted ...

      return true;
   }
}
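
The inactive-broker scan scheduled above can be sketched as follows. This is a simplified illustration rather than the real scanNotActiveBroker: BrokerLivenessSketch and its methods are hypothetical, and only the 2-minute expiry and the 5-second delay / 10-second period are taken from the comments in the code above.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of heartbeat tracking plus a periodic liveness scan.
class BrokerLivenessSketch {
    // A broker is considered inactive after 2 minutes without a heartbeat.
    static final long EXPIRE_MILLIS = TimeUnit.MINUTES.toMillis(2);

    // broker address -> timestamp (ms) of the last heartbeat
    private final Map<String, Long> lastUpdate = new ConcurrentHashMap<>();

    void heartbeat(String brokerAddr, long nowMillis) {
        lastUpdate.put(brokerAddr, nowMillis);
    }

    boolean isAlive(String brokerAddr) {
        return lastUpdate.containsKey(brokerAddr);
    }

    // Removes every broker whose last heartbeat is older than EXPIRE_MILLIS; returns how many were removed.
    int scanNotActive(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > EXPIRE_MILLIS) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        BrokerLivenessSketch sketch = new BrokerLivenessSketch();
        sketch.heartbeat("10.0.0.1:10911", System.currentTimeMillis());
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Same timing as in the article: first run after 5 seconds, then every 10 seconds.
        scheduler.scheduleAtFixedRate(
                () -> sketch.scanNotActive(System.currentTimeMillis()), 5, 10, TimeUnit.SECONDS);
        // A real server keeps the scheduler running; the demo stops it so that the JVM can exit.
        scheduler.shutdownNow();
    }
}
```

Passing the current time into scanNotActive as a parameter keeps the expiry rule easy to check without waiting for real time to pass.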

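Start: start() starts the Netty server

The core logic of this step is to start a Netty server. The code lives in NettyRemotingServer#start:

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
   @Override
   public void start() {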
      this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
              nettyServerConfig.getServerWorkerThreads(),
              new ThreadFactory() {
                 private AtomicInteger threadIndex = new AtomicInteger(0);

                 @Override
                 public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                 }
              });

      prepareSharableHandlers();

      ServerBootstrap childHandler =
              // The two thread groups, eventLoopGroupBoss and eventLoopGroupSelector, were prepared when NettyRemotingServer was created
              this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                      // Use EpollServerSocketChannel when epoll is enabled, otherwise NioServerSocketChannel
                      .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                      // Size of the queue of pending connections
                      // -- when many clients connect at once, the connection requests the server cannot handle immediately wait in this queue
                      .option(ChannelOption.SO_BACKLOG, 1024)
                      // Allow reuse of local addresses and ports
                      .option(ChannelOption.SO_REUSEADDR, true)
                      // Keep-alive is disabled here; if set to true, TCP automatically sends a keep-alive probe when there has been no data for two hours
                      .option(ChannelOption.SO_KEEPALIVE, false)
                      // Disable the Nagle algorithm
                      .childOption(ChannelOption.TCP_NODELAY, true)
                      // Send and receive buffer sizes
                      .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                      .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                      // Bind the configured listen port
                      .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                      .childHandler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                    .addLast(defaultEventExecutorGroup,
                                            encoder,
                                            new NettyDecoder(),
                                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                            connectionManageHandler,
                                            // serverHandler is the NettyServerHandler that handles read and write requests;
                                            // it eventually calls processMessageReceived(ctx, msg)
                                            serverHandler
                                    );
                         }
                      });

      // Whether to use the pooled ByteBuf allocator
      if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
         childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
      }

      try {
         ChannelFuture sync = this.serverBootstrap.bind().sync();
         InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
         this.port = addr.getPort();
      } catch (InterruptedException e1) {
         throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
      }

      if (this.channelEventListener != null) {
         this.nettyEventExecutor.start();
      }

      // Periodic task: after an initial 3-second delay, scan every 1000 ms and discard requests that are no longer valid
      this.timer.scheduleAtFixedRate(new TimerTask() {
         @Override
         public void run() {
            try {
               NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
               log.error("scanResponseTable exception", e);
            }
         }
      }, 1000 * 3, 1000);
   }
   // ... other logic omitted
}
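
The periodic scan at the end of start can be pictured with the sketch below. It is a guess at the general shape of such a scan, not the actual scanResponseTable: ResponseTableSketch, Pending, track, and scan are hypothetical names, and the expiry rule (begin time plus timeout has passed) is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a table of in-flight requests that is scanned for timeouts.
class ResponseTableSketch {
    // Hypothetical record of one in-flight request.
    static final class Pending {
        final long beginMillis;
        final long timeoutMillis;

        Pending(long beginMillis, long timeoutMillis) {
            this.beginMillis = beginMillis;
            this.timeoutMillis = timeoutMillis;
        }

        boolean isExpired(long nowMillis) {
            return beginMillis + timeoutMillis <= nowMillis;
        }
    }

    // request id -> in-flight request
    private final Map<Integer, Pending> table = new ConcurrentHashMap<>();

    void track(int requestId, long beginMillis, long timeoutMillis) {
        table.put(requestId, new Pending(beginMillis, timeoutMillis));
    }

    int size() {
        return table.size();
    }

    // Removes expired requests and returns their ids so that the caller can notify whoever is waiting.
    List<Integer> scan(long nowMillis) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Pending> entry = it.next();
            if (entry.getValue().isExpired(nowMillis)) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) throws InterruptedException {
        ResponseTableSketch sketch = new ResponseTableSketch();
        sketch.track(1, System.currentTimeMillis(), 50);
        Timer timer = new Timer("ScanSketchTimer", true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    sketch.scan(System.currentTimeMillis());
                } catch (Throwable t) {
                    // An uncaught exception would kill the timer thread, so log it and keep going.
                    t.printStackTrace();
                }
            }
        }, 100, 100);
        Thread.sleep(400);
        timer.cancel();
        System.out.println("pending after scan: " + sketch.size());
    }
}
```

Catching Throwable inside run() mirrors the code above: a java.util.Timer stops scheduling if a task throws, so the scan has to swallow and log its own errors.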