Elasticsearch Series II: Master Election After 7.x

In our previous article, “Elasticsearch before 7.x”, we saw that master election in Elasticsearch before 7.x is based on the Bully algorithm. As of 7.0, Elasticsearch uses a Raft-based algorithm for master election.

Why reimplement master election with Raft?

1. The discovery.zen.minimum_master_nodes setting specifies how many master-eligible nodes must take part in electing a master. If it is forgotten or set incorrectly, the cluster can end up with two masters (split brain) or become unavailable. It also has to be changed by hand whenever master-eligible nodes are added.

2. The old election process was slow: it took three rounds of pings to discover the other nodes and complete the election.
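To make the first point concrete: the usual recommendation for discovery.zen.minimum_master_nodes was a strict majority of the master-eligible nodes, N / 2 + 1 with integer division. The minimal sketch below (the ZenQuorum class is hypothetical, not Elasticsearch code) computes that value. Growing from 3 to 5 master-eligible nodes changes the answer from 2 to 3, which is exactly the manual update the old design required.

```java
final class ZenQuorum {
    private ZenQuorum() {}

    /** Strict majority of the master-eligible nodes: n / 2 + 1 (integer division). */
    static int minimumMasterNodes(int masterEligibleNodes) {
        if (masterEligibleNodes < 1) {
            throw new IllegalArgumentException("need at least one master-eligible node");
        }
        return masterEligibleNodes / 2 + 1;
    }
}
```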

Introduction to the Raft Algorithm

Raft is a consensus algorithm designed to solve distributed consistency problems.

A Raft node is always in one of three states: Follower, Candidate, or Leader.

A node starts in the Follower state. If it receives no heartbeat from the Leader within a timeout, it becomes a Candidate and starts an election. A Candidate that receives votes from a majority of the nodes becomes the Leader.
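The state transitions just described can be sketched as a tiny state machine. This is a simplified illustration rather than Elasticsearch's implementation. The class and method names are hypothetical, timers and networking are left out, and a granted vote is simply reported by calling onVoteGranted.

```java
import java.util.HashSet;
import java.util.Set;

final class RaftRoleSketch {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    private final String selfId;
    private final int clusterSize;
    private final Set<String> votes = new HashSet<>();
    private Role role = Role.FOLLOWER; // every node starts as a Follower
    private long term = 0;

    RaftRoleSketch(String selfId, int clusterSize) {
        this.selfId = selfId;
        this.clusterSize = clusterSize;
    }

    Role role() { return role; }

    long term() { return term; }

    /** No heartbeat from a Leader within the timeout: start a new term as Candidate and vote for ourselves. */
    void onElectionTimeout() {
        if (role == Role.LEADER) {
            return; // a Leader sends heartbeats rather than waiting for them
        }
        role = Role.CANDIDATE;
        term++;
        votes.clear();
        votes.add(selfId);
        becomeLeaderIfMajority();
    }

    /** Another node granted us its vote in voteTerm. */
    void onVoteGranted(String voterId, long voteTerm) {
        if (role != Role.CANDIDATE || voteTerm != term) {
            return; // ignore late votes from an older election
        }
        votes.add(voterId);
        becomeLeaderIfMajority();
    }

    /** A heartbeat from a Leader whose term is at least ours makes us a Follower in that term. */
    void onHeartbeat(long leaderTerm) {
        if (leaderTerm >= term) {
            term = leaderTerm;
            role = Role.FOLLOWER;
        }
    }

    private void becomeLeaderIfMajority() {
        if (votes.size() * 2 > clusterSize) {
            role = Role.LEADER;
        }
    }
}
```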

A node grants its vote only to a candidate whose data is at least as up to date as its own. If a Leader discovers a node with a higher term than its own, it gives up leadership and returns to the Follower state.

Raft also divides time into terms. Each term begins with an election and ends either when no Leader is elected or when the Leader fails.
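The term and voting rules from the two paragraphs above can be sketched as follows, using the rules from the Raft paper. A higher term always sends a node back to Follower. Each node votes at most once per term, and only for a candidate whose log is at least as up to date, compared first by last log term and then by last log index. The names are hypothetical. Elasticsearch applies the same freshness idea using last-accepted term and version, as handlePreVoteResponse shows later.

```java
final class RaftTermRules {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    Role role;
    long currentTerm;
    String votedFor;              // candidate this node voted for in currentTerm, or null
    final long lastLogTerm;       // term of this node's last log entry
    final long lastLogIndex;      // index of this node's last log entry

    RaftTermRules(Role role, long currentTerm, long lastLogTerm, long lastLogIndex) {
        this.role = role;
        this.currentTerm = currentTerm;
        this.lastLogTerm = lastLogTerm;
        this.lastLogIndex = lastLogIndex;
    }

    /** Any message with a higher term moves this node into that term as a Follower. */
    void observeTerm(long term) {
        if (term > currentTerm) {
            currentTerm = term;
            votedFor = null;       // no vote cast yet in the new term
            role = Role.FOLLOWER;  // a Leader or Candidate steps down
        }
    }

    /** RequestVote handling: at most one vote per term, and only for an up-to-date candidate. */
    boolean handleVoteRequest(String candidateId, long term, long candidateLastLogTerm, long candidateLastLogIndex) {
        observeTerm(term);
        if (term < currentTerm) {
            return false; // stale candidate from an older term
        }
        if (votedFor != null && !votedFor.equals(candidateId)) {
            return false; // already voted for someone else in this term
        }
        boolean upToDate = candidateLastLogTerm > lastLogTerm
            || (candidateLastLogTerm == lastLogTerm && candidateLastLogIndex >= lastLogIndex);
        if (!upToDate) {
            return false;
        }
        votedFor = candidateId;
        return true;
    }
}
```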

Elasticsearch election process

Elasticsearch source code analysis

The underlying interface for master election is Discovery, and its new Raft-based implementation is the Coordinator class. When a node starts, the election begins with a call to the startInitialJoin method.

    @Override
    public void startInitialJoin() {
        synchronized (mutex) {
            becomeCandidate("startInitialJoin");
        }
        clusterBootstrapService.scheduleUnconfiguredBootstrap();
    }

When the node starts, it first enters the Candidate state through the becomeCandidate method, which does some preparation for the election. It then calls scheduleUnconfiguredBootstrap to start the election. becomeCandidate is relatively simple, so let's focus on scheduleUnconfiguredBootstrap:

    void scheduleUnconfiguredBootstrap() {
        if (unconfiguredBootstrapTimeout == null) {
            return;
        }
        // If the node is not master-eligible, return immediately: it does not take part in the election.
        if (transportService.getLocalNode().isMasterNode() == false) {
            return;
        }
        // Wait for unconfiguredBootstrapTimeout (3 seconds by default), then start the election.
        transportService.getThreadPool().scheduleUnlessShuttingDown(unconfiguredBootstrapTimeout, Names.GENERIC, new Runnable() {
            @Override
            public void run() {
                final Set<DiscoveryNode> discoveredNodes = getDiscoveredNodes();
                final List<DiscoveryNode> zen1Nodes = discoveredNodes.stream().filter(Coordinator::isZen1Node).collect(Collectors.toList());
                if (zen1Nodes.isEmpty()) {
                    // No pre-7.0 (Zen1) nodes were discovered, so start the Raft-based bootstrap
                    startBootstrap(discoveredNodes, emptyList());
                } else {
                    logger.info("Avoiding best-effort cluster bootstrapping due to discovery of pre-7.0 nodes {}", zen1Nodes);
                }
            }
        });
    }

After waiting for unconfiguredBootstrapTimeout (3 seconds by default), the node calls startBootstrap to begin the election.
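The scheduling step can be sketched with a plain ScheduledExecutorService. This is an illustration under assumptions. Elasticsearch uses its own thread pool's scheduleUnlessShuttingDown, and the DelayedBootstrapSketch class, its node type parameter N, and the isZen1Node predicate passed in are hypothetical. The decision itself is split into a pure shouldBootstrap method so it can be checked without timers.

```java
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class DelayedBootstrapSketch {
    private DelayedBootstrapSketch() {}

    /** Bootstrap automatically only if no pre-7.0 (Zen1) node has been discovered. */
    static <N> boolean shouldBootstrap(List<N> discoveredNodes, Predicate<N> isZen1Node) {
        return discoveredNodes.stream().noneMatch(isZen1Node);
    }

    /** Runs one bootstrap attempt after timeoutMillis, using the nodes discovered by then. */
    static <N> ScheduledFuture<?> schedule(ScheduledExecutorService scheduler, long timeoutMillis,
                                           Supplier<List<N>> discoveredNodes, Predicate<N> isZen1Node,
                                           Consumer<List<N>> startBootstrap) {
        return scheduler.schedule(() -> {
            List<N> nodes = discoveredNodes.get(); // read discovery results only when the timer fires
            if (shouldBootstrap(nodes, isZen1Node)) {
                startBootstrap.accept(nodes);
            }
            // otherwise: a mixed-version cluster, so skip best-effort bootstrapping
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

Passing 3000 as timeoutMillis would mirror the 3-second default mentioned above.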

    private void startBootstrap(Set<DiscoveryNode> discoveryNodes, List<String> unsatisfiedRequirements) {
        assert discoveryNodes.stream().allMatch(DiscoveryNode::isMasterNode) : discoveryNodes;
        assert discoveryNodes.stream().noneMatch(Coordinator::isZen1Node) : discoveryNodes;
        assert unsatisfiedRequirements.size() < discoveryNodes.size() : discoveryNodes + " smaller than " + unsatisfiedRequirements;
        if (bootstrappingPermitted.compareAndSet(true, false)) {
            doBootstrap(new VotingConfiguration(Stream.concat(
                discoveryNodes.stream().map(DiscoveryNode::getId),
                unsatisfiedRequirements.stream().map(s -> BOOTSTRAP_PLACEHOLDER_PREFIX + s))
                .collect(Collectors.toSet())));
        }
    }

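Once these checks pass, startBootstrap calls doBootstrap with a new voting configuration built from the IDs of the discovered nodes, plus a placeholder for each unsatisfied requirement.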
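Two details of startBootstrap are worth isolating. First, compareAndSet(true, false) guarantees that the bootstrap runs at most once. Second, the voting configuration is the set of discovered node IDs plus one placeholder entry per unsatisfied requirement. The minimal sketch below uses plain strings for node IDs. The class name and the placeholder prefix value are hypothetical.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class BootstrapOnceSketch {
    // Hypothetical prefix; Elasticsearch defines its own BOOTSTRAP_PLACEHOLDER_PREFIX constant.
    static final String PLACEHOLDER_PREFIX = "placeholder-";

    private final AtomicBoolean bootstrappingPermitted = new AtomicBoolean(true);

    /** Returns the initial voting configuration on the first call only; later calls get Optional.empty(). */
    Optional<Set<String>> startBootstrap(List<String> discoveredNodeIds, List<String> unsatisfiedRequirements) {
        // compareAndSet flips true -> false atomically, so exactly one caller wins even under concurrency
        if (!bootstrappingPermitted.compareAndSet(true, false)) {
            return Optional.empty();
        }
        Set<String> votingConfiguration = Stream.concat(
                discoveredNodeIds.stream(),
                unsatisfiedRequirements.stream().map(s -> PLACEHOLDER_PREFIX + s))
            .collect(Collectors.toSet());
        return Optional.of(votingConfiguration);
    }
}
```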

    private void doBootstrap(VotingConfiguration votingConfiguration) {
        assert transportService.getLocalNode().isMasterNode();

        try {
            votingConfigurationConsumer.accept(votingConfiguration);
        } catch (Exception e) {
            // Try again 10 seconds later
            transportService.getThreadPool().scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), Names.GENERIC,
                new Runnable() {
                    @Override
                    public void run() {
                        doBootstrap(votingConfiguration);
                    }
                });
        }
    }

The voting configuration is passed to votingConfigurationConsumer, and if that throws, the bootstrap is retried after 10 seconds. When the Coordinator is initialized, this consumer is set to the Coordinator.setInitialConfiguration method:
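The retry-on-failure pattern of doBootstrap can be sketched independently of Elasticsearch's thread pool. In this hypothetical sketch, scheduling is hidden behind a one-method Scheduler interface, which makes the retry logic testable with a synchronous fake. The class and interface names are assumptions, not Elasticsearch APIs.

```java
import java.util.function.Consumer;

final class RetryingBootstrapSketch {
    /** Minimal scheduling abstraction (hypothetical); production code could wrap a ScheduledExecutorService. */
    interface Scheduler {
        void schedule(long delayMillis, Runnable task);
    }

    private RetryingBootstrapSketch() {}

    /** Hands the configuration to the consumer; if it throws, tries again after retryDelayMillis. */
    static <C> void doBootstrap(Scheduler scheduler, long retryDelayMillis, C configuration, Consumer<C> consumer) {
        try {
            consumer.accept(configuration);
        } catch (RuntimeException e) {
            // same idea as the 10-second retry above: reschedule the whole attempt
            scheduler.schedule(retryDelayMillis,
                () -> doBootstrap(scheduler, retryDelayMillis, configuration, consumer));
        }
    }
}
```

With a real executor, the scheduler argument could be the lambda (delay, task) -> executor.schedule(task, delay, TimeUnit.MILLISECONDS), and a delay of 10000 ms would match the 10-second retry.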

    public boolean setInitialConfiguration(final VotingConfiguration votingConfiguration) {
        synchronized (mutex) {
            final ClusterState currentState = getStateForMasterService();
            // Some basic checks
            final List<DiscoveryNode> knownNodes = new ArrayList<>();
            knownNodes.add(getLocalNode());
            peerFinder.getFoundPeers().forEach(knownNodes::add);
            // If the known nodes (local node plus discovered peers) do not form a quorum of the voting configuration, throw
            if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) {
                throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " +
                    "[knownNodes=" + knownNodes + "," + votingConfiguration + "]");
            }

            logger.info("setting initial configuration to {}", votingConfiguration);
            final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(currentState.coordinationMetaData())
                .lastAcceptedConfiguration(votingConfiguration)
                .lastCommittedConfiguration(votingConfiguration)
                .build();

            MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData());
            // automatically generate a UID for the metadata if we need to
            metaDataBuilder.generateClusterUuidIfNeeded(); // TODO generate UUID in bootstrapping tool?
            metaDataBuilder.coordinationMetaData(coordinationMetaData);

            // Initialize the cluster state
            coordinationState.get().setInitialState(ClusterState.builder(currentState).metaData(metaDataBuilder).build());
            // Initialize the preVoteCollector's response
            preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
            // Start the election
            startElectionScheduler();
            return true;
        }
    }
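The quorum check that guards setInitialConfiguration can be sketched like this. It is a simplified stand-in for VotingConfiguration.hasQuorum. Following our reading of the source, it assumes that a quorum means strictly more than half of the configuration's node IDs. The class name is hypothetical.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

final class VotingConfigSketch {
    private final Set<String> nodeIds;

    VotingConfigSketch(Set<String> nodeIds) {
        this.nodeIds = Set.copyOf(nodeIds);
    }

    /** True if the given node IDs include strictly more than half of this configuration. */
    boolean hasQuorum(Collection<String> votes) {
        Set<String> intersection = new HashSet<>(nodeIds);
        intersection.retainAll(votes); // IDs outside the configuration do not count
        return intersection.size() * 2 > nodeIds.size();
    }
}
```

For a configuration of {a, b, c}, knowing nodes a and b is enough (2 of 3). Knowing a and an unrelated node x is not, which is the case where setInitialConfiguration throws.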

After this initialization, setInitialConfiguration calls startElectionScheduler, which asynchronously calls PreVoteCollector.start to begin the pre-vote.

    private void startElectionScheduler() {
        electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
            @Override
            public void run() {
                synchronized (mutex) {
                    if (mode == Mode.CANDIDATE) {
                        final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
                        // Fail fast: if the local node cannot win the election, do not start one.
                        if (localNodeMayWinElection(lastAcceptedState) == false) {
                            return;
                        }

                        if (prevotingRound != null) {
                            prevotingRound.close();
                        }
                        final List<DiscoveryNode> discoveredNodes
                            = getDiscoveredNodes().stream().filter(n -> isZen1Node(n) == false).collect(Collectors.toList());
                        // Start the pre-vote
                        prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes);
                    }
                }
            }
        });
    }
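Raft avoids repeated split votes by randomizing election timeouts. Elasticsearch's election scheduler follows the same idea: each attempt waits a random delay whose upper bound grows with the attempt number, up to a cap. The relevant settings are cluster.election.initial_timeout, cluster.election.back_off_time and cluster.election.max_timeout. The sketch below illustrates that policy under those assumptions. It is not the ElectionSchedulerFactory code, and the class name is hypothetical.

```java
import java.util.Random;

final class ElectionBackoffSketch {
    private final long initialTimeoutMillis;
    private final long backOffMillis;
    private final long maxTimeoutMillis;
    private final Random random;
    private long attempt = 0;

    ElectionBackoffSketch(long initialTimeoutMillis, long backOffMillis, long maxTimeoutMillis, Random random) {
        if (initialTimeoutMillis < 1 || backOffMillis < 0 || maxTimeoutMillis < initialTimeoutMillis) {
            throw new IllegalArgumentException("invalid election timeouts");
        }
        this.initialTimeoutMillis = initialTimeoutMillis;
        this.backOffMillis = backOffMillis;
        this.maxTimeoutMillis = maxTimeoutMillis;
        this.random = random;
    }

    /** Delay before the next attempt: uniform in [1, bound], where bound grows linearly per attempt and is capped. */
    long nextDelayMillis() {
        long bound = Math.min(maxTimeoutMillis, initialTimeoutMillis + attempt * backOffMillis);
        attempt++;
        return 1 + Math.floorMod(random.nextLong(), bound); // floorMod keeps the result in [0, bound)
    }
}
```

Randomizing the delay makes it unlikely that two candidates keep starting elections at the same moment, and the growing bound spaces out attempts while the cluster is unstable.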

The core of PreVoteCollector.start is shown below. It sends a pre-vote request to each node in turn.

    void start(final Iterable<DiscoveryNode> broadcastNodes) {
        ...
        broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest,
            new TransportResponseHandler<PreVoteResponse>() {
                @Override
                public PreVoteResponse read(StreamInput in) throws IOException {
                    return new PreVoteResponse(in);
                }

                @Override
                public void handleResponse(PreVoteResponse response) {
                    handlePreVoteResponse(response, n);
                }
            }));
    }

When another node receives the pre-vote request, it handles it in PreVoteCollector.handlePreVoteRequest:

    private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
        updateMaxTermSeen.accept(request.getCurrentTerm());

        Tuple<DiscoveryNode, PreVoteResponse> state = this.state;
        assert state != null : "received pre-vote request before fully initialised";

        final DiscoveryNode leader = state.v1();
        final PreVoteResponse response = state.v2();

        if (leader == null) {
            return response;
        }

        if (leader.equals(request.getSourceNode())) {
            return response;
        }

        throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
    }

First, updateMaxTermSeen is called to record the largest term seen so far. If this node is the leader but has seen a term larger than its own, it bumps its term and starts a new election, giving up its leadership. Then, if the node knows of no leader, or the requesting node is its leader, it returns its pre-vote response and so grants the pre-vote. Otherwise it rejects the request.

    private void updateMaxTermSeen(final long term) {
        synchronized (mutex) {
            maxTermSeen = Math.max(maxTermSeen, term);
            final long currentTerm = getCurrentTerm();
            if (mode == Mode.LEADER && maxTermSeen > currentTerm) {
                // Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that
                // since we check whether a term bump is needed at the end of the publication too.
                if (publicationInProgress()) {
                    logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm);
                } else {
                    try {
                        logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, bumping term", maxTermSeen, currentTerm);
                        ensureTermAtLeast(getLocalNode(), maxTermSeen);
                        startElection();
                    } catch (Exception e) {
                        logger.warn(new ParameterizedMessage("failed to bump term to {}", maxTermSeen), e);
                        becomeCandidate("updateMaxTermSeen");
                    }
                }
            }
        }
    }
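The decision made by handlePreVoteRequest, together with the max-term bookkeeping of updateMaxTermSeen, reduces to a few lines. In this hypothetical sketch, node IDs are strings, a null leaderId means no leader is known, and IllegalStateException stands in for CoordinationStateRejectedException. The leader step-down branch of updateMaxTermSeen is left out.

```java
final class PreVoteHandlerSketch {
    /** Hypothetical response; the fields mirror what PreVoteResponse carries in the code above. */
    static final class Response {
        final long currentTerm;
        final long lastAcceptedTerm;
        final long lastAcceptedVersion;

        Response(long currentTerm, long lastAcceptedTerm, long lastAcceptedVersion) {
            this.currentTerm = currentTerm;
            this.lastAcceptedTerm = lastAcceptedTerm;
            this.lastAcceptedVersion = lastAcceptedVersion;
        }
    }

    private final String leaderId;     // null when this node knows of no leader
    private final Response localState; // what this node reports when granting a pre-vote
    private long maxTermSeen;

    PreVoteHandlerSketch(String leaderId, Response localState) {
        this.leaderId = leaderId;
        this.localState = localState;
        this.maxTermSeen = localState.currentTerm;
    }

    long maxTermSeen() { return maxTermSeen; }

    /** Grants the pre-vote if there is no leader or the requester is the leader; rejects it otherwise. */
    Response handlePreVoteRequest(String sourceNodeId, long requestTerm) {
        maxTermSeen = Math.max(maxTermSeen, requestTerm); // the updateMaxTermSeen bookkeeping
        if (leaderId == null || leaderId.equals(sourceNodeId)) {
            return localState;
        }
        throw new IllegalStateException("rejecting pre-vote from " + sourceNodeId + " as there is already a leader");
    }
}
```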

When a pre-vote response is received, it is processed by handlePreVoteResponse:

    private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
        // Update the maximum term seen
        updateMaxTermSeen.accept(response.getCurrentTerm());
        // If the responder's last accepted term is larger than ours, or equal but with a higher version,
        // its state is fresher than ours, so its response is not counted as a pre-vote.
        if (response.getLastAcceptedTerm() > clusterState.term()
            || (response.getLastAcceptedTerm() == clusterState.term()
            && response.getLastAcceptedVersion() > clusterState.getVersionOrMetaDataVersion())) {
            logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
            return;
        }

        preVotesReceived.put(sender, response);

        // create a fake VoteCollection based on the pre-votes and check if there is an election quorum
        final VoteCollection voteCollection = new VoteCollection();
        final DiscoveryNode localNode = clusterState.nodes().getLocalNode();
        final PreVoteResponse localPreVoteResponse = getPreVoteResponse();

        preVotesReceived.forEach((node, preVoteResponse) -> voteCollection.addJoinVote(
            new Join(node, localNode, preVoteResponse.getCurrentTerm(),
                preVoteResponse.getLastAcceptedTerm(), preVoteResponse.getLastAcceptedVersion())));
        // If there is no majority of pre-votes yet, return
        if (electionStrategy.isElectionQuorum(clusterState.nodes().getLocalNode(), localPreVoteResponse.getCurrentTerm(),
            localPreVoteResponse.getLastAcceptedTerm(), localPreVoteResponse.getLastAcceptedVersion(),
            clusterState.getLastCommittedConfiguration(), clusterState.getLastAcceptedConfiguration(), voteCollection) == false) {
            return;
        }
        startElection.run();
    }

As before, the maximum term is updated first, and then the response is checked: responses from nodes with fresher state are ignored. Once a majority of pre-votes has been received, the node calls startElection to ask the other nodes to join it:
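A reduced version of handlePreVoteResponse works as follows. It ignores responses from nodes whose last-accepted state is fresher, records the rest, and starts the election once the pre-votes form a majority. For simplicity, this hypothetical sketch checks a single voting configuration, whereas Elasticsearch's check also involves the last-committed and last-accepted configurations. It also assumes that the local node's own pre-vote arrives like any other response.

```java
import java.util.HashSet;
import java.util.Set;

final class PreVoteRoundSketch {
    private final long localLastAcceptedTerm;
    private final long localLastAcceptedVersion;
    private final Set<String> votingConfiguration;
    private final Set<String> preVotesReceived = new HashSet<>();
    private boolean electionStarted = false;

    PreVoteRoundSketch(long localLastAcceptedTerm, long localLastAcceptedVersion, Set<String> votingConfiguration) {
        this.localLastAcceptedTerm = localLastAcceptedTerm;
        this.localLastAcceptedVersion = localLastAcceptedVersion;
        this.votingConfiguration = Set.copyOf(votingConfiguration);
    }

    /** Returns true only for the response that completes the quorum, i.e. when startElection would run. */
    boolean handlePreVoteResponse(String sender, long lastAcceptedTerm, long lastAcceptedVersion) {
        boolean fresher = lastAcceptedTerm > localLastAcceptedTerm
            || (lastAcceptedTerm == localLastAcceptedTerm && lastAcceptedVersion > localLastAcceptedVersion);
        if (fresher) {
            return false; // the sender knows more than we do, so its pre-vote is not counted
        }
        preVotesReceived.add(sender);
        long votesInConfig = votingConfiguration.stream().filter(preVotesReceived::contains).count();
        if (!electionStarted && votesInConfig * 2 > votingConfiguration.size()) {
            electionStarted = true;
            return true;
        }
        return false;
    }
}
```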

    private void startElection() {
        synchronized (mutex) {
            // The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have
            // to check our mode again here.
            if (mode == Mode.CANDIDATE) {
                if (localNodeMayWinElection(getLastAcceptedState()) == false) {
                    logger.trace("skip election as local node may not win it: {}", getLastAcceptedState().coordinationMetaData());
                    return;
                }

                final StartJoinRequest startJoinRequest
                    = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
                logger.debug("starting election with {}", startJoinRequest);
                getDiscoveredNodes().forEach(node -> {
                    if (isZen1Node(node) == false) {
                        joinHelper.sendStartJoinRequest(startJoinRequest, node);
                    }
                });
            }
        }
    }

startElection sends a StartJoinRequest to every discovered node (excluding Zen1 nodes), asking it to join this node. The request carries the new term, max(currentTerm, maxTermSeen) + 1.

On receiving the StartJoinRequest, each node sends a join request back to the node that sent it:

transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false,
            StartJoinRequest::new,
            (request, channel, task) -> {
                final DiscoveryNode destination = request.getSourceNode();
                sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request)));
                channel.sendResponse(Empty.INSTANCE);
            });

The Join is built by joinLeaderInTerm. This method also updates the term and, if the node is not already a candidate, switches it to the Candidate state:

    private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
        synchronized (mutex) {
            logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
            // The term is updated here
            final Join join = coordinationState.get().handleStartJoin(startJoinRequest);
            lastJoin = Optional.of(join);
            peerFinder.setCurrentTerm(getCurrentTerm());
            if (mode != Mode.CANDIDATE) {
                becomeCandidate("joinLeaderInTerm"); // updates followersChecker and preVoteCollector
            } else {
                followersChecker.updateFastResponseState(getCurrentTerm(), mode);
                preVoteCollector.update(getPreVoteResponse(), null);
            }
            return join;
        }
    }
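Both sides of this exchange can be sketched together. The candidate picks max(currentTerm, maxTermSeen) + 1 as the new term. A receiving node then moves to that term and answers with a Join that carries its last-accepted term and version. The rejection of a non-increasing term follows the rule in CoordinationState.handleStartJoin as we understand it. The class, field and method names are hypothetical, and IllegalStateException stands in for Elasticsearch's own exception.

```java
final class StartJoinSketch {
    /** Hypothetical join vote: voter -> candidate in a term, with the voter's last-accepted state. */
    static final class Join {
        final String voter;
        final String candidate;
        final long term;
        final long lastAcceptedTerm;
        final long lastAcceptedVersion;

        Join(String voter, String candidate, long term, long lastAcceptedTerm, long lastAcceptedVersion) {
            this.voter = voter;
            this.candidate = candidate;
            this.term = term;
            this.lastAcceptedTerm = lastAcceptedTerm;
            this.lastAcceptedVersion = lastAcceptedVersion;
        }
    }

    final String localNodeId;
    long currentTerm;
    final long lastAcceptedTerm;
    final long lastAcceptedVersion;

    StartJoinSketch(String localNodeId, long currentTerm, long lastAcceptedTerm, long lastAcceptedVersion) {
        this.localNodeId = localNodeId;
        this.currentTerm = currentTerm;
        this.lastAcceptedTerm = lastAcceptedTerm;
        this.lastAcceptedVersion = lastAcceptedVersion;
    }

    /** Candidate side: the term placed in the StartJoinRequest, newer than anything seen so far. */
    static long nextElectionTerm(long currentTerm, long maxTermSeen) {
        return Math.max(currentTerm, maxTermSeen) + 1;
    }

    /** Receiver side: adopt the requested term and return a Join for the requesting node. */
    Join handleStartJoin(String sourceNodeId, long term) {
        if (term <= currentTerm) {
            throw new IllegalStateException("start-join term " + term + " is not newer than current term " + currentTerm);
        }
        currentTerm = term;
        return new Join(localNodeId, sourceNodeId, term, lastAcceptedTerm, lastAcceptedVersion);
    }
}
```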

sendJoinRequest then sends the join request. It is straightforward, so its code is not shown here.

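When a node receives a JoinRequest, it handles it in handleJoinRequest, which first connects to the source node of the JoinRequest. If the local node is already the elected master, it runs the join validators and calls sendValidateJoinRequest to check that the join is valid. Otherwise it processes the join directly with processJoinRequest.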

    private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
        transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> {
            final ClusterState stateForJoinValidation = getStateForMasterService();

            if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) {
                onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
                if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                    // we do this in a couple of places including the cluster update thread. This one here is really just best effort
                    // to ensure we fail as fast as possible.
                    JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(),
                        stateForJoinValidation.getNodes().getMinNodeVersion());
                }
                sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
            } else {
                processJoinRequest(joinRequest, joinCallback);
            }
        }, joinCallback::onFailure));
    }

The node receiving a ValidateJoinRequest accepts it as long as both nodes belong to the same cluster (the cluster UUIDs match) and the version and indices are compatible. This is how the JoinRequest is confirmed to be valid.

transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME,
            ThreadPool.Names.GENERIC, ValidateJoinRequest::new,
            (request, channel, task) -> {
                final ClusterState localState = currentStateSupplier.get();
                if (localState.metaData().clusterUUIDCommitted() &&
                    localState.metaData().clusterUUID().equals(request.getState().metaData().clusterUUID()) == false) {
                    throw new CoordinationStateRejectedException("join validation on cluster state" +
                        " with a different cluster uuid " + request.getState().metaData().clusterUUID() +
                        " than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting");
                }
                joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
                channel.sendResponse(Empty.INSTANCE);
            });

Once the join has been validated, processJoinRequest handles it. Note that if the node had not won the election before this join but has won it after processing the join, it declares itself the Leader by calling becomeLeader.

    private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
        final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
        synchronized (mutex) {
            final CoordinationState coordState = coordinationState.get();
            final boolean prevElectionWon = coordState.electionWon();

            optionalJoin.ifPresent(this::handleJoin);
            joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinCallback);

            if (prevElectionWon == false && coordState.electionWon()) {
                becomeLeader("handleJoinRequest");
            }
        }
    }

In handleJoin, the ensureTermAtLeast method first makes sure the term is up to date, and then CoordinationState processes the join:

    private void handleJoin(Join join) {
        synchronized (mutex) {
            ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin);
            ...
            coordinationState.get().handleJoin(join); // this might fail and bubble up the exception
        }
    }

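Now look at ensureTermAtLeast. The StartJoinRequest carried term + 1 and the JoinRequest returns that same term, so when the first Join is processed, getCurrentTerm() is smaller than targetTerm. In that case the node calls joinLeaderInTerm with a StartJoinRequest from itself. This updates its term and produces a Join for itself (a vote for itself), which handleJoin then processes through ifPresent(this::handleJoin).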

    private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
        assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
        if (getCurrentTerm() < targetTerm) {
            return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm)));
        }
        return Optional.empty();
    }
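The self-vote trick can be isolated in a small, hypothetical sketch. When a join arrives for a term newer than the node's own, the node first moves to that term (clearing old votes) and records a join from itself, and only then records the incoming join. Node IDs are strings and join votes are just a set of voter IDs. This is an illustration, not Elasticsearch code.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

final class SelfVoteSketch {
    final String localNodeId;
    long currentTerm;
    final Set<String> joinVotes = new HashSet<>(); // voters for this node in currentTerm

    SelfVoteSketch(String localNodeId, long currentTerm) {
        this.localNodeId = localNodeId;
        this.currentTerm = currentTerm;
    }

    /** If targetTerm is newer, move to it (dropping old votes) and return a join from this node to itself. */
    Optional<String> ensureTermAtLeast(long targetTerm) {
        if (currentTerm < targetTerm) {
            currentTerm = targetTerm;
            joinVotes.clear();
            return Optional.of(localNodeId);
        }
        return Optional.empty();
    }

    /** Mirrors handleJoin: bring the term up to date (possibly voting for ourselves), then count the join. */
    void handleJoin(String voterId, long term) {
        ensureTermAtLeast(term).ifPresent(self -> handleJoin(self, term));
        if (term == currentTerm) {
            joinVotes.add(voterId); // joins for an older term are simply ignored in this sketch
        }
    }
}
```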

Next, look at CoordinationState.handleJoin. The isElectionQuorum method checks whether joins have been received from a majority; if so, the node has won the election.

    public boolean handleJoin(Join join) {
        ...
        boolean added = joinVotes.addJoinVote(join);
        boolean prevElectionWon = electionWon;
        // If joins from a majority have been received, the election is won.
        electionWon = isElectionQuorum(joinVotes);
        if (electionWon && prevElectionWon == false) {
            logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes);
            lastPublishedVersion = getLastAcceptedVersion();
        }
        return added;
    }
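isElectionQuorum and the prevElectionWon check can be sketched as below. Under our reading of Elasticsearch's default election strategy, the join votes must form a majority of both the last-committed and the last-accepted voting configurations, which differ while a reconfiguration is in progress. Leadership is triggered only on the transition from not-won to won. The class is hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

final class ElectionQuorumSketch {
    private final Set<String> lastCommittedConfiguration;
    private final Set<String> lastAcceptedConfiguration;
    private final Set<String> joinVotes = new HashSet<>();
    private boolean electionWon = false;

    ElectionQuorumSketch(Set<String> lastCommittedConfiguration, Set<String> lastAcceptedConfiguration) {
        this.lastCommittedConfiguration = Set.copyOf(lastCommittedConfiguration);
        this.lastAcceptedConfiguration = Set.copyOf(lastAcceptedConfiguration);
    }

    /** A strict majority of the configuration's node IDs must be among the votes. */
    static boolean hasQuorum(Set<String> configuration, Set<String> votes) {
        long matching = configuration.stream().filter(votes::contains).count();
        return matching * 2 > configuration.size();
    }

    /** Records a join; returns true only for the join that turns a not-yet-won election into a won one. */
    boolean handleJoin(String voterId) {
        joinVotes.add(voterId);
        boolean prevElectionWon = electionWon;
        electionWon = hasQuorum(lastCommittedConfiguration, joinVotes)
            && hasQuorum(lastAcceptedConfiguration, joinVotes);
        return electionWon && !prevElectionWon; // the caller would call becomeLeader exactly once
    }
}
```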

References

  • raft.github.io/
  • Github.com/elastic/ela…


This article is also published on the WeChat official account of the Zhengcaiyun technology team.