Apache ZooKeeper is an open-source distributed coordination service that helps other systems achieve high availability (HA). As a result, it is widely used as part of the HA infrastructure of other components.

In this article we walk through the startup process of a ZooKeeper server node at the source-code level, in order to become familiar with ZooKeeper's underlying principles and the implementation of its core internal processes.

Download the ZooKeeper source code (version 3.6.1 is used here) and import it into IntelliJ IDEA; it is a Maven-managed project.

Find the startup class

Where is the entry point of a node? Look at the startup script zkServer.sh (Linux) or zkServer.cmd (Windows) in the bin directory and find the startup class there.

In zkServer.sh, the command that starts the JVM process is:

nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
    "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &

Looking up where $ZOOMAIN is defined in the script shows that the ZooKeeper node startup class is org.apache.zookeeper.server.quorum.QuorumPeerMain.

Open this class and go to its main method. It creates a QuorumPeerMain object and calls that object's initializeAndRun method. Any exception thrown is caught, and the process exits with an exit code that depends on the exception type. If initializeAndRun returns normally, the process logs "Exiting normally" and exits. In normal operation, however, initializeAndRun blocks for as long as the ZooKeeper service is running, so this point is reached only after the service has stopped.

    /**
     * To start the replicated server specify the configuration file name on
     * the command line.
     * @param args path to the configfile
     */
    public static void main(String[] args) {
        // Create a QuorumPeerMain object
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            // Initialize and run with parameters
            main.initializeAndRun(args);
        // Handle each kind of startup failure and exit the process with a matching exit code
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
        } catch (DatadirException e) {
            LOG.error("Unable to access datadir, exiting abnormally", e);
            System.err.println("Unable to access datadir, exiting abnormally");
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
        } catch (AdminServerException e) {
            LOG.error("Unable to start AdminServer, exiting abnormally", e);
            System.err.println("Unable to start AdminServer, exiting abnormally");
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
        }
        // Exit normally
        LOG.info("Exiting normally");
        ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
    }


The handled exceptions cover invalid arguments, an invalid configuration, an inaccessible data directory, an AdminServer startup failure, and any other unexpected exception.
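Stripped of logging and audit calls, the catch chain in main is a mapping from failure type to exit code. The following is a minimal, self-contained sketch of that pattern, not ZooKeeper's code: the nested exception classes are hypothetical stand-ins for ConfigException and DatadirException, the numeric values in the ExitCode enum are illustrative, AdminServerException is omitted, and run returns the code instead of calling System.exit so it can be exercised directly.

```java
import java.util.concurrent.Callable;

// Sketch of the "catch each failure type, exit with its own code" pattern in QuorumPeerMain.main.
// Exception types and numeric codes are illustrative stand-ins, not ZooKeeper's definitions.
class ExitCodeSketch {

    // Hypothetical checked exceptions standing in for ConfigException and DatadirException
    static class ConfigException extends Exception {
        ConfigException(String msg) { super(msg); }
    }

    static class DatadirException extends Exception {
        DatadirException(String msg) { super(msg); }
    }

    enum ExitCode {
        EXECUTION_FINISHED(0),
        UNEXPECTED_ERROR(1),
        INVALID_INVOCATION(2),
        UNABLE_TO_ACCESS_DATADIR(3);

        final int value;

        ExitCode(int value) { this.value = value; }
    }

    // Runs the startup action and returns the code the process would exit with.
    static int run(Callable<Void> startup) {
        try {
            startup.call();
        } catch (IllegalArgumentException | ConfigException e) {
            // Bad arguments and bad config both end in INVALID_INVOCATION, as in main()
            return ExitCode.INVALID_INVOCATION.value;
        } catch (DatadirException e) {
            return ExitCode.UNABLE_TO_ACCESS_DATADIR.value;
        } catch (Exception e) {
            // The catch-all must come last, or it would shadow the specific handlers
            return ExitCode.UNEXPECTED_ERROR.value;
        }
        // Reached only after startup returns, i.e. once the service has stopped
        return ExitCode.EXECUTION_FINISHED.value;
    }
}
```

For example, `ExitCodeSketch.run(() -> { throw new ExitCodeSketch.DatadirException("no access"); })` yields the data-directory code, while a startup action that returns normally yields the "finished" code.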

Creating the QuorumPeerMain object is straightforward: the class declares no constructor, so the default no-argument constructor is used and no logic runs at this point.

Let’s focus on the initialization and running process:

Initializing and running the QuorumPeerMain object

The initializeAndRun method of the QuorumPeerMain object has three main parts:

  1. Build the QuorumPeerConfig object and parse the parameters
  2. Create a data directory cleanup manager, DatadirCleanupManager, and start it
  3. Start the service by calling runFromConfig (distributed cluster mode) or ZooKeeperServerMain.main (standalone mode)
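The three steps can be modelled in a few lines. This is a simplified sketch whose class and field names are hypothetical; it assumes, as in the 3.6 source, that cluster mode is chosen only when exactly one argument (the config path) is passed and the parsed configuration counts as distributed. "Distributed" is reduced here to "defines more than one voting server", ignoring options such as standaloneEnabled, and the parse step is faked by passing the server count in directly.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of QuorumPeerMain.initializeAndRun: parse, start the purge manager, dispatch.
// All names are hypothetical stand-ins for QuorumPeerConfig, DatadirCleanupManager, etc.
class InitializeAndRunSketch {

    // Stand-in for QuorumPeerConfig: only the number of voting servers matters here
    static class Config {
        int votingServers;
    }

    final List<String> steps = new ArrayList<>();

    void initializeAndRun(String[] args, int serversInConfigFile) {
        // 1. Parse the config only when exactly one argument (the config file path) is given
        Config config = new Config();
        if (args.length == 1) {
            config.votingServers = serversInConfigFile; // pretend we parsed args[0]
            steps.add("parse " + args[0]);
        }

        // 2. Start the data directory cleanup manager
        steps.add("start purge manager");

        // 3. Cluster mode needs a config that defines a quorum; everything else is standalone
        if (args.length == 1 && config.votingServers > 1) {
            steps.add("runFromConfig (cluster)");
        } else {
            steps.add("ZooKeeperServerMain.main (standalone)");
        }
    }
}
```

Note that the cleanup manager is started in both modes, before the dispatch decision is made.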

A) Parsing parameters

org.apache.zookeeper.server.quorum.QuorumPeerConfig is the configuration class. Its fields hold the parameters that QuorumPeer (the type of service instance ZooKeeper starts, as we will see later) needs at run time, and it also supplies default values for some of them.

The startup script passes $ZOOCFG as the argument when it launches QuorumPeerMain, and its value is the path of the zoo.cfg configuration file. This step therefore parses that file to obtain the configuration parameters.

Inside the parse method, zoo.cfg is loaded into a Properties object, and parseProperties is then called to extract the individual parameters.
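The load-then-extract flow can be sketched with java.util.Properties alone. ZooCfgSketch and its fields are hypothetical, the tickTime default of 3000 is only illustrative, and the real parseProperties handles many more keys and validates them; only the "missing dataDir is an error" check is kept here.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch of "load zoo.cfg into Properties, then pull out parameters".
// Field names mirror common zoo.cfg keys; the class itself is hypothetical.
class ZooCfgSketch {
    int tickTime = 3000; // illustrative default used when the key is absent
    int initLimit;
    int syncLimit;
    String dataDir;
    int serverCount;     // number of server.N entries

    static ZooCfgSketch parse(Reader source) {
        // Step 1: load the key=value file into a Properties object
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        // Step 2: read each parameter out of it (the parseProperties step)
        ZooCfgSketch cfg = new ZooCfgSketch();
        for (String key : props.stringPropertyNames()) {
            String value = props.getProperty(key).trim();
            if (key.startsWith("server.")) {
                cfg.serverCount++;
                continue;
            }
            switch (key) {
                case "tickTime": cfg.tickTime = Integer.parseInt(value); break;
                case "initLimit": cfg.initLimit = Integer.parseInt(value); break;
                case "syncLimit": cfg.syncLimit = Integer.parseInt(value); break;
                case "dataDir": cfg.dataDir = value; break;
                default: break; // other keys are ignored in this sketch
            }
        }
        if (cfg.dataDir == null) {
            throw new IllegalArgumentException("dataDir is not set");
        }
        return cfg;
    }
}
```

Feeding it a three-node file such as `tickTime=2000`, `dataDir=/tmp/zookeeper` and `server.1=zk1:2888:3888` through `server.3=...` (for example via a `java.io.StringReader`) yields tickTime 2000 and three server entries.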

B) Generating and starting the data directory cleanup manager

The org.apache.zookeeper.server.DatadirCleanupManager class automatically cleans up snapshot files and transaction log files. It is driven by two parameters: autopurge.purgeInterval, the interval between automatic cleanups (in hours), and autopurge.snapRetainCount, the number of most recent snapshots to retain.
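To make the role of autopurge.snapRetainCount concrete, here is a simplified sketch of the retention decision. It assumes snapshot files are named snapshot.&lt;hex zxid&gt; and that a larger zxid means a newer snapshot; the class name is hypothetical, and the real purge logic (PurgeTxnLog) also works out which transaction log files are still needed, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified "keep the N most recent snapshots" retention decision.
// Assumes names like snapshot.<hex zxid>; transaction-log handling is left out.
class SnapRetentionSketch {

    // "snapshot.1a" -> 0x1a
    static long zxidOf(String fileName) {
        return Long.parseLong(fileName.substring(fileName.indexOf('.') + 1), 16);
    }

    // Returns the snapshot files that fall outside the retention window.
    static List<String> snapshotsToDelete(List<String> snapshots, int snapRetainCount) {
        List<String> newestFirst = new ArrayList<>(snapshots);
        // Compare by numeric zxid, not by file name: "snapshot.ff" is older than "snapshot.100"
        newestFirst.sort((a, b) -> Long.compare(zxidOf(b), zxidOf(a)));
        if (newestFirst.size() <= snapRetainCount) {
            return new ArrayList<>();
        }
        return new ArrayList<>(newestFirst.subList(snapRetainCount, newestFirst.size()));
    }
}
```

Sorting on the parsed zxid rather than the raw file name matters because hexadecimal suffixes of different lengths do not sort correctly as strings.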

Its start method creates a Timer internally, wraps the cleanup logic in a TimerTask of type PurgeTask, and schedules that task to run once every cleanup interval. If the configured interval is zero or negative, no task is scheduled.
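The scheduling part can be sketched with java.util.Timer directly. The class names below are hypothetical; the sketch assumes, as in the 3.6 source, a daemon Timer, a first run immediately, a fixed-rate schedule, and no scheduling for a non-positive interval. The interval here is in milliseconds so a demo finishes quickly, whereas autopurge.purgeInterval is configured in hours, and the task only counts its runs instead of deleting files.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;

// Sketch of DatadirCleanupManager.start(): a daemon Timer runs a purge task at a fixed rate.
// Class names are hypothetical; the interval is in milliseconds rather than hours.
class CleanupSchedulerSketch {

    static class PurgeTaskSketch extends TimerTask {
        final CountDownLatch runs;

        PurgeTaskSketch(CountDownLatch runs) { this.runs = runs; }

        @Override
        public void run() {
            // The real PurgeTask deletes old snapshots and transaction logs here
            runs.countDown();
        }
    }

    private Timer timer;

    // Returns false, and schedules nothing, for a non-positive interval
    boolean start(long intervalMillis, TimerTask task) {
        if (intervalMillis <= 0) {
            return false;
        }
        timer = new Timer("PurgeTask", true);               // daemon: does not keep the JVM alive
        timer.scheduleAtFixedRate(task, 0, intervalMillis); // first run immediately, then every interval
        return true;
    }

    void shutdown() {
        if (timer != null) {
            timer.cancel();
        }
    }
}
```

Using a daemon thread means the cleanup timer never prevents the JVM from exiting once the server itself has stopped.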

C) Starting the service

Here we only explore the source code of the cluster startup path.

The runFromConfig method creates a QuorumPeer object, calls its start method, and then calls its join method so that the current thread waits until the QuorumPeer thread terminates.

    public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }

        LOG.info("Starting quorum peer");
        MetricsProvider metricsProvider;
        try {
            metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
                config.getMetricsProviderClassName(),
                config.getMetricsProviderConfiguration());
        } catch (MetricsProviderLifeCycleException error) {
            throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
        }
        try {
            ServerMetrics.metricsProviderInitialized(metricsProvider);
            ServerCnxnFactory cnxnFactory = null;
            ServerCnxnFactory secureCnxnFactory = null;

            if (config.getClientPortAddress() != null) {
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
            }

            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
            }

            quorumPeer = getQuorumPeer();
            quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
            quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
            quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
            //quorumPeer.setQuorumPeers(config.getAllMembers());
            quorumPeer.setElectionType(config.getElectionAlg());
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
            quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
            quorumPeer.setConfigFileName(config.getConfigFilename());
            quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
            if (config.getLastSeenQuorumVerifier() != null) {
                quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
            }
            quorumPeer.initConfigInZKDatabase();
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            quorumPeer.setSslQuorum(config.isSslQuorum());
            quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
            if (config.sslQuorumReloadCertFiles) {
                quorumPeer.getX509Util().enableCertFileReloading();
            }
            quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
            quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
            quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

            // sets quorum sasl authentication configurations
            quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
            if (quorumPeer.isQuorumSaslAuthEnabled()) {
                quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
                quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
                quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
                quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
                quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
            }
            quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
            quorumPeer.initialize();

            if (config.jvmPauseMonitorToRun) {
                quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
            }

            quorumPeer.start();
            ZKAuditProvider.addZKStartStopAuditLog();
            quorumPeer.join();
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Quorum Peer interrupted", e);
        } finally {
            if (metricsProvider != null) {
                try {
                    metricsProvider.stop();
                } catch (Throwable error) {
                    LOG.warn("Error while stopping metrics", error);
                }
            }
        }
    }

runFromConfig thus creates a QuorumPeer object, sets its various parameters, calls start to launch it, and finally calls join to wait until it stops. In other words, what each ZooKeeper node really runs is a QuorumPeer object, and the rest of the startup logic lives in that class. Later we will focus on the source code of this class to explore what ZooKeeper does when it starts.
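The start-then-join shape works because QuorumPeer is itself a Thread subclass (it extends ZooKeeperThread). The sketch below shows only that shape. PeerSketch is a hypothetical stand-in whose run loop ends when shutdown() is called, rather than cycling through peer states as the real QuorumPeer.run does.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the start()/join() pattern used by runFromConfig.
// PeerSketch is a hypothetical stand-in for QuorumPeer, which is also a Thread subclass.
class StartJoinSketch {

    static class PeerSketch extends Thread {
        private final AtomicBoolean running = new AtomicBoolean(true);

        PeerSketch() { super("PeerSketch"); }

        @Override
        public void run() {
            // The real QuorumPeer.run() loops over its peer state (LOOKING, FOLLOWING, LEADING, ...)
            while (running.get()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        void shutdown() { running.set(false); }
    }

    // Mirrors runFromConfig: start the peer thread, then block until it terminates
    static void runFromConfig(PeerSketch peer) throws InterruptedException {
        peer.start();
        peer.join(); // the calling (main) thread waits here for the peer's whole lifetime
    }
}
```

Because join blocks, control returns to QuorumPeerMain.main, and the "Exiting normally" path runs only after the peer thread has finished.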