Preface

A few days ago I had dinner with some senior engineers from Tencent. When the conversation turned to SOFAJRaft, I naturally assumed I understood it well, but when they asked me how logs are replicated between the nodes of a SOFAJRaft cluster, I was at a loss to explain it. So this time, let's take a look at how log replication is done in SOFAJRaft.

The Leader sends a probe to obtain the LastLogIndex of the Follower

After the Leader node establishes a connection to each Follower through a Replicator, it sends a request of type Probe to learn which log position the Follower has reached, so that the Leader knows from where to send the follow-up log entries.

The general process is as follows:

NodeImpl#becomeLeader->replicatorGroup#addReplicator->Replicator#start->Replicator#sendEmptyEntries

At the end of this chain, the Replicator's sendEmptyEntries method sends the probe that obtains the Follower's LastLogIndex.

Replicator#sendEmptyEntries

private void sendEmptyEntries(final boolean isHeartbeat,
                              final RpcResponseClosure<AppendEntriesResponse> heartBeatClosure) {
    final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
    // Fill rb with the cluster configuration, such as term, groupId, serverId, etc.
    if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) {
        // id is unlock in installSnapshot
        installSnapshot();
        if (isHeartbeat && heartBeatClosure != null) {
            Utils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN,
                "Fail to send heartbeat to peer %s", this.options.getPeerId()));
        }
        return;
    }
    try {
        final long monotonicSendTimeMs = Utils.monotonicMs();
        final AppendEntriesRequest request = rb.build();

        if (isHeartbeat) {
            // ... (heartbeat code omitted)
        } else {
            // Sending a probe request:
            // the Leader sends a probe to obtain the LastLogIndex of the Follower.
            // (I haven't found where the statInfo class is actually used.)
            this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
            // lastLogIndex is set to one less than firstLogIndex
            this.statInfo.firstLogIndex = this.nextIndex;
            this.statInfo.lastLogIndex = this.nextIndex - 1;
            this.appendEntriesCounter++;
            // Mark the current Replicator as sending a probe
            this.state = State.Probe;
            final int stateVersion = this.version;
            // Return reqSeq and increment it by one
            final int seq = getAndIncrementReqSeq();
            final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
                request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {

                    @Override
                    public void run(final Status status) {
                        onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request,
                            getResponse(), seq, stateVersion, monotonicSendTimeMs);
                    }
                });
            // An Inflight is an abstraction over a batch of LogEntries sent as one
            // replication request; here the (empty) probe batch is wrapped into an Inflight
            addInflight(RequestType.AppendEntries, this.nextIndex, 0, 0, seq, rpcFuture);
        }
        LOG.debug("Node {} send HeartbeatRequest to {} term {} lastCommittedIndex {}", this.options.getNode()
            .getNodeId(), this.options.getPeerId(), this.options.getTerm(), request.getCommittedIndex());
    } finally {
        this.id.unlock();
    }
}

When sendEmptyEntries is called to send a probe, isHeartbeat is false and heartBeatClosure is null, since all we want is the Follower's log position. fillCommonFields fills in the common fields such as term, groupId, serverId, peerId, prevLogIndex, and prevLogTerm:

private boolean fillCommonFields(final AppendEntriesRequest.Builder rb, long prevLogIndex, final boolean isHeartbeat) {
    final long prevLogTerm = this.options.getLogManager().getTerm(prevLogIndex);
    ...
    rb.setTerm(this.options.getTerm());
    rb.setGroupId(this.options.getGroupId());
    rb.setServerId(this.options.getServerId().toString());
    rb.setPeerId(this.options.getPeerId().toString());
    rb.setPrevLogIndex(prevLogIndex);
    rb.setPrevLogTerm(prevLogTerm);
    rb.setCommittedIndex(this.options.getBallotBox().getLastCommittedIndex());
    return true;
}

Note that prevLogIndex is nextIndex - 1, i.e., the index just before the next entry to send. The method also sets the fields of the statInfo instance, although I haven't found where statInfo is actually used. An AppendEntriesRequest is then sent to the Follower, with onRpcReturned registered to handle the response. After sending the request, addInflight is called to create an Inflight instance and add it to the in-flight queue:

private void addInflight(final RequestType reqType, final long startIndex, final int count, final int size,
                         final int seq, final Future<Message> rpcInfly) {
    this.rpcInFly = new Inflight(reqType, startIndex, count, size, seq, rpcInfly);
    this.inflights.add(this.rpcInFly);
    this.nodeMetrics.recordSize("replicate-inflights-count", this.inflights.size());
}

An Inflight is an abstraction over a batch of LogEntries that has been sent as a single replication request but not yet acknowledged; it records which entries are out on the wire. Here, the probe's (empty) batch of entries is wrapped into an Inflight.
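
Conceptually, an Inflight only needs enough bookkeeping to match a response back to the batch it covers. Below is a minimal sketch inferred from the constructor call in addInflight above; it is an illustration of the idea, not the actual SOFAJRaft class (it assumes the RequestType, Message, and Future types already used in the snippets).

// A minimal sketch of Inflight bookkeeping, inferred from addInflight above
// (illustrative only, not the real SOFAJRaft implementation)
final class Inflight {
    final RequestType     requestType; // AppendEntries or Snapshot
    final long            startIndex;  // first log index covered by this batch
    final int             count;       // number of entries in the batch (0 for a probe)
    final int             size;        // total bytes of entry data
    final int             seq;         // request sequence number, used to match the response
    final Future<Message> rpcFuture;   // handle to the pending RPC

    Inflight(final RequestType requestType, final long startIndex, final int count,
             final int size, final int seq, final Future<Message> rpcFuture) {
        this.requestType = requestType;
        this.startIndex = startIndex;
        this.count = count;
        this.size = size;
        this.seq = seq;
        this.rpcFuture = rpcFuture;
    }
}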

The Leader sends logs to the followers in batches

Replicator#sendEntries

private boolean sendEntries(final long nextSendingIndex) {
    final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
    // Fill rb with the current Replicator's configuration
    if (!fillCommonFields(rb, nextSendingIndex - 1, false)) {
        // unlock id in installSnapshot
        installSnapshot();
        return false;
    }

    ByteBufferCollector dataBuf = null;
    // Maximum number of entries per batch (1024 by default)
    final int maxEntriesSize = this.raftOptions.getMaxEntriesSize();

    // An object-pool-like technique is used to avoid creating objects repeatedly
    final RecyclableByteBufferList byteBufList = RecyclableByteBufferList.newInstance();
    try {
        // Loop over the log entries, packing them into byteBufList and emb
        for (int i = 0; i < maxEntriesSize; i++) {
            final RaftOutter.EntryMeta.Builder emb = RaftOutter.EntryMeta.newBuilder();
            // nextSendingIndex is the next index to send, i is the offset
            if (!prepareEntry(nextSendingIndex, i, emb, byteBufList)) {
                break;
            }
            rb.addEntries(emb.build());
        }
        // If the entries count is 0, there is no new data in the LogManager
        if (rb.getEntriesCount() == 0) {
            if (nextSendingIndex < this.options.getLogManager().getFirstLogIndex()) {
                installSnapshot();
                return false;
            }
            // _id is unlock in _wait_more
            waitMoreEntries(nextSendingIndex);
            return false;
        }
        // Copy the data accumulated in byteBufList into rb
        if (byteBufList.getCapacity() > 0) {
            dataBuf = ByteBufferCollector.allocateByRecyclers(byteBufList.getCapacity());
            for (final ByteBuffer b : byteBufList) {
                dataBuf.put(b);
            }
            final ByteBuffer buf = dataBuf.getBuffer();
            buf.flip();
            rb.setData(ZeroByteStringHelper.wrap(buf));
        }
    } finally {
        // Recycle byteBufList back into the object pool
        RecycleUtil.recycle(byteBufList);
    }

    final AppendEntriesRequest request = rb.build();
    if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Node {} send AppendEntriesRequest to {} term {} lastCommittedIndex {} prevLogIndex {} prevLogTerm {} logIndex {} count {}",
            this.options.getNode().getNodeId(), this.options.getPeerId(), this.options.getTerm(),
            request.getCommittedIndex(), request.getPrevLogIndex(), request.getPrevLogTerm(), nextSendingIndex,
            request.getEntriesCount());
    }
    // statInfo
    this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
    this.statInfo.firstLogIndex = rb.getPrevLogIndex() + 1;
    this.statInfo.lastLogIndex = rb.getPrevLogIndex() + rb.getEntriesCount();

    final Recyclable recyclable = dataBuf;
    final int v = this.version;
    final long monotonicSendTimeMs = Utils.monotonicMs();
    final int seq = getAndIncrementReqSeq();
    final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
        request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {

            @Override
            public void run(final Status status) {
                // Recycle resources
                RecycleUtil.recycle(recyclable);
                onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request,
                    getResponse(), seq, v, monotonicSendTimeMs);
            }
        });
    // Add an Inflight for this batch
    addInflight(RequestType.AppendEntries, nextSendingIndex, request.getEntriesCount(), request.getData().size(),
        seq, rpcFuture);
    return true;
}
  1. Call fillCommonFields to fill rb with the current Replicator's common configuration;
  2. Call prepareEntry, which computes the log index from nextSendingIndex and the offset i, fetches the corresponding LogEntry from the LogManager, copies its metadata into emb, and appends its data to the RecyclableByteBufferList;
  3. If there is no new data in the LogManager, the entries count will be 0; the method then either triggers a snapshot install or waits for more entries, and returns false;
  4. Copy the data accumulated in byteBufList into a ByteBufferCollector and set it as the data field of rb;
  5. Build the AppendEntriesRequest instance and send the request;
  6. Add an Inflight to the queue. The Leader maintains a queue of Inflights, one for each batch of LogEntries it sends. When it learns that replication of some batch failed, it can use the Inflight records in the queue to resend that batch and every batch after it, which guarantees that log replication completes and that the log order stays unchanged.

Note that the RecyclableByteBufferList is obtained from an object pool, which avoids allocating a new buffer list for every batch.
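
The borrow/use/recycle pattern looks like this; a minimal sketch reusing only the calls already shown in sendEntries and prepareEntry above:

// Borrow a pooled instance instead of allocating a new one
final RecyclableByteBufferList byteBufList = RecyclableByteBufferList.newInstance();
try {
    // Use it like an ordinary list of buffers (as prepareEntry does)
    byteBufList.add(ByteBuffer.wrap(new byte[] { 1, 2, 3 }));
    // ... pack the buffers into the request ...
} finally {
    // Always return the instance to the pool so it can be reused
    RecycleUtil.recycle(byteBufList);
}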

Let’s take a closer look at the specific methods in sendEntries.

prepareEntry populates the emb properties

Replicator#prepareEntry

boolean prepareEntry(final long nextSendingIndex, final int offset, final RaftOutter.EntryMeta.Builder emb,
                     final RecyclableByteBufferList dateBuffer) {
    if (dateBuffer.getCapacity() >= this.raftOptions.getMaxBodySize()) {
        return false;
    }
    // The index of the entry to send
    final long logIndex = nextSendingIndex + offset;
    // If the index cannot be found in the LogManager, return false
    final LogEntry entry = this.options.getLogManager().getEntry(logIndex);
    if (entry == null) {
        return false;
    }
    // Copy the properties of the LogEntry into emb
    emb.setTerm(entry.getId().getTerm());
    if (entry.hasChecksum()) {
        emb.setChecksum(entry.getChecksum()); // since 1.2.6
    }
    emb.setType(entry.getType());
    if (entry.getPeers() != null) {
        Requires.requireTrue(!entry.getPeers().isEmpty(), "Empty peers at logIndex=%d", logIndex);
        for (final PeerId peer : entry.getPeers()) {
            emb.addPeers(peer.toString());
        }
        if (entry.getOldPeers() != null) {
            for (final PeerId peer : entry.getOldPeers()) {
                emb.addOldPeers(peer.toString());
            }
        }
    } else {
        Requires.requireTrue(entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION,
            "Empty peers but is ENTRY_TYPE_CONFIGURATION type at logIndex=%d", logIndex);
    }
    final int remaining = entry.getData() != null ? entry.getData().remaining() : 0;
    emb.setDataLen(remaining);
    // Put the data of the LogEntry into dateBuffer
    if (entry.getData() != null) {
        // should slice entry data
        dateBuffer.add(entry.getData().slice());
    }
    return true;
}
  1. Check whether the incoming dateBuffer has already reached the maximum body size configured for a batch (512 * 1024 by default); if so, return false
  2. If the LogEntry at the computed index is not found, return false; the caller's if check then executes break to exit the loop
  3. Copy the properties of the LogEntry into the emb object, and add the LogEntry's data to the dateBuffer, so that the payload travels separately from the metadata, as illustrated below
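
To make that separation concrete: the request carries a list of EntryMeta records plus a single concatenated data buffer, and each EntryMeta's dataLen tells the receiver how many bytes of the shared buffer belong to its entry. A small worked example with made-up payloads (plain java.nio.ByteBuffer):

// Wire layout (hypothetical example):
//   entries = [ EntryMeta(dataLen=3), EntryMeta(dataLen=2) ]
//   data    = | a b c | d e |          (one concatenated buffer)
ByteBuffer allData = ByteBuffer.wrap(new byte[] { 'a', 'b', 'c', 'd', 'e' });
int[] dataLens = { 3, 2 };
for (int dataLen : dataLens) {
    byte[] bs = new byte[dataLen];
    // A relative bulk get advances the position, consuming exactly this entry's
    // slice, which is what handleAppendEntriesRequest does on the Follower side below
    allData.get(bs, 0, bs.length);
}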

Follower processes log replication requests sent by the Leader

After the Leader sends an AppendEntriesRequest, the request data is handled on the Follower side by AppendEntriesRequestProcessor.

The entry point is processRequest0:

public Message processRequest0(final RaftServerService service, final AppendEntriesRequest request,
                               final RpcRequestClosure done) {

    final Node node = (Node) service;

    // Pipelining is enabled by default
    if (node.getRaftOptions().isReplicatorPipeline()) {
        final String groupId = request.getGroupId();
        final String peerId = request.getPeerId();
        // Get the request sequence number, keyed by groupId + peerId
        final int reqSequence = getAndIncrementSequence(groupId, peerId, done.getBizContext().getConnection());
        // The Follower handles the log request sent by the Leader
        final Message response = service.handleAppendEntriesRequest(request, new SequenceRpcRequestClosure(done,
            reqSequence, groupId, peerId));
        // Normal data returns null; abnormal data returns a response
        if (response != null) {
            sendSequenceResponse(groupId, peerId, reqSequence, done.getAsyncContext(), done.getBizContext(),
                response);
        }
        return null;
    } else {
        return service.handleAppendEntriesRequest(request, done);
    }
}

service.handleAppendEntriesRequest eventually calls NodeImpl's handleAppendEntriesRequest method. That method only returns a message directly for error cases or when the Leader sent no entries; in the normal case it returns null.

Processing and responding to the log replication request

NodeImpl#handleAppendEntriesRequest

public Message handleAppendEntriesRequest(final AppendEntriesRequest request, final RpcRequestClosure done) {
    boolean doUnlock = true;
    final long startMs = Utils.monotonicMs();
    this.writeLock.lock();
    // Number of log entries in the request
    final int entriesCount = request.getEntriesCount();
    try {
        // Check whether the current node is active
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
                getNodeId(), this.state.name());
        }
        // Verify that the passed serverId can be properly parsed
        final PeerId serverId = new PeerId();
        if (!serverId.parse(request.getServerId())) {
            LOG.warn("Node {} received AppendEntriesRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse serverId failed: %s.",
                request.getServerId());
        }
        // Check for a stale term
        if (request.getTerm() < this.currTerm) {
            LOG.warn("Node {} ignore stale AppendEntriesRequest from {}, term={}, currTerm={}.", getNodeId(),
                request.getServerId(), request.getTerm(), this.currTerm);
            return AppendEntriesResponse.newBuilder() //
                .setSuccess(false) //
                .setTerm(this.currTerm) //
                .build();
        }

        // Check term and state to step down
        // If the current node is not a Follower node, perform the StepDown operation
        checkStepDown(request.getTerm(), serverId);
        // The sender is not the leader this node knows about
        if (!serverId.equals(this.leaderId)) {
            LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.",
                serverId, this.currTerm, this.leaderId);
            // Increase the term by 1 and make both leaders step down to minimize the
            // loss of split brain
            stepDown(request.getTerm() + 1, false, new Status(RaftError.ELEADERCONFLICT,
                "More than one leader in the same term."));
            return AppendEntriesResponse.newBuilder() //
                .setSuccess(false) //
                .setTerm(request.getTerm() + 1) //
                .build();
        }

        updateLastLeaderTimestamp(Utils.monotonicMs());

        // Check whether a snapshot is being installed
        if (entriesCount > 0 && this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
            LOG.warn("Node {} received AppendEntriesRequest while installing snapshot.", getNodeId());
            return RpcResponseFactory.newResponse(RaftError.EBUSY, "Node %s:%s is installing snapshot.",
                this.groupId, this.serverId);
        }
        // The sender passes in its nextIndex - 1 as prevLogIndex
        final long prevLogIndex = request.getPrevLogIndex();
        final long prevLogTerm = request.getPrevLogTerm();
        final long localPrevLogTerm = this.logManager.getTerm(prevLogIndex);
        // The term at prevLogIndex does not match the local log
        if (localPrevLogTerm != prevLogTerm) {
            final long lastLogIndex = this.logManager.getLastLogIndex();

            LOG.warn(
                "Node {} reject term_unmatched AppendEntriesRequest from {}, term={}, prevLogIndex={}, prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.",
                getNodeId(), request.getServerId(), request.getTerm(), prevLogIndex, prevLogTerm, localPrevLogTerm,
                lastLogIndex, entriesCount);

            return AppendEntriesResponse.newBuilder() //
                .setSuccess(false) //
                .setTerm(this.currTerm) //
                .setLastLogIndex(lastLogIndex) //
                .build();
        }
        // Respond to a heartbeat or an empty probe sent by sendEmptyEntries
        if (entriesCount == 0) {
            // heartbeat
            final AppendEntriesResponse.Builder respBuilder = AppendEntriesResponse.newBuilder() //
                .setSuccess(true) //
                .setTerm(this.currTerm)
                // Return the latest log index of the current node
                .setLastLogIndex(this.logManager.getLastLogIndex());
            doUnlock = false;
            this.writeLock.unlock();
            // see the comments at FollowerStableClosure#run()
            this.ballotBox.setLastCommittedIndex(Math.min(request.getCommittedIndex(), prevLogIndex));
            return respBuilder.build();
        }

        // Parse request
        long index = prevLogIndex;
        final List<LogEntry> entries = new ArrayList<>(entriesCount);
        ByteBuffer allData = null;
        if (request.hasData()) {
            allData = request.getData().asReadOnlyByteBuffer();
        }
        // Get all the entry metadata
        final List<RaftOutter.EntryMeta> entriesList = request.getEntriesList();
        for (int i = 0; i < entriesCount; i++) {
            final RaftOutter.EntryMeta entry = entriesList.get(i);
            index++;
            if (entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_UNKNOWN) {
                // Set the logEntry attributes
                final LogEntry logEntry = new LogEntry();
                logEntry.setId(new LogId(index, entry.getTerm()));
                logEntry.setType(entry.getType());
                if (entry.hasChecksum()) {
                    logEntry.setChecksum(entry.getChecksum()); // since 1.2.6
                }
                // Fill the logEntry with its slice of the data
                final long dataLen = entry.getDataLen();
                if (dataLen > 0) {
                    final byte[] bs = new byte[(int) dataLen];
                    assert allData != null;
                    allData.get(bs, 0, bs.length);
                    logEntry.setData(ByteBuffer.wrap(bs));
                }

                if (entry.getPeersCount() > 0) {
                    // Only entries of the configuration type can have multiple peers
                    if (entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
                        throw new IllegalStateException(
                                "Invalid log entry that contains peers but is not ENTRY_TYPE_CONFIGURATION type: "
                                        + entry.getType());
                    }

                    final List<PeerId> peers = new ArrayList<>(entry.getPeersCount());
                    for (final String peerStr : entry.getPeersList()) {
                        final PeerId peer = new PeerId();
                        peer.parse(peerStr);
                        peers.add(peer);
                    }
                    logEntry.setPeers(peers);

                    if (entry.getOldPeersCount() > 0) {
                        final List<PeerId> oldPeers = new ArrayList<>(entry.getOldPeersCount());
                        for (final String peerStr : entry.getOldPeersList()) {
                            final PeerId peer = new PeerId();
                            peer.parse(peerStr);
                            oldPeers.add(peer);
                        }
                        logEntry.setOldPeers(oldPeers);
                    }
                } else if (entry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
                    throw new IllegalStateException(
                            "Invalid log entry that contains zero peers but is ENTRY_TYPE_CONFIGURATION type");
                }

                // Validate checksum
                if (this.raftOptions.isEnableLogEntryChecksum() && logEntry.isCorrupted()) {
                    long realChecksum = logEntry.checksum();
                    LOG.error(
                            "Corrupted log entry received from leader, index={}, term={}, expectedChecksum={}, " +
                             "realChecksum={}",
                            logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
                            realChecksum);
                    return RpcResponseFactory.newResponse(RaftError.EINVAL,
                            "The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d",
                            logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
                            realChecksum);
                }
                entries.add(logEntry);
            }
        }
        // Store the log entries and respond through the closure
        final FollowerStableClosure closure = new FollowerStableClosure(request, AppendEntriesResponse.newBuilder()
            .setTerm(this.currTerm), this, done, this.currTerm);
        this.logManager.appendEntries(entries, closure);
        // update configuration after _log_manager updated its memory status
        this.conf = this.logManager.checkAndSetConfiguration(this.conf);
        return null;
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
        this.metrics.recordLatency("handle-append-entries", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-append-entries-count", entriesCount);
    }
}

The handleAppendEntriesRequest method is very long, but most of it is validation; the actual processing logic is fairly small:

  1. Verify that the current node is still active; if not, return an error response
  2. Verify that the serverId in the request is well-formed; otherwise return an error response
  3. If the request's term is less than the current term, return a failed AppendEntriesResponse
  4. Call the checkStepDown method to check the current node's term and state, whether there is already a leader, and so on
  5. If the request's serverId differs from this node's leaderId, the request was not initiated by the Leader this node follows, so step down and return a failed AppendEntriesResponse
  6. Refuse the request if a snapshot is currently being installed
  7. Check whether the local LogEntry at the request's prevLogIndex has the same term the Leader passed in; if not, return a failed AppendEntriesResponse carrying the local lastLogIndex
  8. If the entriesCount passed in is zero, the Leader sent a heartbeat or an empty probe; return a successful AppendEntriesResponse wrapping the current term and latest log index (see the commit-index sketch after this list)
  9. Otherwise the request carries data, so iterate over all of it
  10. Instantiate a LogEntry for each entry, set its data and properties, and add it to the entries collection
  11. Call logManager.appendEntries to batch-write the entries to the log store (RocksDB by default)
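
One detail in step 8 is worth spelling out: the Follower commits Math.min(request.getCommittedIndex(), prevLogIndex) rather than the Leader's committed index directly, because it can only safely commit entries it has verified it shares with the Leader. A worked example with made-up values:

// Hypothetical values for illustration
long leaderCommittedIndex = 120; // committedIndex carried in the heartbeat
long prevLogIndex = 115;         // highest index this Follower has matched with the Leader
// The Follower must not commit beyond what it has verified, so it takes the min
long newCommitted = Math.min(leaderCommittedIndex, prevLogIndex); // 115, not 120
ballotBox.setLastCommittedIndex(newCommitted);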

Sending the response to the Leader

The response is eventually sent back to the Leader by AppendEntriesRequestProcessor's sendSequenceResponse:

void sendSequenceResponse(final String groupId, final String peerId, final int seq,
                          final AsyncContext asyncContext, final BizContext bizContext, final Message msg) {
    final Connection connection = bizContext.getConnection();
    // Get the request context, keyed by groupId and peerId
    final PeerRequestContext ctx = getPeerRequestContext(groupId, peerId, connection);
    final PriorityQueue<SequenceMessage> respQueue = ctx.responseQueue;
    assert (respQueue != null);

    synchronized (Utils.withLockObject(respQueue)) {
        // Put the pending response into the priority queue
        respQueue.add(new SequenceMessage(asyncContext, msg, seq));
        // Check whether the queue holds more than the allowed number of pending responses (256 by default)
        if (!ctx.hasTooManyPendingResponses()) {
            while (!respQueue.isEmpty()) {
                final SequenceMessage queuedPipelinedResponse = respQueue.peek();
                // If the sequence does not match, do not send the response yet
                if (queuedPipelinedResponse.sequence != getNextRequiredSequence(groupId, peerId, connection)) {
                    // sequence mismatch, waiting for next response.
                    break;
                }
                respQueue.remove();
                try {
                    // Send the response
                    queuedPipelinedResponse.sendResponse();
                } finally {
                    // Advance the required sequence by one
                    getAndIncrementNextRequiredSequence(groupId, peerId, connection);
                }
            }
        } else {
            LOG.warn("Closed connection to peer {}/{}, because of too many pending responses, queued={}, max={}",
                ctx.groupId, peerId, respQueue.size(), ctx.maxPendingResponses);
            // Close the connection if there are too many pending responses in queue.
            connection.close();
            removePeerRequestContext(groupId, peerId);
        }
    }
}

This method pushes each response into a PriorityQueue, which orders them by sequence number. It then repeatedly takes the response with the smallest sequence and compares it against nextRequiredSequence; if they differ, a response has arrived out of order, and sending pauses until the missing one shows up.
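
The resequencing trick is easy to demonstrate in isolation. The following self-contained snippet uses hypothetical names (SeqMsg, ResequenceDemo; not SOFAJRaft classes) to show why a priority queue plus a required-next counter releases responses strictly in request order even when they complete out of order:

import java.util.Comparator;
import java.util.PriorityQueue;

public class ResequenceDemo {
    static class SeqMsg {
        final int sequence;
        final String payload;
        SeqMsg(int sequence, String payload) {
            this.sequence = sequence;
            this.payload = payload;
        }
    }

    public static void main(String[] args) {
        // Ordered by sequence number, smallest first
        PriorityQueue<SeqMsg> respQueue =
            new PriorityQueue<>(Comparator.comparingInt((SeqMsg m) -> m.sequence));
        int nextRequiredSequence = 0;
        // Responses complete out of order: seq 1 and 2 finish before seq 0
        respQueue.add(new SeqMsg(1, "resp-1"));
        respQueue.add(new SeqMsg(2, "resp-2"));
        respQueue.add(new SeqMsg(0, "resp-0"));
        // Drain in order; if the smallest seq is not the required one, wait for it
        while (!respQueue.isEmpty() && respQueue.peek().sequence == nextRequiredSequence) {
            System.out.println("send " + respQueue.poll().payload);
            nextRequiredSequence++;
        }
        // Prints resp-0, resp-1, resp-2: the replies leave in request order
    }
}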

The Leader handles the log replication Response

After receiving the Response from the Follower, the Leader calls the Replicator’s onRpcReturned method

static void onRpcReturned(final ThreadId id, final RequestType reqType, final Status status, final Message request,
                          final Message response, final int seq, final int stateVersion, final long rpcSendTime) {
    if (id == null) {
        return;
    }
    final long startTimeMs = Utils.nowMs();
    Replicator r;
    if ((r = (Replicator) id.lock()) == null) {
        return;
    }
    // Check the version number; every resetInflights increments version by one,
    // so a mismatch means this response belongs to a stale batch
    if (stateVersion != r.version) {
        LOG.debug("Replicator {} ignored old version response {}, current version is {}, request is {}\n, and response is {}\n, status is {}.",
            r, stateVersion, r.version, request, response, status);
        id.unlock();
        return;
    }
    // A priority queue ordered by seq, smallest first
    final PriorityQueue<RpcResponse> holdingQueue = r.pendingResponses;
    // A priority queue is needed because responses arrive asynchronously:
    // a small seq may come back later than a larger one
    holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime));
    // By default the holdingQueue holds at most 256 entries
    if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) {
        LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}",
            holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
        // Clear the state and resend a probe
        r.resetInflights();
        r.state = State.Probe;
        r.sendEmptyEntries(false);
        return;
    }

    boolean continueSendEntries = false;

    final boolean isLogDebugEnabled = LOG.isDebugEnabled();
    StringBuilder sb = null;
    if (isLogDebugEnabled) {
        sb = new StringBuilder("Replicator ").append(r).append(" is processing RPC responses,");
    }
    try {
        int processed = 0;
        while (!holdingQueue.isEmpty()) {
            // Get the smallest seq in the holdingQueue
            final RpcResponse queuedPipelinedResponse = holdingQueue.peek();

            // sequence mismatch, waiting for next response.
            if (queuedPipelinedResponse.seq != r.requiredNextSeq) {
                // If something was already processed, just break out of the loop
                if (processed > 0) {
                    if (isLogDebugEnabled) {
                        sb.append("has processed ").append(processed).append(" responses,");
                    }
                    break;
                } else {
                    // Did not process any responses, unlock id and return.
                    continueSendEntries = false;
                    id.unlock();
                    return;
                }
            }
            // Remove the smallest seq from the priority queue
            holdingQueue.remove();
            processed++;
            // Get the first element of the inflights queue
            final Inflight inflight = r.pollInflight();
            // Every request enqueued an Inflight when it was sent;
            // if none is found, ignore this response
            if (inflight == null) {
                // The previous in-flight requests were cleared.
                if (isLogDebugEnabled) {
                    sb.append("ignore response because request not found:").append(queuedPipelinedResponse)
                        .append(",\n");
                }
                continue;
            }
            // If the seq does not match, the responses are out of order
            if (inflight.seq != queuedPipelinedResponse.seq) {
                // reset state
                LOG.warn(
                    "Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.",
                    r, inflight.seq, queuedPipelinedResponse.seq);
                r.resetInflights();
                r.state = State.Probe;
                continueSendEntries = false;
                // Block the node for a while, depending on the error category
                r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
                return;
            }
            try {
                switch (queuedPipelinedResponse.requestType) {
                    case AppendEntries:
                        // Handle a log replication response
                        continueSendEntries = onAppendEntriesReturned(id, inflight, queuedPipelinedResponse.status,
                            (AppendEntriesRequest) queuedPipelinedResponse.request,
                            (AppendEntriesResponse) queuedPipelinedResponse.response, rpcSendTime, startTimeMs, r);
                        break;
                    case Snapshot:
                        // Handle a snapshot response
                        continueSendEntries = onInstallSnapshotReturned(id, r, queuedPipelinedResponse.status,
                            (InstallSnapshotRequest) queuedPipelinedResponse.request,
                            (InstallSnapshotResponse) queuedPipelinedResponse.response);
                        break;
                }
            } finally {
                if (continueSendEntries) {
                    // Success, increase the response sequence.
                    r.getAndIncrementRequiredNextSeq();
                } else {
                    // The id is already unlocked in onAppendEntriesReturned/onInstallSnapshotReturned, we SHOULD break out.
                    break;
                }
            }
        }
    } finally {
        if (isLogDebugEnabled) {
            sb.append(", after processed, continue to send entries: ").append(continueSendEntries);
            LOG.debug(sb.toString());
        }
        if (continueSendEntries) {
            // unlock in sendEntries.
            r.sendEntries();
        }
    }
}

  1. Check the version number; every resetInflights increments version by one, so a mismatch means the response belongs to a stale batch and is ignored
  2. Get the Replicator's pendingResponses queue and add the current response to it as an RpcResponse instance
  3. If the queue holds more than 256 entries, clear the replication state and resynchronize by sending a new probe
  4. Take the smallest element of the holdingQueue and compare its seq with the current requiredNextSeq; if they differ, break out of the loop (or unlock and return if nothing has been processed yet)
  5. Poll the first element of the inflights queue; if its seq does not match the response's seq, the responses are out of order and the state is reset
  6. Call the onAppendEntriesReturned method to handle the log replication response
  7. On success, call sendEntries to continue sending replication logs to the Follower

Replicator#onAppendEntriesReturned

private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight inflight, final Status status,
                                               final AppendEntriesRequest request,
                                               final AppendEntriesResponse response, final long rpcSendTime,
                                               final long startTimeMs, final Replicator r) {
    // Check that the response matches the in-flight request
    if (inflight.startIndex != request.getPrevLogIndex() + 1) {
        LOG.warn(
            "Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, request prevLogIndex={}, reset the replicator state and probe again.",
            r, inflight.startIndex, request.getPrevLogIndex());
        r.resetInflights();
        r.state = State.Probe;
        // unlock id in sendEmptyEntries
        r.sendEmptyEntries(false);
        return false;
    }
    // record metrics
    if (request.getEntriesCount() > 0) {
        r.nodeMetrics.recordLatency("replicate-entries", Utils.monotonicMs() - rpcSendTime);
        r.nodeMetrics.recordSize("replicate-entries-count", request.getEntriesCount());
        r.nodeMetrics.recordSize("replicate-entries-bytes", request.getData() != null ? request.getData().size()
            : 0);
    }

    final boolean isLogDebugEnabled = LOG.isDebugEnabled();
    StringBuilder sb = null;
    if (isLogDebugEnabled) {
        sb = new StringBuilder("Node "). //
            append(r.options.getGroupId()).append(":").append(r.options.getServerId()). //
            append(" received AppendEntriesResponse from "). //
            append(r.options.getPeerId()). //
            append(" prevLogIndex=").append(request.getPrevLogIndex()). //
            append(" prevLogTerm=").append(request.getPrevLogTerm()). //
            append(" count=").append(request.getEntriesCount());
    }
    // If the Follower did not respond successfully, e.g. because it crashed
    // or the RPC call failed, block for a while before retrying
    if (!status.isOk()) {
        // If the follower crashes, any RPC to the follower fails immediately,
        // so we need to block the follower for a while instead of looping until
        // it comes back or be removed
        // dummy_id is unlock in block
        if (isLogDebugEnabled) {
            sb.append(" fail, sleep.");
            LOG.debug(sb.toString());
        }
        // If any Replicator status listeners are registered, notify them all
        notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
        if (++r.consecutiveErrorTimes % 10 == 0) {
            LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
                r.consecutiveErrorTimes, status);
        }
        r.resetInflights();
        r.state = State.Probe;
        // unlock id in block
        r.block(startTimeMs, status.getCode());
        return false;
    }
    r.consecutiveErrorTimes = 0;
    // The Follower rejected the request
    if (!response.getSuccess()) {
        // A greater term means there may have been a network partition and a new
        // Leader was elected; this node should follow the new Leader
        if (response.getTerm() > r.options.getTerm()) {
            if (isLogDebugEnabled) {
                sb.append(" fail, greater term ").append(response.getTerm()).append(" expect term ")
                    .append(r.options.getTerm());
                LOG.debug(sb.toString());
            }
            // Get the representation of the current node: NodeImpl
            final NodeImpl node = r.options.getNode();
            r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
            r.destroy();
            // Raise the local term value to match
            node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
                "Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
            return false;
        }
        if (isLogDebugEnabled) {
            sb.append(" fail, find nextIndex remote lastLogIndex ").append(response.getLastLogIndex())
                .append(" local nextIndex ").append(r.nextIndex);
            LOG.debug(sb.toString());
        }
        if (rpcSendTime > r.lastRpcSendTimestamp) {
            r.lastRpcSendTimestamp = rpcSendTime;
        }
        // Fail, reset the state to try again from nextIndex.
        r.resetInflights();
        // If the Follower's latest index is behind the next index to send,
        // move nextIndex back to just after the Follower's last log
        // prev_log_index and prev_log_term doesn't match
        if (response.getLastLogIndex() + 1 < r.nextIndex) {
            LOG.debug("LastLogIndex at peer={} is {}", r.options.getPeerId(), response.getLastLogIndex());
            // The peer contains less logs than leader
            r.nextIndex = response.getLastLogIndex() + 1;
        } else {
            // The peer contains logs from old term which should be truncated,
            // decrease _last_log_at_peer by one to test the right index to keep
            if (r.nextIndex > 1) {
                LOG.debug("logIndex={} dismatch", r.nextIndex);
                r.nextIndex--;
            } else {
                LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen",
                    r.options.getPeerId());
            }
        }
        // On failure, probe the Follower's log position again and resynchronize
        // dummy_id is unlock in _send_heartbeat
        r.sendEmptyEntries(false);
        return false;
    }
    if (isLogDebugEnabled) {
        sb.append(", success");
        LOG.debug(sb.toString());
    }
    // success: check the term
    if (response.getTerm() != r.options.getTerm()) {
        r.resetInflights();
        r.state = State.Probe;
        LOG.error("Fail, response term {} dismatch, expect term {}", response.getTerm(), r.options.getTerm());
        id.unlock();
        return false;
    }
    if (rpcSendTime > r.lastRpcSendTimestamp) {
        r.lastRpcSendTimestamp = rpcSendTime;
    }
    // The number of log entries replicated by this request
    final int entriesSize = request.getEntriesCount();
    if (entriesSize > 0) {
        // Record this node's acknowledgement of the commit
        r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Replicated logs in [{}, {}] to peer {}", r.nextIndex, r.nextIndex + entriesSize - 1,
                r.options.getPeerId());
        }
    } else {
        // The request is probe request, change the state into Replicate.
        r.state = State.Replicate;
    }
    r.nextIndex += entriesSize;
    r.hasSucceeded = true;
    r.notifyOnCaughtUp(RaftError.SUCCESS.getNumber(), false);
    // dummy_id is unlock in _send_entries
    if (r.timeoutNowIndex > 0 && r.timeoutNowIndex < r.nextIndex) {
        r.sendTimeoutNow(false, false);
    }
    return true;
}

The onAppendEntriesReturned method is also very long, but with a little patience it breaks down as follows:

  1. Check that the response matches the in-flight request; if the sequence is wrong, reset the state and probe again
  2. Record metrics and build the debug log message
  3. If the RPC status is not OK, notify the status listeners, reset the state, and block for a while before retrying
  4. If success is false, first check the term: a greater term means there may have been a network partition and a new Leader was elected, so this node must step down and follow it. If the term is fine, reset the state and adjust nextIndex based on the latest index the Follower returned, as sketched below
  5. If every check passes, confirm the commit in the ballot box and advance the latest replicated log position (nextIndex)
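
The nextIndex adjustment in step 4 is the heart of how the Leader converges on the Follower's actual log position. A small sketch of that branch with made-up values:

// Hypothetical values for illustration
long nextIndex = 100;            // Leader's next index to send to this Follower
long followerLastLogIndex = 80;  // lastLogIndex carried in the failed AppendEntriesResponse
if (followerLastLogIndex + 1 < nextIndex) {
    // The Follower is simply behind: jump straight back to just after its last log
    nextIndex = followerLastLogIndex + 1; // 81
} else if (nextIndex > 1) {
    // The Follower holds conflicting entries from an old term: step back one index
    // at a time until a probe finds a prevLogIndex/prevLogTerm that matches
    nextIndex--;
}
// Either way, the Leader then sends another empty probe and resumes from the new nextIndex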