Preface

When Nacos uses the embedded data source (-DembeddedStorage=true, one data source per node) and starts in cluster mode (-Dnacos.standalone=false), the Raft protocol is used to ensure data consistency.

Before reading the relevant Nacos source code, let's first look at SOFA-JRaft, the Raft implementation it relies on. (Before 1.4.1, Nacos used its own Raft implementation; since 1.4.1, it uses the SOFA-JRaft framework.)

I. Closure

Status indicates whether a piece of logic succeeded or failed. A null state, or a state whose code is 0, means success.

public class Status implements Copiable<Status> {
    private static class State {
        int    code;
        String msg;
    }
    private State state;
    public Status() {
        this.state = null;
    }
    public static Status OK() {
        return new Status();
    }
    public boolean isOk() {
        return this.state == null || this.state.code == 0;
    }
}

Closure is a callback interface: its run method is invoked after the subsequent processing finishes, and the Status parameter tells the callback whether that processing succeeded or failed.

/** * Callback closure. */
public interface Closure {

    /**
     * Called when task is done.
     *
     * @param status the task status.
     */
    void run(final Status status);
}

In JRaft, closures are wrapped layer upon layer, so there are many callback levels. When multiple Closure layers are involved, you can think of them as a stack: each newly created Closure is pushed onto it. Finally, when the outermost Closure.run is called, the closures are popped off the stack one by one and each run method is invoked.

For example, Closure3 holds Closure2, which in turn holds Closure1. Closure1 is responsible for responding to the client, while the outer two layers carry other business logic and eventually delegate to Closure1. When the outer business logic completes successfully, Closure3's run method is invoked, which cascades 3 -> 2 -> 1.
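
As a minimal illustration (not JRaft source; the three closure variables and log messages are made up for this example), three nested closures unwind in 3 -> 2 -> 1 order like this:

// Closure and Status are the JRaft types shown above; everything else is illustrative.
Closure closure1 = status -> System.out.println("closure1: respond to client, ok=" + status.isOk());
Closure closure2 = status -> {
    System.out.println("closure2: some business logic");
    closure1.run(status); // delegate to the inner closure
};
Closure closure3 = status -> {
    System.out.println("closure3: some business logic");
    closure2.run(status); // delegate to the inner closure
};
// When the outer business finishes successfully, only the outermost run is called...
closure3.run(Status.OK()); // ...and it cascades 3 -> 2 -> 1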

II. Using JRaft

Here we use the CounterServer from the raft-example module provided by SOFA-JRaft to see how to implement a distributed counter with JRaft.

1. Create RaftServer

public class CounterServer {
    private RaftGroupService    raftGroupService;
    private Node                node; // The current node
    private CounterStateMachine fsm;
    public CounterServer(final String dataPath, final String groupId, final String serverIdStr,
                         final String initConfStr) throws IOException {
        // Parse the parameters
        // serverIdStr = current node ip:port
        PeerId serverId = new PeerId();
        if (!serverId.parse(serverIdStr)) {
            throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr);
        }
        // initConfStr = cluster node list, e.g. node1-ip:port,node2-ip:port,node3-ip:port
        Configuration initConf = new Configuration();
        if (!initConf.parse(initConfStr)) {
            throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr);
        }
        // Initialize the path
        FileUtils.forceMkdir(new File(dataPath));
        // Here we have raft RPC and business RPC using the same RPC server, which can usually be separated
        final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint());
        // Register a business handler
        CounterService counterService = new CounterServiceImpl(this);
        rpcServer.registerProcessor(new GetValueRequestProcessor(counterService));
        rpcServer.registerProcessor(new IncrementAndGetRequestProcessor(counterService));
        // Initialize the state machine
        this.fsm = new CounterStateMachine();
        NodeOptions nodeOptions = new NodeOptions();
        // Set the state machine to boot parameters
        nodeOptions.setFsm(this.fsm);
        // Set the storage path
        // Log, mandatory
        nodeOptions.setLogUri(dataPath + File.separator + "log");
        // Meta information is required
        nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");
        // snapshot, optional, generally recommended
        nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
        // Set the election timeout to 1 second
        nodeOptions.setElectionTimeoutMs(1000);
        // Whether to disable the CLI service (false keeps it enabled)
        nodeOptions.setDisableCli(false);
        // Take snapshots every 30 seconds
        nodeOptions.setSnapshotIntervalSecs(30);
        // Set the initial cluster configuration
        nodeOptions.setInitialConf(initConf);
        // Initialize the RAFT Group service framework
        this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
        // Start the node
        this.node = this.raftGroupService.start();
    }
}

There are many startup configurations, which are mainly divided into the following categories:

  • RpcServer: the factory is loaded via SOFA's own SPI mechanism; the default implementation is BoltRaftRpcFactory, which internally uses the RPC service provided by SOFA-Bolt.
    • Raft communication and business communication can share the same RpcServer, as in the current CounterServer.
    • Or Raft communication uses the RpcServer provided by JRaft while the business side is an HTTP server built with Spring Boot, as in Nacos.
  • RpcProcessor (GetValueRequestProcessor): an RPC request processor, required only when the SOFA-JRaft RpcServer is also used for business communication. Implement the com.alipay.sofa.jraft.rpc.RpcProcessor interface to handle the business request for a given request type.
  • StateMachine (CounterStateMachine): the state machine interface that must be implemented to store and read business data via SOFA-JRaft. A very important interface.
  • NodeOptions: the configuration options of the current Raft node.
  • RaftGroupService: holds the Node and the RpcServer and is responsible for starting and shutting down the Raft service.
  • Node: the implementation class is NodeImpl, responsible for submitting proposals (apply) and linearizable reads (readIndex).
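
For completeness, a minimal launcher for the class above could look like the sketch below. The bootstrap class name and the argument order are assumptions for illustration, not the exact raft-example code:

public class CounterServerBootstrap {
    public static void main(String[] args) throws IOException {
        // Assumed argument order: dataPath groupId serverId initConf, e.g.
        // /tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
        final String dataPath    = args[0];
        final String groupId     = args[1];
        final String serverIdStr = args[2];
        final String initConfStr = args[3];
        final CounterServer counterServer = new CounterServer(dataPath, groupId, serverIdStr, initConfStr);
        System.out.println("Started counter server at port: "
            + counterServer.getNode().getNodeId().getPeerId().getPort());
    }
}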

2. State machines

Using SOFA-JRaft requires implementing your own StateMachine.

The most important method is onApply. It is triggered to apply state (i.e., the current node stores the data) after the Node submits the Task, the corresponding log is replicated to the Raft cluster, and a quorum of nodes has successfully committed it. CounterStateMachine implements the atomic counter operations, GET and INCREMENT (addAndGet), in its onApply method.

public class CounterStateMachine extends StateMachineAdapter {
    // Atomic counter
    private final AtomicLong    value      = new AtomicLong(0);
    @Override
    public void onApply(final Iterator iter) {
        while (iter.hasNext()) {
            long current = 0;
            CounterOperation counterOperation = null;
            CounterClosure closure = null;
            // iter.done() != null indicates that the current node is the leader:
            // the request data can be taken from the closure directly, without deserializing the request message
            if (iter.done() != null) {
                closure = (CounterClosure) iter.done();
                counterOperation = closure.getCounterOperation();
            } else {
                // done == null indicates that the current node is a follower and the request message needs to be deserialized
                final ByteBuffer data = iter.getData();
                counterOperation = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
                        data.array(), CounterOperation.class.getName());
            }
            // Execute business logic
            switch (counterOperation.getOp()) {
                    case GET:
                        // get
                        current = this.value.get();
                        break;
                    case INCREMENT:
                        // addAndGet
                        final long delta = counterOperation.getDelta();
                        final long prev = this.value.get();
                        current = this.value.addAndGet(delta);
                        break;
                }
            // If closure is not null, the current node is the leader and needs to call closure.run to notify the client
            if (closure != null) {
                closure.success(current);   // Set the response parameters
                closure.run(Status.OK());   // Execute the Closure callback
            }
            // Move on to the next task
            iter.next();
        }
    }

SOFA-JRaft provides a data snapshot feature. A snapshot records the current data value and is saved to disk as a cold backup. Creating snapshots serves several purposes:

  • When a new Node joins the cluster, it does not only rely on log replication and playback to keep the data consistent with the Leader, but installs the Leader’s snapshot to skip the playback of a large number of early logs.
  • The Leader can reduce the amount of data on the network by replacing Log replication with snapshot.
  • Replacing earlier logs with snapshots saves storage space;

StateMachine provides two methods: onSnapshotLoad for loading a snapshot and onSnapshotSave for saving one. When they are triggered is decided by the SOFA-JRaft framework.

// CounterStateMachine writes the count in memory to disk
@Override
public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
  final long currVal = this.value.get();
  Utils.runInThread(() -> {
    final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
    if (snapshot.save(currVal)) {
      if (writer.addFile("data")) {
        done.run(Status.OK());
      } else {
        done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
      }
    } else {
      done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath()));
    }
  });
}

// CounterStateMachine loads the snapshot into the in-memory counter
@Override
public boolean onSnapshotLoad(final SnapshotReader reader) {
  if (isLeader()) {
    return false;
  }
  if (reader.getFileMeta("data") == null) {
    return false;
  }
  final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
  try {
    this.value.set(snapshot.load());
    return true;
  } catch (final IOException e) {
    return false;
  }
}

III. Write Operations

To ensure strong consistency, the Raft consensus algorithm requires that all read and write requests be submitted to the Leader node for execution.

1. User code flow

Again, CounterServer is used as an example.

  • When the client initiates an IncrementAndGetRequest, the Leader node wraps the request in a Task and submits it to the Raft cluster (a client-side sketch follows this list).
  • The Raft framework performs log replication.
  • After a majority of the nodes in the Raft cluster have committed the log (log replication), the onApply method of the StateMachine is called on every node to apply the state and update the in-memory counter.
  • On the Leader node, the onApply method also needs to respond to the client by returning a ValueResponse.
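
For reference, the client side can look roughly like the following sketch, modelled on the counter example's client. The RouteTable/CliClientServiceImpl calls, the InvokeCallback shape and the setDelta setter are assumptions based on SOFA-JRaft 1.3.x and may differ between versions:

// Locate the leader of the "counter" group, then send IncrementAndGetRequest to it.
Configuration conf = new Configuration();
conf.parse("127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083"); // illustrative cluster list
RouteTable.getInstance().updateConfiguration("counter", conf);

CliClientServiceImpl cliClientService = new CliClientServiceImpl();
cliClientService.init(new CliOptions());
RouteTable.getInstance().refreshLeader(cliClientService, "counter", 1000);
PeerId leader = RouteTable.getInstance().selectLeader("counter");

IncrementAndGetRequest request = new IncrementAndGetRequest();
request.setDelta(1);
cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {
    @Override
    public void complete(Object result, Throwable err) {
        // result is the ValueResponse returned by the leader's Closure
        System.out.println("incrementAndGet result: " + result);
    }
    @Override
    public Executor executor() {
        return null; // run the callback on the RPC thread
    }
}, 5000);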

Let’s look at the business process in detail, ignoring what the JRaft framework does.

IncrementAndGetRequest is the request parameter that initiates a write to the Raft cluster and performs addAndGet on the atomic counter.

public class IncrementAndGetRequest {
    // Increase the step size
    private long              delta;
}

ValueResponse is the response parameter.

public class ValueResponse implements Serializable {
		// The value of the atomic counter
    private long              value;
    // Check whether it succeeds
    private boolean           success;
    // Redirect the leader's IP address :port
    private String            redirect;
    // Error message
    private String            errorMsg;
}

When the RpcServer is initialized, an IncrementAndGetRequestProcessor is registered to handle IncrementAndGetRequest business requests. Here the RpcServer provided by JRaft handles business requests: when a request of type IncrementAndGetRequest arrives, the RpcServer automatically recognizes it and invokes IncrementAndGetRequestProcessor.

If the business server were an HTTP server built with Spring Boot instead, RpcContext would play the role of HttpServletResponse, and the Closure's run method would respond to the client through the HttpServletResponse.
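
A minimal sketch of that Spring Boot variant is shown below. The controller class, URL and the injected Node are illustrative assumptions; only Node.apply, Task, CounterOperation and CounterClosure are the APIs discussed in this article:

@RestController
public class CounterController {

    @Autowired
    private Node node; // the JRaft node started by RaftGroupService

    @PostMapping("/counter/increment")
    public DeferredResult<Long> increment(@RequestParam long delta) throws Exception {
        final DeferredResult<Long> result = new DeferredResult<>();
        final CounterOperation op = CounterOperation.createIncrement(delta);
        // This Closure plays the role RpcContext plays below: it writes the HTTP response once onApply finishes
        final CounterClosure closure = new CounterClosure() {
            @Override
            public void run(Status status) {
                if (status.isOk()) {
                    result.setResult(getValueResponse().getValue());
                } else {
                    result.setErrorResult(status.getErrorMsg());
                }
            }
        };
        closure.setCounterOperation(op);
        final Task task = new Task();
        task.setData(ByteBuffer.wrap(
            SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
        task.setDone(closure);
        node.apply(task);
        return result;
    }
}

Back in the counter example, the RpcProcessor registered on the JRaft RpcServer looks like this: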

public class IncrementAndGetRequestProcessor implements RpcProcessor<IncrementAndGetRequest> {

    private final CounterService counterService;

    public IncrementAndGetRequestProcessor(CounterService counterService) {
        super();
        this.counterService = counterService;
    }

    // Process the request
    @Override
    public void handleRequest(final RpcContext rpcCtx, final IncrementAndGetRequest request) {
        // Closure, which responds to the client via RpcContext when the following steps are complete
        final CounterClosure closure = new CounterClosure() {
            @Override
            public void run(Status status) {
                // getValueResponse of the CounterClosure abstract class returns the response that
                // the incrementAndGet call below has filled in; send it back to the client
                rpcCtx.sendResponse(getValueResponse());
            }
        };
        this.counterService.incrementAndGet(request.getDelta(), closure);
    }

    // Returns the request type this processor is interested in
    @Override
    public String interest() {
        return IncrementAndGetRequest.class.getName();
    }
}

CounterServiceImpl converts the request into a CounterOperation, wraps it in a com.alipay.sofa.jraft.entity.Task, and submits it to the Raft cluster. Note that the Closure responsible for responding to the client is passed along; its run method will be called back at some point in the future.

public class CounterServiceImpl implements CounterService {
    private final CounterServer counterServer;
    @Override
    public void incrementAndGet(final long delta, final CounterClosure closure) {
        // Convert the business request delta to CounterOperation
        applyOperation(CounterOperation.createIncrement(delta), closure);
    }

    private void applyOperation(final CounterOperation op, final CounterClosure closure) {
        if (!isLeader()) {
            // Not the leader: respond with a failure that carries the leader's address,
            // so the client can redirect the request to the leader
            handlerNotLeaderError(closure);
            return;
        }

        try {
            // Add the request to the closure
            closure.setCounterOperation(op);
            / / create a Task
            final Task task = new Task();
            // Serialize the request entry to ByteBuffer
            task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
            // Place the external closure in the done member variable of the Task
            task.setDone(closure);
            // Submit the Task to the current Node for processing, hosted by the JRaft framework
            this.counterServer.getNode().apply(task);
        } catch (CodecException e) {
            // On a codec exception, respond to the client directly
            String errorMsg = "Fail to encode CounterOperation";
            closure.failure(errorMsg, StringUtils.EMPTY);
            closure.run(new Status(RaftError.EINTERNAL, errorMsg));
        }
    }
}

Task is an important entity class in JRaft.

  • ByteBuffer data: the business data.
  • Closure done: the callback invoked when Raft finishes processing the task (commit or failure).
  • long expectedTerm: the expected term, default -1. If it is not -1 and differs from the node's current term, the task is rejected.
public class Task {
    /** Associated task data*/
    private ByteBuffer        data             = LogEntry.EMPTY_DATA;
    /** task closure, called when the data is successfully committed to the raft group or failures happen.*/
    private Closure           done;
    /** Reject this task if expectedTerm doesn't match the current term of this Node if the value is not -1, default is -1.*/
    private long              expectedTerm     = -1;
}

After Node.apply(Task) is called, the subsequent steps are handled by the JRaft framework. The Leader replicates the log to the follower nodes; once the log has been replicated to n/2+1 nodes, the StateMachine's onApply method is triggered and the user's business logic continues.

Going back to CounterStateMachine's onApply method: the Leader node increments the in-memory counter by delta, puts the current value into the CounterClosure, and executes the Closure's run method to respond to the client.

@Override
public void onApply(final Iterator iter) {
    while (iter.hasNext()) {
        long current = 0;
        CounterOperation counterOperation = null;
        CounterClosure closure = null;
        // iter.done() != null indicates that the current node is the leader:
        // the request data can be taken from the closure directly, without deserializing the request message
        if (iter.done() != null) {
            closure = (CounterClosure) iter.done();
            counterOperation = closure.getCounterOperation();
        } else {
            // ... the follower node needs to deserialize the request message; iter carries no closure callback
        }
        // Execute business logic
        switch (counterOperation.getOp()) {
          case INCREMENT:
            final long delta = counterOperation.getDelta();
            final long prev = this.value.get();
            current = this.value.addAndGet(delta);
            break;
        }
        // If closure is not null, the current node is the leader and needs to call closure.run to notify the client
        if (closure != null) {
            closure.success(current);   // Set the response parameters
            closure.run(Status.OK());   // Execute the Closure callback
        }
        iter.next();
    }
}

In addition, the onApply method will also be triggered on other follower nodes after the task’s log is committed, so the other nodes will also update their memory counters.

2. Look at write operations from a framework perspective

From a framework perspective, what does **Node.apply(Task)** do?

public interface Node extends Lifecycle<NodeOptions>, Describer {
    /**
     * [Thread-safe and wait-free]
     *
     * Apply task to the replicated-state-machine
     *
     * About the ownership:
     * |task.data|: for the performance consideration, we will take away the
     *               content. If you want keep the content, copy it before call
     *               this function
     * |task.done|: If the data is successfully committed to the raft group. We
     *              will pass the ownership to #{@link StateMachine#onApply(Iterator)}.
     *              Otherwise we will specify the error and call it.
     *
     * @param task task to apply
     */
    void apply(final Task task);
}

According to the Javadoc, the ByteBuffer in the task is taken away by the framework for performance reasons. If the data is successfully committed to the Raft group, it is handed to the StateMachine's onApply method, and the Closure in the task is invoked there by the user code.

Again, take a look at the Javadoc for StateMachine’s onApply method.

public interface StateMachine {

    /**
     * Update the StateMachine with a batch of tasks that can be accessed
     * through |iterator|.
     *
     * Invoked when one or more tasks that were passed to Node#apply(Task) have been
     * committed to the raft group (quorum of the group peers have received
     * those tasks and stored them on the backing storage).
     *
     * Once this function returns to the caller, we will regard all the iterated
     * tasks through |iter| have been successfully applied. And if you didn't
     * apply all the given tasks, we would regard this as a critical error
     * and report a error whose type is ERROR_TYPE_STATE_MACHINE.
     *
     * @param iter iterator of states
     */
    void onApply(final Iterator iter);
}

This method is called when the Task submitted by Node#apply(Task) has been committed (most members have received the Task and saved it to back-end storage).

What does JRaft do in the process from Apply (Task) to onApply?

This process is quite long and can be divided into seven phases, described below.

Phase 1: Wrap the Task as a LogEntryAndClosure

CounterServer is still used as the example here. When the subsequent steps complete (i.e., after onApply finishes), the user's CounterClosure is called back. For convenience, we will refer to the user's CounterClosure as done0.

NodeImpl's apply method wraps the user's Task as a LogEntryAndClosure and publishes it to a Disruptor queue. At this point the user code returns and CounterServiceImpl's processing is complete.

// NodeImpl
@Override
public void apply(final Task task) {
    // ...
    final LogEntry entry = new LogEntry();
    entry.setData(task.getData());
    int retryTimes = 0;
    try {
        // wrap LogEntryAndClosure with Closure done (the client's Closure callback), LogEntry entry (the client data), and expectedTerm (the client's expected term, default -1)
        final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
            event.reset();
            event.done = task.getDone();
            event.entry = entry;
            event.expectedTerm = task.getExpectedTerm();
        };
        while (true) {
            // Submit LogEntryAndClosure to the Disruptor framework queue
            if (this.applyQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                if (retryTimes > MAX_APPLY_RETRY_TIMES) {
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }
    } catch (final Exception e) {
        // ...
    }
}

Phase 2: Write the log to memory and put done0 into the ballot box

NodeImpl.LogEntryAndClosureHandler handles the LogEntryAndClosure event.

private class LogEntryAndClosureHandler implements EventHandler<LogEntryAndClosure> {
    // task list for batch
    private final List<LogEntryAndClosure> tasks = new ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());

    @Override
    public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
                                                                                                      throws Exception {
       // ...
        this.tasks.add(event);
        if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            executeApplyingTasks(this.tasks);
            this.tasks.clear();
        }
    }
}

executeApplyingTasks

  • Verify that the current node is the Leader
  • Validate the expected term of the Task
  • Put the log replication info into the BallotBox; note that done0 is stored there. The BallotBox keeps a list of user closures to call back; later they are put into the Iterator passed to the user's StateMachine#onApply so the user can run the callback
  • Finally, hand the log entries over to the LogManager, wrapped with the second Closure, a LeaderStableClosure
private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
    this.writeLock.lock();
    try {
        final int size = tasks.size();
        // 1. If the current node is not the Leader, respond with failure
        if (this.state != State.STATE_LEADER) {
            final Status st = new Status();
            if (this.state != State.STATE_TRANSFERRING) {
                st.setError(RaftError.EPERM, "Is not leader.");
            } else {
                st.setError(RaftError.EBUSY, "Is transferring leadership.");
            }
            final List<LogEntryAndClosure> savedTasks = new ArrayList<>(tasks);
            Utils.runInThread(() -> {
                for (int i = 0; i < size; i++) {
                    savedTasks.get(i).done.run(st);
                }
            });
            return;
        }
        final List<LogEntry> entries = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            final LogEntryAndClosure task = tasks.get(i);
            // 2. If the expected term is not equal to the current leader term, return failure
            if (task.expectedTerm != -1 && task.expectedTerm != this.currTerm) {
                // LOG.debug(... expectedTerm doesn't match currTerm ...)
                if (task.done != null) {
                    final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d",
                        task.expectedTerm, this.currTerm);
                    Utils.runClosureInThread(task.done, st);
                }
                continue;
            }
            // 3. The log information before replication is saved to the queue in the ballotBox
            if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
                this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
                Utils.runClosureInThread(task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
                continue;
            }
            task.entry.getId().setTerm(this.currTerm);
            task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
            entries.add(task.entry);
        }
        // 4. Pass LogEntry to LogManager, pass a LeaderStableClosure
        this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
        checkAndSetConfiguration(true);
    } finally {
        this.writeLock.unlock();
    }
}

Enter LogManager#appendEntries. This logic is common to the leader node and follower node. The main idea is to write the log to memory first and then publish the StableClosureEvent event.

public void appendEntries(final List<LogEntry> entries, final StableClosure done) {
    // ...
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // ...
        // 1. Logs are written to memory in batches
        if (!entries.isEmpty()) {
            done.setFirstLogIndex(entries.get(0).getId().getIndex());
            this.logsInMemory.addAll(entries);
        }
        done.setEntries(entries);
        int retryTimes = 0;
        // 2. Submit a StableClosureEvent
        final EventTranslator<StableClosureEvent> translator = (event, sequence) -> {
            event.reset();
            event.type = EventType.OTHER;
            event.done = done;
        };
        while (true) {
            if (tryOfferEvent(done, translator)) {
                break;
            } else {
                retryTimes++;
                if (retryTimes > APPEND_LOG_RETRY_TIMES) {
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }
        doUnlock = false;
        if (!wakeupAllWaiter(this.writeLock)) {
            notifyLastLogIndexListeners();
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}

Phase 3: Process the StableClosureEvent, flush the in-memory log to disk, and the leader commits the log

private class StableClosureEventHandler implements EventHandler<StableClosureEvent> {
    LogId               lastId  = LogManagerImpl.this.diskId;
    List<StableClosure> storage = new ArrayList<>(256);
    AppendBatcher       ab      = new AppendBatcher(this.storage, 256.new ArrayList<>(),
                                    LogManagerImpl.this.diskId);

    @Override
    public void onEvent(final StableClosureEvent event, final long sequence, final boolean endOfBatch)
                                                                                                      throws Exception {
        final StableClosure done = event.done;
        if (done.getEntries() != null && !done.getEntries().isEmpty()) {
            this.ab.append(done);
        } else {
            // ...
        }
        if (endOfBatch) {
            // Write the log to disk and trigger the done callback
            this.lastId = this.ab.flush();
            setDiskId(this.lastId);
        }
    }
}

LogManagerImpl.AppendBatcher is responsible for writing the log to the underlying storage and triggering the Closure callbacks; here the triggered callback is the LeaderStableClosure.

private class AppendBatcher {
    List<StableClosure> storage;
    int                 cap;
    int                 size;
    int                 bufferSize;
    List<LogEntry>      toAppend;
    LogId               lastId;

    public AppendBatcher(final List<StableClosure> storage, final int cap, final List<LogEntry> toAppend,
                         final LogId lastId) {
        super();
        this.storage = storage;
        this.cap = cap;
        this.toAppend = toAppend;
        this.lastId = lastId;
    }

    LogId flush() {
        if (this.size > 0) {
            // 1. log writes to the underlying storage
            this.lastId = appendToStorage(this.toAppend);
            for (int i = 0; i < this.size; i++) {
                this.storage.get(i).getEntries().clear();
                Status st = null;
                try {
                    if (LogManagerImpl.this.hasError) {
                        st = new Status(RaftError.EIO, "Corrupted LogStorage");
                    } else {
                        st = Status.OK();
                    }
                    // 2. Trigger the callback
                    this.storage.get(i).run(st);
                } catch (Throwable t) {
                    LOG.error("Fail to run closure with status: {}.", st, t); }}this.toAppend.clear();
            this.storage.clear();

        }
        this.size = 0;
        this.bufferSize = 0;
        return this.lastId;
    }

    void append(final StableClosure done) {
        if (this.size == this.cap || this.bufferSize >= LogManagerImpl.this.raftOptions.getMaxAppendBufferSize()) {
            flush();
        }
        this.storage.add(done);
        this.size++;
        this.toAppend.addAll(done.getEntries());
        for (final LogEntry entry : done.getEntries()) {
            this.bufferSize += entry.getData() != null ? entry.getData().remaining() : 0;
        }
    }
}

The LeaderStableClosure calls the BallotBox's commitAt method to register the leader's own commit. commitAt only proceeds to the next steps once a quorum of nodes has committed, as we will see later.

class LeaderStableClosure extends LogManager.StableClosure {

    public LeaderStableClosure(final List<LogEntry> entries) {
        super(entries);
    }

    @Override
    public void run(final Status status) {
        if (status.isOk()) {
            NodeImpl.this.ballotBox.commitAt(this.firstLogIndex, this.firstLogIndex + this.nEntries - 1,
                NodeImpl.this.serverId);
        } else {
            LOG.error(...);
        }
    }
}

Phase 4: Replicator log synchronization

For each Follower, the Leader node creates a Replicator instance that is responsible for synchronizing logs to that follower. The Replicator keeps sending AppendEntries requests to push logs to the follower.

// Replicator
private boolean sendEntries(final long nextSendingIndex) {
    final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
    // Set AppendEntriesRequest parameters
    if (!fillCommonFields(rb, nextSendingIndex - 1, false)) {
        installSnapshot();
        return false;
    }
    ByteBufferCollector dataBuf = null;
    final int maxEntriesSize = this.raftOptions.getMaxEntriesSize();
    final RecyclableByteBufferList byteBufList = RecyclableByteBufferList.newInstance();
    try {
        for (int i = 0; i < maxEntriesSize; i++) {
            // Set the log metadata tenure, type, and packet length
            final RaftOutter.EntryMeta.Builder emb = RaftOutter.EntryMeta.newBuilder();
            if (!prepareEntry(nextSendingIndex, i, emb, byteBufList)) {
                break;
            }
            rb.addEntries(emb.build());
        }
        // If no logs need to be synchronized, wait
        if (rb.getEntriesCount() == 0) {
            if (nextSendingIndex < this.options.getLogManager().getFirstLogIndex()) {
                installSnapshot();
                return false;
            }
            // _id is unlock in _wait_more
            waitMoreEntries(nextSendingIndex);
            return false;
        }
        if (byteBufList.getCapacity() > 0) {
            dataBuf = ByteBufferCollector.allocateByRecyclers(byteBufList.getCapacity());
            for (final ByteBuffer b : byteBufList) {
                dataBuf.put(b);
            }
            final ByteBuffer buf = dataBuf.getBuffer();
            buf.flip();
            rb.setData(ZeroByteStringHelper.wrap(buf));
        }
    } finally {
        RecycleUtil.recycle(byteBufList);
    }

    final AppendEntriesRequest request = rb.build();
    this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
    this.statInfo.firstLogIndex = rb.getPrevLogIndex() + 1;
    this.statInfo.lastLogIndex = rb.getPrevLogIndex() + rb.getEntriesCount();

    final Recyclable recyclable = dataBuf;
    final int v = this.version;
    final long monotonicSendTimeMs = Utils.monotonicMs();
    final int seq = getAndIncrementReqSeq();

    Future<Message> rpcFuture = null;
    try {
        // Send appendEntries to followers
        rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request, -1,
            new RpcResponseClosureAdapter<AppendEntriesResponse>() {

                @Override
                public void run(final Status status) {
                    RecycleUtil.recycle(recyclable);
                    // Process the follower response
                    onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request, getResponse(), seq, v, monotonicSendTimeMs);
                }
            });
    } catch (final Throwable t) {
        RecycleUtil.recycle(recyclable);
        ThrowUtil.throwException(t);
    }
    addInflight(RequestType.AppendEntries, nextSendingIndex, request.getEntriesCount(), request.getData().size(),
        seq, rpcFuture);

    return true;
}

In the onRpcReturned callback, the Leader processes the follower's replication result.

static void onRpcReturned(final ThreadId id, final RequestType reqType, final Status status, final Message request,
                          final Message response, final int seq, final int stateVersion, final long rpcSendTime) {
    // ...
    final PriorityQueue<RpcResponse> holdingQueue = r.pendingResponses;
    holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime));
    boolean continueSendEntries = false;
    try {
        int processed = 0;
        while (!holdingQueue.isEmpty()) {
            final RpcResponse queuedPipelinedResponse = holdingQueue.peek();
             // ...
            holdingQueue.remove();
            try {
                switch (queuedPipelinedResponse.requestType) {
                    case AppendEntries:
                        // Handle the follower's response to log replication; onAppendEntriesReturned is triggered
                        continueSendEntries = onAppendEntriesReturned(id, inflight, queuedPipelinedResponse.status,
                            (AppendEntriesRequest) queuedPipelinedResponse.request,
                            (AppendEntriesResponse) queuedPipelinedResponse.response, rpcSendTime, startTimeMs, r);
                        break;
                    // ...
                }
            } finally {
                if (continueSendEntries) {
                    r.getAndIncrementRequiredNextSeq();
                } else {
                    break;
                }
            }
        }
    } finally {
        // ...
        // Continue log synchronization
        if (continueSendEntries) {
            r.sendEntries();
        }
    }
}

onAppendEntriesReturned handles the follower's response and calls the ballot box's commitAt method to continue the vote count.

private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight inflight, final Status status,
                                               final AppendEntriesRequest request,
                                               final AppendEntriesResponse response, final long rpcSendTime,
                                               final long startTimeMs, final Replicator r) {
    if (inflight.startIndex != request.getPrevLogIndex() + 1) {
        // ...
        return false;
    }
    if (!status.isOk()) {
        // ...
        return false;
    }
    if (!response.getSuccess()) {
        // ...
        return false;
    }
    if (response.getTerm() != r.options.getTerm()) {
        // ...
        return false;
    }
    if (rpcSendTime > r.lastRpcSendTimestamp) {
        r.lastRpcSendTimestamp = rpcSendTime;
    }
    final int entriesSize = request.getEntriesCount();
    if (entriesSize > 0) {
        if (r.options.getReplicatorType().isFollower()) {
            // The leader continues to call the commitAt method of the ballot box
            r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
        }
    }
    // ...
    return true;
}

Phase 5: A quorum of nodes commits and the ApplyTask is submitted

Whether it is the Leader committing its own log or the Leader processing a follower's response to a log replication request, the BallotBox's commitAt method is called; it mainly does the counting. Once a quorum of nodes has committed, the BallotBox calls the FSMCaller's onCommitted method.

// BallotBox
// Scenario 1: The leader's own log commit is completed
// Scenario 2: Replicator follower log commit completed
public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
    final long stamp = this.stampedLock.writeLock();
    long lastCommittedIndex = 0;
    try {
        if (this.pendingIndex == 0) {
            return false;
        }
        if (lastLogIndex < this.pendingIndex) {
            return true;
        }
        if (lastLogIndex >= this.pendingIndex + this.pendingMetaQueue.size()) {
            throw new ArrayIndexOutOfBoundsException();
        }
        final long startAt = Math.max(this.pendingIndex, firstLogIndex);
        Ballot.PosHint hint = new Ballot.PosHint();
        for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
            final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
            hint = bl.grant(peer, hint);
            // Once a quorum of nodes has committed, lastCommittedIndex is set to logIndex
            if (bl.isGranted()) {
                lastCommittedIndex = logIndex;
            }
        }
        // If no quorum commit has happened yet, return directly
        if (lastCommittedIndex == 0) {
            return true;
        }
        this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
        this.pendingIndex = lastCommittedIndex + 1;
        this.lastCommittedIndex = lastCommittedIndex;
    } finally {
        this.stampedLock.unlockWrite(stamp);
    }
    // If half nodes commit, fsmcaller.oncommitted
    this.waiter.onCommitted(lastCommittedIndex);
    return true;
}

FSMCallerImpl#onCommitted submits an ApplyTask event of type COMMITTED.

// FSMCallerImpl
public boolean onCommitted(final long committedIndex) {
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.COMMITTED;
        task.committedIndex = committedIndex;
    });
}
private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
    if (!this.taskQueue.tryPublishEvent(tpl)) {
      	return false;
    }
    return true;
}

Phase 6: Process ApplyTask and assemble IteratorImpl to call the onApply method of the user’s StateMachine

FSMCallerImpl.ApplyTaskHandler handles the ApplyTask event.

private class ApplyTaskHandler implements EventHandler<ApplyTask> {
    private long maxCommittedIndex = -1;

    @Override
    public void onEvent(final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception {
        this.maxCommittedIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch);
    }
}

The runApplyTask method in turn calls doCommitted.

// FSMCallerImpl
private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final boolean endOfBatch) {
    CountDownLatch shutdown = null;
    if (task.type == TaskType.COMMITTED) {
        if (task.committedIndex > maxCommittedIndex) {
            maxCommittedIndex = task.committedIndex;
        }
    } else {
        // ...
    }
    try {
        if (endOfBatch && maxCommittedIndex >= 0) {
            this.currTask = TaskType.COMMITTED;
            // call the onApply method of user StateMachine
            doCommitted(maxCommittedIndex);
            maxCommittedIndex = -1L;
        }
        this.currTask = TaskType.IDLE;
        return maxCommittedIndex;
    } finally {
        if (shutdown != null) {
            shutdown.countDown();
        }
    }
}

doCommitted puts the stored Closures (including done0) into an IteratorImpl, passes the iterator to the user StateMachine's onApply method, and there the user code executes the outermost callback done0.

// FSMCallerImpl
// Assemble IteratorImpl and call the onApply method of user StateMachine
private void doCommitted(final long committedIndex) {
    if (!this.error.getStatus().isOk()) {
      return;
    }
    final long lastAppliedIndex = this.lastAppliedIndex.get();
    if (lastAppliedIndex >= committedIndex) {
      return;
    }
    final long startMs = Utils.monotonicMs();
    try {
      final List<Closure> closures = new ArrayList<>();
      final List<TaskClosure> taskClosures = new ArrayList<>();
      // Done0 (done in the original Task of the user code) will be put into closures
      final long firstClosureIndex = this.closureQueue.popClosureUntil(committedIndex, closures, taskClosures);

      final IteratorImpl iterImpl = new IteratorImpl(this.fsm, this.logManager, closures, firstClosureIndex,
                                                     lastAppliedIndex, committedIndex, this.applyingIndex);
      while (iterImpl.isGood()) {
        final LogEntry logEntry = iterImpl.entry();
        // ...
        // call the onApply method of user StateMachine
        doApplyTasks(iterImpl);
      }

      // ...
    } finally {
      this.nodeMetrics.recordLatency("fsm-commit", Utils.monotonicMs() - startMs);
    }
}

private StateMachine fsm;
private void doApplyTasks(final IteratorImpl iterImpl) {
    final IteratorWrapper iter = new IteratorWrapper(iterImpl);
    final long startApplyMs = Utils.monotonicMs();
    final long startIndex = iter.getIndex();
    try {
      // call the onApply method of user StateMachine
      this.fsm.onApply(iter);
    } finally {
      // ...
    }
    if (iter.hasNext()) {
      LOG.error("");
    }
    iter.next();
}

Phase 7: Followers apply the log

The Leader sends its committedIndex through the Replicator. When a Follower finds that its applyIndex is smaller than that committedIndex, it submits an ApplyTask to execute the onApply method of the user's StateMachine.

On the follower side, NodeImpl's handleAppendEntriesRequest processes the AppendEntriesRequest and sets the committedIndex on its ballot box.

// NodeImpl
public Message handleAppendEntriesRequest(final AppendEntriesRequest request, final RpcRequestClosure done) {
   // ...
    if (entriesCount == 0) {
      // A heartbeat request or probe request
      this.ballotBox.setLastCommittedIndex(Math.min(request.getCommittedIndex(), prevLogIndex));
      return respBuilder.build();
    }
    // ...
}

If the ballot box finds that the new committedIndex exceeds the previous one, an ApplyTask is submitted and the onApply method of the user's StateMachine is called, just as on the Leader.

// BallotBox
private FSMCaller waiter;
public boolean setLastCommittedIndex(final long lastCommittedIndex) {
    boolean doUnlock = true;
    final long stamp = this.stampedLock.writeLock();
    try {
        // ...
        if (lastCommittedIndex < this.lastCommittedIndex) {
            return false;
        }
        if (lastCommittedIndex > this.lastCommittedIndex) {
            this.lastCommittedIndex = lastCommittedIndex;
            this.stampedLock.unlockWrite(stamp);
            doUnlock = false;
            // The ApplyTask is submitted and, like the Leader, the onApply method of the user's StateMachine is called
            this.waiter.onCommitted(lastCommittedIndex);
        }
    } finally {
        if (doUnlock) {
            this.stampedLock.unlockWrite(stamp);
        }
    }
    return true;
}

IV. Read Operations

To achieve linearizable reads, all read requests are also forwarded to the Leader node.

1. User code flow

GetValueRequestProcessor handles GetValueRequest. This wraps the first Closure, which is responsible for responding to the client.

public class GetValueRequestProcessor implements RpcProcessor<GetValueRequest> {

    private final CounterService counterService;

    public GetValueRequestProcessor(CounterService counterService) {
        super();
        this.counterService = counterService;
    }

    @Override
    public void handleRequest(final RpcContext rpcCtx, final GetValueRequest request) {
        final CounterClosure closure = new CounterClosure() {
            @Override
            public void run(Status status) {
                // Respond to the client
                rpcCtx.sendResponse(getValueResponse());
            }
        };
        this.counterService.get(request.isReadOnlySafe(), closure);
    }

    @Override
    public String interest() {
        return GetValueRequest.class.getName();
    }
}

CounterServiceImpl implements consistent reads through NodeImpl's readIndex method. Here the second Closure, a ReadIndexClosure, is created and passed into readIndex.

public class CounterServiceImpl implements CounterService {
    private final CounterServer counterServer;
    private final Executor      readIndexExecutor;
    @Override
    public void get(final boolean readOnlySafe, final CounterClosure closure) {
        // readOnlySafe = false: return the current node's state machine value directly, without the consistent-read logic
        if (!readOnlySafe) {
            closure.success(getValue());
            closure.run(Status.OK());
            return;
        }

        // readOnlySafe = true, consistent read logic
        this.counterServer.getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
            @Override
            public void run(Status status, long index, byte[] reqCtx) {
                // Get the value of the state machine after readIndex(commitIndex) <= applyIndex
                if (status.isOk()) {
                    closure.success(getValue());
                    closure.run(Status.OK());
                    return;
                }
                // Failed processing
                CounterServiceImpl.this.readIndexExecutor.execute(() -> {
                    // If the current node is the Leader, submit the task to the Raft cluster. If it succeeds, the onApply method response of the CounterStateMachine is called back
                    if (isLeader()) {
                        applyOperation(CounterOperation.createGet(), closure);
                    } else {
                        // If the current node is not the Leader, respond with failure
                        handlerNotLeaderError(closure);
                    }
                });
            }
        });
    }

    // Get the value from the CounterStateMachine
    private long getValue() {
        return this.counterServer.getFsm().getValue();
    }
}

When the readIndex execution is complete, the user’s ReadIndexClosure callback is executed.

  • If the status is success, readIndex is already less than or equal to applyIndex, and the value is read directly from the current node's local state machine.
  • If it fails and the current node is still the Leader, fall back to the normal Raft write path to guarantee a consistent read (the same flow as incrementAndGet).
  • If it fails and the current node is not the Leader, respond with failure.

2. Read operations from a framework perspective

From NodeImpl#readIndex to ReadIndexClosure being called, what does JRaft do?

NodeImpl delegates the request to ReadOnlyServiceImpl.

// NodeImpl
@Override
public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    this.readOnlyService.addRequest(requestContext, done);
}

ReadOnlyServiceImpl publishes a ReadIndexEvent, which is handled by the inner class ReadIndexEventHandler. The readIndex method then returns, and the event handler takes care of the rest.

// ReadOnlyServiceImpl
@Override
public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
    try {
      EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
        event.done = closure;
        event.requestContext = new Bytes(reqCtx);
        event.startTime = Utils.monotonicMs();
      };
      int retryTimes = 0;
      while (true) {
        if (this.readIndexQueue.tryPublishEvent(translator)) {
          break;
        } else {
          retryTimes++;
          if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
            Utils.runClosureInThread(closure,
                                     new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
            return;
          }
          ThreadHelper.onSpinWait();
        }
      }
    } catch (final Exception e) {
      Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
    }
}

private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
  private final List<ReadIndexEvent> events = new ArrayList<>(
    ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

  @Override
  public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
    throws Exception {
    this.events.add(newEvent);
    if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
      executeReadIndexEvents(this.events);
      this.events.clear();
    }
  }
}

ReadOnlyServiceImpl's executeReadIndexEvents method constructs a ReadIndexRequest and hands it to the Node for processing. Note that this wraps the third Closure, a ReadIndexResponseClosure.

// ReadOnlyServiceImpl
private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
    if (events.isEmpty()) {
        return;
    }
    // Construct the ReadIndex request
    final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
        .setGroupId(this.node.getGroupId()) //
        .setServerId(this.node.getServerId().toString());

    final List<ReadIndexState> states = new ArrayList<>(events.size());

    for (final ReadIndexEvent event : events) {
        rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
        states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
    }
    final ReadIndexRequest request = rb.build();
    // Call Node to handle ReadIndex
    this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}

NodeImpl performs different operations depending on the current node's state: if the node is the Leader it goes to readLeader; if it is a follower it goes to readFollower. The logic differs on the two kinds of node.

// NodeImpl
@Override
public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            case STATE_LEADER:
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                readFollower(request, done);
                break;
            // ...
        }
    } finally {
        this.readLock.unlock();
    }
}

The current node is the Leader

If the current node is the Leader, NodeImpl's readLeader method is entered directly. When a follower forwards a ReadIndexRequest to the leader, the leader also handles it with this method.

// NodeImpl
private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                        final RpcResponseClosure<ReadIndexResponse> closure) {
    // If there is only one node, the response succeeds
    final int quorum = getQuorum();
    if (quorum <= 1) {
        respBuilder.setSuccess(true) //
            .setIndex(this.ballotBox.getLastCommittedIndex());
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        return;
    }
    // If the leader does not commit any logs during his tenure, the request is rejected
    final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
    if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
        closure
            .run(new Status(
                RaftError.EAGAIN,
                "ReadIndex request rejected because leader has not committed any log entry at its term, logIndex=%d, currTerm=%d.",
                lastCommittedIndex, this.currTerm));
        return;
    }
    respBuilder.setIndex(lastCommittedIndex);

    // If the ReadIndexRequest is sent by followers, the response fails if the followers are not in the raft cluster
    if (request.getPeerId() != null) {
        final PeerId peer = new PeerId();
        peer.parse(request.getServerId());
        if (!this.conf.contains(peer) && !this.conf.containsLearner(peer)) {
            closure
                .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: %s.", peer, this.conf));
            return;
        }
    }

    ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
    // If ReadOnlyLeaseBased but the leader is not valid, ReadOnlySafe is degraded to a normal readIndex request
    if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
        readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
    }
    switch (readOnlyOpt) {
        case ReadOnlySafe:
            final List<PeerId> peers = this.conf.getConf().getPeers();
            Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
            final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                respBuilder, quorum, peers.size());
            // Send heartbeats to other followers to confirm that they are still the leader
            for (final PeerId peer : peers) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
            }
            break;
        case ReadOnlyLeaseBased:
            respBuilder.setSuccess(true);
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            break;
    }
}

For consistent read, two options are available: ReadIndex and LeaseRead. By default, ReadIndex is used.

  • ReadIndex (ReadOnlySafe): send heartbeat messages to the other followers to confirm that the current node is still the leader.

  • LeaseRead (ReadOnlyLeaseBased): to reduce the number of heartbeat RPCs sent to followers, the leader records a timestamp each time it sends a heartbeat; if the last heartbeat is still within the lease (heartbeat timeout) window, the current node assumes it is still the leader.

private boolean checkLeaderLease(final long monotonicNowMs) {
    // now - lastLeaderTimestamp < leader lease timeout (election timeout * 0.9, i.e. 0.9s with the 1s election timeout above)
    return monotonicNowMs - this.lastLeaderTimestamp < this.options.getLeaderLeaseTimeoutMs();
}
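
To switch between the two modes, the read-only option is set on RaftOptions when building the node. A rough sketch follows; the setter names mirror the getters used in the code above and are assumed, so check your JRaft version:

NodeOptions nodeOptions = new NodeOptions();
RaftOptions raftOptions = new RaftOptions();
// ReadOnlySafe (ReadIndex) is the default; LeaseRead saves the per-read heartbeat round trip
raftOptions.setReadOnlyOptions(ReadOnlyOption.ReadOnlyLeaseBased);
nodeOptions.setRaftOptions(raftOptions);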

ReadIndexHeartbeatResponseClosure is the heartbeat response callback and the fourth Closure. When the leader has received heartbeat responses from a quorum of followers, it concludes it is still the leader and executes the run method of the third callback.

private class ReadIndexHeartbeatResponseClosure extends RpcResponseClosureAdapter<AppendEntriesResponse> {

    @Override
    public synchronized void run(final Status status) {
        if (this.isDone) {
            return;
        }
        if (status.isOk() && getResponse().getSuccess()) {
            this.ackSuccess++;
        } else {
            this.ackFailures++;
        }
        // Include leader self vote yes.
        if (this.ackSuccess + 1 >= this.quorum) {
            this.respBuilder.setSuccess(true);
            this.closure.setResponse(this.respBuilder.build());
            this.closure.run(Status.OK());
            this.isDone = true;
        } else if (this.ackFailures >= this.failPeersThreshold) {
            this.respBuilder.setSuccess(false);
            this.closure.setResponse(this.respBuilder.build());
            this.closure.run(Status.OK());
            this.isDone = true;
        }
    }
}

ReadIndexResponseClosure is the third callback. This is common logic that runs when the ReadIndex step finishes, regardless of whether the current node is the leader or a follower. If applyIndex has already caught up to readIndex, the second callback can be invoked immediately; otherwise the request is placed in a wait queue and the second callback is not executed until the log has been applied up to readIndex. The second callback here is the ReadIndexClosure that the user code passed into NodeImpl#readIndex; in it the user reads the value from the state machine and returns it.

class ReadIndexResponseClosure extends RpcResponseClosureAdapter<ReadIndexResponse> {
    final List<ReadIndexState> states;
    final ReadIndexRequest     request;

    /**
     * Called on the requesting node when the consistent-read response comes back.
     */
    @Override
    public void run(final Status status) {
        // 1. If the response failed, the read fails
        if (!status.isOk()) {
            notifyFail(status);
            return;
        }
        final ReadIndexResponse readIndexResponse = getResponse();
        if (!readIndexResponse.getSuccess()) {
            notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
            return;
        }
        // 2. Build the ReadIndexStatus with the index returned by the leader
        final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
            readIndexResponse.getIndex());
        for (final ReadIndexState state : this.states) {
            state.setIndex(readIndexResponse.getIndex());
        }

        boolean doUnlock = true;
        ReadOnlyServiceImpl.this.lock.lock();
        try {
            // 3. If the current node's applyIndex is already >= the commit index in the ReadIndexResponse,
            //    run the closure right away to read from the local state machine
            if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
                ReadOnlyServiceImpl.this.lock.unlock();
                doUnlock = false;
                notifySuccess(readIndexStatus);
            } else {
                // 4. Otherwise park the request in the pending queue until applyIndex catches up
                ReadOnlyServiceImpl.this.pendingNotifyStatus
                    .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10))
                    .add(readIndexStatus);
            }
        } finally {
            if (doUnlock) {
                ReadOnlyServiceImpl.this.lock.unlock();
            }
        }
    }
}
Copy the code
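For the waiting branch above, the parked reads are released once the state machine catches up: ReadOnlyServiceImpl is notified of apply progress and drains every entry whose readIndex is now covered. The sketch below is written from that description; the exact method signature and the sorted-map type of pendingNotifyStatus are assumptions.

// Sketch of the drain inside ReadOnlyServiceImpl, called when the state machine's
// lastAppliedIndex has advanced to appliedIndex.
void onApplied(final long appliedIndex) {
    final List<ReadIndexStatus> ready = new ArrayList<>();
    this.lock.lock();
    try {
        // pendingNotifyStatus: readIndex -> reads parked until applyIndex reaches that readIndex.
        // Everything keyed at or below appliedIndex is now safe to answer.
        final java.util.Iterator<Map.Entry<Long, List<ReadIndexStatus>>> it =
            this.pendingNotifyStatus.headMap(appliedIndex, true).entrySet().iterator();
        while (it.hasNext()) {
            ready.addAll(it.next().getValue());
            it.remove();
        }
    } finally {
        this.lock.unlock();
    }
    // Run the user's ReadIndexClosure (the "second callback") for each released read.
    for (final ReadIndexStatus status : ready) {
        notifySuccess(status);
    }
}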

The current node is Follower

When the current node is a follower, it forwards the readIndex request to the leader node.

// NodeImpl
private void readFollower(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> closure) {
    if (this.leaderId == null || this.leaderId.isEmpty()) {
        closure.run(new Status(RaftError.EPERM, "No leader at term %d.", this.currTerm));
        return;
    }
    // send request to leader.
    final ReadIndexRequest newRequest = ReadIndexRequest.newBuilder() //
        .mergeFrom(request) //
        .setPeerId(this.leaderId.toString()) //
        .build();
    this.rpcService.readIndex(this.leaderId.getEndpoint(), newRequest, -1, closure);
}
Copy the code

On the leader side, the forwarded ReadIndexRequest is handled by ReadIndexRequestProcessor, which still calls the handleReadIndexRequest method of NodeImpl (the implementation class of RaftServerService). From there the processing is exactly the same as when the client requests the leader directly.

public class ReadIndexRequestProcessor extends NodeRequestProcessor<ReadIndexRequest> {

    @Override
    public Message processRequest0(final RaftServerService service, final ReadIndexRequest request,
                                   final RpcRequestClosure done) {
        service.handleReadIndexRequest(request, new RpcResponseClosureAdapter<RpcRequests.ReadIndexResponse>() {

            @Override
            public void run(final Status status) {
                if (getResponse() != null) {
                    // 1. If the response is not null, send it back to the requesting node
                    done.sendResponse(getResponse());
                } else {
                    // 2. If the response is null, return the original status
                    done.run(status);
                }
            }
        });
        return null;
    }
}
Copy the code

conclusion

  • To use SOFA-JRaft, the user implements the StateMachine interface, which is responsible for saving and reading data, then creates a RaftGroupService and starts the Raft service with its start method, which returns the Node (NodeImpl). A start-up sketch follows the field list below.
public class CounterServer {
    private RaftGroupService    raftGroupService; // raft service
    private Node                node;             // the current node
    private CounterStateMachine fsm;              // state machine
}
Copy the code
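The wiring in the CounterServer constructor shown earlier then looks roughly like the sketch below; storage paths and the remaining options are trimmed, so treat it as a condensed view of the example rather than the full code.

// Sketch, following the CounterServer example: bind the state machine, configure storage,
// and start the Raft group to obtain the Node handle.
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setFsm(this.fsm);                                         // the user StateMachine
nodeOptions.setInitialConf(initConf);                                 // cluster member list
nodeOptions.setLogUri(dataPath + File.separator + "log");             // raft log storage
nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");  // term/vote metadata
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");   // snapshot storage

this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
// start() bootstraps the node; the returned Node is what apply(Task) and readIndex() are called on
this.node = this.raftGroupService.start();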
  • Raft writes from the user-code perspective. The user creates a Task, whose data is the payload to be written and whose done is the Closure to call back once the task completes, then submits the Task to the JRaft framework via the Node#apply(Task) method.
final Task task = new Task();
// Serialize the request entry to ByteBuffer
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
// Place the external closure in the done member variable of the Task
task.setDone(closure);
// Submit the Task to the current Node for processing, hosted by the JRaft framework
this.counterServer.getNode().apply(task);
Copy the code

When the JRaft framework finishes processing, it calls the onApply method of the user's StateMachine, handing over the log entries that need to be applied in batches through an Iterator. Each element in the Iterator carries the Closure that was passed in when the task was submitted; the user must call back that Closure's run method once the entry has been applied, as sketched below.
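A trimmed-down onApply for the counter example might look like this; the decode step is omitted and the use of the plain Closure type is a simplification, so treat it as a sketch of the contract rather than the exact example code.

import java.util.concurrent.atomic.AtomicLong;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.core.StateMachineAdapter;

public class CounterStateMachine extends StateMachineAdapter {

    private final AtomicLong value = new AtomicLong(0);

    @Override
    public void onApply(final Iterator iter) {
        // iter batches the committed log entries that are ready to be applied
        while (iter.hasNext()) {
            final long delta = 1; // simplified: treat every entry as "increment by 1"
            Closure done = null;
            if (iter.done() != null) {
                // The task was submitted on this node, so the Closure set via task.setDone() is available here
                done = iter.done();
            }
            // Otherwise the entry was replicated from the leader and the operation
            // would be decoded from iter.getData() (omitted in this sketch).
            this.value.addAndGet(delta);
            if (done != null) {
                // Calling run releases the caller waiting on this write
                done.run(Status.OK());
            }
            iter.next(); // move to the next committed entry
        }
    }
}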

  • Raft writes from the perspective of the SOFA-JRaft framework. The general flow:

    • Task submitted to the Leader -> log entry written to memory -> flushed to disk -> committed
    • Leader -> Replicator replicates the log -> Follower -> log entry written to memory -> flushed to disk -> responds to the Leader -> committed
    • The Leader's BallotBox handles the commit -> once more than half of the nodes have acknowledged, the entry is committed -> an ApplyTask is submitted -> the onApply method of the user StateMachine is invoked
    • The Leader's Replicator replicates the log -> Follower -> the Follower's ballot box finds that committedIndex exceeds applyIndex -> the Follower submits an ApplyTask -> the onApply method of the user StateMachine is invoked
  • Raft consistent reads from the user's perspective. The user can issue a consistent read against either the leader or a follower node by calling NodeImpl's readIndex method and passing in a ReadIndexClosure callback. When the JRaft framework finishes processing, it calls back the run method of the ReadIndexClosure. If status is successful, readIndex is already less than or equal to applyIndex and the value can be read from the local state machine. If status fails and the current node is the leader, the read can be downgraded to the normal Raft log path, i.e. the same flow as a write.

this.counterServer.getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
    @Override
    public void run(Status status, long index, byte[] reqCtx) {
        // Read the value from the state machine once readIndex (commitIndex) <= applyIndex
        if (status.isOk()) {
            closure.success(getValue());
            closure.run(Status.OK());
            return;
        }
        // Failure handling
        CounterServiceImpl.this.readIndexExecutor.execute(() -> {
            if (isLeader()) {
                // If the current node is the Leader, submit the read as a Task to the Raft cluster;
                // on success the onApply method of CounterStateMachine responds
                applyOperation(CounterOperation.createGet(), closure);
            } else {
                // If the current node is not the Leader, respond with a not-leader error
                handlerNotLeaderError(closure);
            }
        });
    }
});
Copy the code
  • Raft consistent reads from the framework's perspective. If the user requests a follower, the request is forwarded to the leader. The leader supports ReadIndex and LeaseRead; ReadIndex is used by default.

    • ReadIndex (ReadOnlySafe) : the leader sends heartbeat messages to the other followers to confirm that it is still the leader.

    • LeaseRead (ReadOnlyLeaseBased) : to cut down on heartbeat RPCs to followers, the leader updates a timestamp every time it sends heartbeats. If the last heartbeat is still within the lease window, the current node is considered to still be the leader and responds without another round trip.