Author: Meow Uncle. Original post: blog.betacat.io/post/raft-i…

Raft is a consensus algorithm that has become popular in recent years. Its principle is easy to understand, and there are already plenty of introductions to it online, so I won't repeat the theory here. Instead, I'll take the raft implementation in etcd [1] as an example and look at how the algorithm is actually engineered; after all, understanding the principle on paper still leaves you a long way from building the real thing.

If you're not familiar with Raft, the classic animated demo, the Raft paper, and this lecture may be helpful. Or you can just watch the video below, a talk I gave walking through the source code of the raft module in etcd. As an aside, many conferences and meetups upload their videos to YouTube, which is a treasure trove for programmers: every time you visit, you learn something new. www.youtube.com/watch?v=sL0…

Overview

etcd implements the raft protocol as a standalone library and then builds its own application on top of it. Perhaps to promote that library, etcd also provides an example program, raftexample, which shows how to build a distributed KV storage engine on the raft library it provides.

In etcd, raft runs in a goroutine as the underlying consensus module: it receives messages from the upper layer (EtcdServer) through one channel and returns results to the application through another. Their interaction looks something like this:

The upside of this fully asynchronous interaction is better performance; the downside is that it is hard to debug and the code can look messy. etcd, for example, often pushes a message into a slice/channel and lets the call chain end there, so you cannot intuitively trace who ultimately handles the message.

Code Breakdown

Let’s take a look at what files are in the Raft Library:

$ tree --dirsfirst -L 1 -I '*test*' -P '*.go'
.
├── raftpb
├── doc.go
├── log.go
├── log_unstable.go
├── logger.go
├── node.go
├── progress.go
├── raft.go
├── rawnode.go
├── read_only.go
├── status.go
├── storage.go
└── util.go

The following are the functional modules:

raftpb

Serialization in raft is implemented with Protocol Buffers. This folder defines the data structures that need to be serialized, starting with Entry and Message:

Entry

As a whole, each node in the cluster is a state machine, and raft manages the operations that modify this state machine; in code, each such operation is encapsulated as an Entry.

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raftpb/raft.pb.go#L203
type Entry struct {
    Term             uint64
    Index            uint64
    Type             EntryType
    Data             []byte
}
  • Term: the election term, incremented by one after each election. Its main role is to mark the freshness of information: for example, if one node sends a message carrying term 2 while another node is already at term 3, the first node's information is considered outdated.
  • Index: the position of the current entry in the raft log. Together, Term and Index uniquely identify a log entry.
  • Type: the type of the current entry. etcd currently supports two types: EntryNormal means the entry is an ordinary operation on the state machine; EntryConfChange means it changes the cluster configuration, such as adding or removing a node.
  • Data: a serialized byte array representing the actual operation the entry carries. If Type is EntryNormal, Data may be the specific key-value pair to change; if Type is EntryConfChange, Data is the specific configuration change item, ConfChange. Raft does not care what Data contains: it treats it as an opaque payload during log replication, and the upper-layer application parses it (see the sketch after this list).
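
To make Data concrete, here is a sketch of how an application might build an EntryNormal payload. The kv type and the JSON encoding are illustrative choices of mine, not etcd's (raftexample, for instance, encodes a similar struct with gob); the import path is the one used as of v3.3.10:

package main

import (
    "encoding/json"
    "fmt"

    pb "github.com/coreos/etcd/raft/raftpb" // import path as of v3.3.10
)

// kv is a hypothetical application-level operation. Raft never looks
// inside Data; only the application knows how to decode it.
type kv struct {
    Key string
    Val string
}

func main() {
    data, err := json.Marshal(kv{Key: "name", Val: "alice"})
    if err != nil {
        panic(err)
    }
    // Term and Index are normally filled in by the consensus layer during
    // replication; they are set by hand here purely for illustration.
    ent := pb.Entry{Term: 2, Index: 7, Type: pb.EntryNormal, Data: data}
    fmt.Printf("entry %d/%d carries payload %s\n", ent.Term, ent.Index, ent.Data)
}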

Message

The communication between nodes in a Raft cluster is done by passing different messages. This Message structure is a very general container that contains the fields required for the various messages.

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raftpb/raft.pb.go#L239
type Message struct {
    Type             MessageType
    To               uint64
    From             uint64
    Term             uint64
    LogTerm          uint64
    Index            uint64
    Entries          []Entry
    Commit           uint64
    Snapshot         Snapshot
    Reject           bool
    RejectHint       uint64
    Context          []byte
}
  • Type: the message type, of which there are many. For example, MsgVote requests votes, MsgPreVote handles network partitions [2], MsgProp(ose) is sent to the leader node to propose appending data to the log, MsgApp(end) replicates log entries, and MsgSnap installs a snapshot. Different message types also use the fields below differently; the sketch after this list shows how the fields fit together.
  • To and From represent the receiver and the sender of the message, respectively.
  • Term: the term the whole cluster is in when this message is sent.
  • LogTerm: the term of the last entry in the sender's log; generally used by MsgVote.
  • Index: a log index number. In a MsgVote message it is the index of the candidate's last log entry; together with LogTerm it describes the newest log the candidate holds, so receivers can compare it against their own log and decide whether to vote.
  • Entries: the log entries to be stored.
  • Commit: the index of the committed log, used to propagate commit progress to other nodes.
  • Snapshot: generally used by MsgSnap to carry the actual snapshot.
  • Reject, RejectHint: indicate that the peer rejected the current node's request (MsgVote/MsgApp/MsgSnap…).
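
To see how these fields work together, here is a hypothetical log-replication message; every value is made up for illustration:

package main

import (
    "fmt"

    pb "github.com/coreos/etcd/raft/raftpb" // import path as of v3.3.10
)

func main() {
    msg := pb.Message{
        Type:    pb.MsgApp,
        From:    1, // leader's node ID
        To:      2, // follower's node ID
        Term:    3, // leader's current term
        Index:   4, // index of the entry immediately preceding Entries
        LogTerm: 2, // term of that preceding entry
        Commit:  4, // leader's commit index, so the follower can advance its own
        Entries: []pb.Entry{
            {Term: 3, Index: 5, Type: pb.EntryNormal, Data: []byte("payload")},
        },
    }
    // The follower accepts the new entry only if its own log contains an
    // entry with index 4 and term 2.
    fmt.Println(msg.Type, msg.Index, msg.LogTerm)
}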

log_unstable.go

As the name suggests, the unstable data structure holds data that has not yet been persisted by the user layer. It maintains two parts: snapshot and entries:

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/log_unstable.go#L23
type unstable struct {
    // the incoming unstable snapshot, if any.
    snapshot *pb.Snapshot
    // all entries that have not yet been written to storage.
    entries []pb.Entry
    offset  uint64

    logger Logger
}

entries represents the log entries on which operations will be performed. But a log cannot grow indefinitely, and under certain circumstances expired entries are discarded. This introduces a new problem: if a new follower joins later and the leader only holds part of the operation log, how can the newcomer ever get in sync with everyone else? This is where snapshot comes in: I can't give you the earlier logs, but I can give you the result of applying all of them; apply the subsequent logs on top of this snapshot and our states will be in sync. Their structural relationship can therefore be depicted as follows [3]:

The first half is the snapshot data and the second half is the entries array. The unstable.offset member stores the raft log index of the first entry in entries; that is, the i-th entry in entries has raft log index i + unstable.offset.
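
For example, a hypothetical lookup helper (not a method of the real unstable type) makes the index arithmetic explicit:

// entryAt returns the unstable entry with raft log index i, if present.
// The entry lives at entries[i-offset], so it exists only when
// offset <= i < offset+len(entries).
func entryAt(u *unstable, i uint64) (pb.Entry, bool) {
    if i < u.offset || i >= u.offset+uint64(len(u.entries)) {
        return pb.Entry{}, false // index falls outside the unstable window
    }
    return u.entries[i-u.offset], true
}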

storage.go

This file defines a Storage interface. Since the raft implementation in etcd is not responsible for persisting data, it asks the application layer above to implement this interface so that raft can query logs through it.
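
Concretely, the interface looks like this (doc comments abridged):

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/storage.go
type Storage interface {
    // InitialState returns the saved HardState and ConfState information.
    InitialState() (pb.HardState, pb.ConfState, error)
    // Entries returns a slice of log entries in the range [lo, hi).
    Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
    // Term returns the term of entry i.
    Term(i uint64) (uint64, error)
    // LastIndex returns the index of the last entry in the log.
    LastIndex() (uint64, error)
    // FirstIndex returns the index of the first log entry that is
    // possibly available via Entries (older entries have been folded
    // into the latest snapshot).
    FirstIndex() (uint64, error)
    // Snapshot returns the most recent snapshot.
    Snapshot() (pb.Snapshot, error)
}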

In addition, this file provides an in-memory implementation of Storage called MemoryStorage which, like unstable, keeps a snapshot part followed by an entries part. Both etcdserver and raftexample use this implementation directly to serve log queries.

log.go

With unstable and Storage in hand, we can combine them into raftLog, the structure that manages the raft log.

RaftLog consists of the following members:

  • storage Storage: the Storage interface mentioned earlier, holding the log data that has been persisted.
  • unstable: holds the data that has not yet been persisted by the application layer.
  • committed uint64: the index of the highest log entry known to be committed.
  • applied uint64: the index of the highest log entry applied to the state machine.

It's important to note that a log entry must be committed before it can be applied to the state machine. Therefore, the inequality applied <= committed always holds.

In raftLog structure, the arrangement of several parts of data is shown as follows [3:1] :

This configuration can be seen in the raftLog initialization function:

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/log.go#L45
// newLog returns log using the given storage. It recovers the log to the state
// that it just commits and applies the latest snapshot.
func newLog(storage Storage, logger Logger) *raftLog {
    if storage == nil {
        log.Panic("storage must not be nil")
    }
    log := &raftLog{
        storage: storage,
        logger:  logger,
    }
    firstIndex, err := storage.FirstIndex()
    if err != nil {
        panic(err) // TODO(bdarnell)
    }
    lastIndex, err := storage.LastIndex()
    if err != nil {
        panic(err) // TODO(bdarnell)
    }
    log.unstable.offset = lastIndex + 1
    log.unstable.logger = logger
    // Initialize our committed and applied pointers to the time of the last compaction.
    log.committed = firstIndex - 1
    log.applied = firstIndex - 1

    return log
}

So the dividing line between the persisted log and unstable is lastIndex: unstable holds the entries after lastIndex.

There is also the question of why the unstable.snapshot member is not initialized here. The reason is that this is the initialization function, run when the node has just started and is restoring its state from storage; unstable.snapshot is only assigned later, while the node is synchronizing data, if a snapshot needs to be synchronized over. So there is nothing to do with it at this point.

progress.go

The leader uses the Progress data structure to track each follower's status and decides which log entries to send based on it. Three attributes are the most important:

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/progress.go#L37
// Progress represents a follower's progress in the view of the leader.
// Leader maintains progresses of all followers, and sends entries to the
// follower based on its progress.
type Progress struct {
    Match, Next uint64

    State ProgressStateType

    ins *inflights
}
  1. The following attributes are used to save the log status of the current follower node:

    • Match: the highest log index known to be replicated on this follower. If the leader knows nothing about the follower's log, this value is 0.
    • Next: the index of the next log entry the leader will send to this follower; Append messages start from Next.

    Under normal circumstances, Next = Match + 1: the next entry to send is the one immediately after the last entry known to be on the follower.

  2. The State attribute is used to save the node's current synchronization state, and it has the following values [4]:

    • ProgressStateProbe

    Probe state, entered when a follower rejects the most recent Append message and the leader tries to find where the follower's log diverged. In probe state the leader sends at most one log entry at a time; if the response carries a RejectHint, the leader rolls Next back accordingly before retrying. A newly elected leader puts every follower into probe state, because it does not yet know each follower's synchronization status and has to feel its way forward.

    • ProgressStateReplicate

    When the leader has confirmed a follower's synchronization status, it switches the follower to this state and replicates logs quickly through pipelining: after sending a replication message, the leader optimistically advances the node's Next to the highest index just sent plus one.

    • ProgressStateSnapshot

    Snapshot state. When the leader sends an Append message to bring a follower up to date but finds that the entries the follower needs have already been compacted away (for example, everything up to index 10 has been folded into a snapshot, yet the follower still needs entries older than 10), the leader switches the follower to this state and sends it a snapshot. Once the snapshot has caught the follower up, the follower does not switch directly to replicate state; it goes through probe state first.

  3. The ins attribute is used for flow control: if, say, a network partition occurs, the leader may accumulate a large backlog of messages to send, and once the network recovers, blasting them all at a follower at once could overwhelm it. Its implementation is somewhat similar to TCP's sliding window; a minimal sketch follows this list.
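
Here is that sketch of the sliding-window idea. The real inflights type in etcd uses a fixed-size circular buffer; the type and method names below are simplified stand-ins:

// inflightWindow limits how many MsgApp messages may be in flight to one
// follower at a time.
type inflightWindow struct {
    buffer []uint64 // last log index of each in-flight MsgApp, in send order
    size   int      // window capacity
}

// full reports whether the leader must pause sending to this follower.
func (w *inflightWindow) full() bool { return len(w.buffer) >= w.size }

// add records a newly sent MsgApp whose last entry has index `last`.
func (w *inflightWindow) add(last uint64) {
    if w.full() {
        panic("cannot add to a full inflight window")
    }
    w.buffer = append(w.buffer, last)
}

// freeTo releases window space for every in-flight message acknowledged up
// to index `to`, allowing more MsgApps to be sent.
func (w *inflightWindow) freeTo(to uint64) {
    i := 0
    for i < len(w.buffer) && w.buffer[i] <= to {
        i++
    }
    w.buffer = w.buffer[i:]
}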

In summary, Progress is also a state machine. Here is its state transition diagram:

raft.go

With all the concepts laid out, it’s finally time to implement the logic. As the name suggests, the concrete implementation of the RAFT protocol is in this file. Much of the logic is driven by the Step function.

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raft.go#L752
func (r *raft) Step(m pb.Message) error {
    // ...
    switch m.Type {
    case pb.MsgHup:
        // ...
    case pb.MsgVote, pb.MsgPreVote:
        // ...
    default:
        r.step(r, m)
    }
}

The main job of Step is to dispatch the different messages described in the Message section above, so this is a great place to look when you want to know how raft handles a particular message. At the end of the function there is a default clause: any message not handled above falls through to the lowercase step function. What is the reason for this design?

Because the raft struct is itself implemented as a state machine: its step attribute is a function pointer that, depending on the node's current role, points to a different message-handling function: stepLeader/stepFollower/stepCandidate. Similarly, a tick function pointer switches between tickHeartbeat and tickElection depending on the role, driving periodic heartbeats and election timeouts, respectively. Function pointers used this way feel a lot like polymorphism in OOP.
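
Here is a toy, self-contained illustration of the pattern. The type and function names are made up, but the mechanics mirror what becomeFollower/becomeLeader do in raft.go:

package main

import "fmt"

type message struct{ Type string }

// node mimics raft's trick of storing role-specific behavior in
// function-typed fields.
type node struct {
    step func(n *node, m message)
    tick func(n *node)
}

func stepFollower(n *node, m message) { fmt.Println("follower handles", m.Type) }
func stepLeader(n *node, m message)   { fmt.Println("leader handles", m.Type) }
func tickElection(n *node)            { fmt.Println("check election timeout") }
func tickHeartbeat(n *node)           { fmt.Println("send heartbeat") }

// Role transitions just swap the function pointers.
func (n *node) becomeFollower() { n.step, n.tick = stepFollower, tickElection }
func (n *node) becomeLeader()   { n.step, n.tick = stepLeader, tickHeartbeat }

func main() {
    n := &node{}
    n.becomeFollower()
    n.step(n, message{Type: "MsgApp"}) // dispatches to stepFollower
    n.becomeLeader()
    n.tick(n) // now dispatches to tickHeartbeat
}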

node.go

Node's main role is to connect the application layer (EtcdServer) with the consensus module (raft): it passes messages from the application layer down to the consensus module and feeds the consensus results back up. Its initialization function therefore creates a bunch of channels for this communication and then starts an event loop in another goroutine, shuffling data through the various channels (this for-select-channel event loop seems to be everywhere in Go).

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/node.go#L286
for {
    select {
    case m := <-propc:
        r.Step(m)
    case m := <-n.recvc:
        r.Step(m)
    case cc := <-n.confc:
        // Add/remove/update node according to cc.Type
    case <-n.tickc:
        r.tick()
    case readyc <- rd:
        // Clean up after the result is consumed by the application
    case <-advancec:
        // Stabilize logs
    case c := <-n.status:
        // Update status
    case <-n.stop:
        close(n.done)
        return
    }
}

propc and recvc carry the messages passed in from the application layer, which are then handed to the Step function in the raft layer, described above.

Now let's explain what readyc does. In etcd's implementation, Node is not responsible for persisting data, sending network messages, or applying committed logs to the state machine. Instead, it uses the readyc channel to notify the application that there is data ready to be processed, packaging everything that needs external handling into a Ready structure:

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/node.go#L52
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
    // The current volatile state of a Node.
    // SoftState will be nil if there is no update.
    // It is not required to consume or store SoftState.
    *SoftState

    // The current state of a Node to be saved to stable storage BEFORE
    // Messages are sent.
    // HardState will be equal to empty state if there is no update.
    pb.HardState

    // ReadStates can be used for node to serve linearizable read requests locally
    // when its applied index is greater than the index in ReadState.
    // Note that the readState will be returned when raft receives msgReadIndex.
    // The returned is only valid for the request that requested to read.
    ReadStates []ReadState

    // Entries specifies entries to be saved to stable storage BEFORE
    // Messages are sent.
    Entries []pb.Entry

    // Snapshot specifies the snapshot to be saved to stable storage.
    Snapshot pb.Snapshot

    // CommittedEntries specifies entries to be committed to a
    // store/state-machine. These have previously been committed to stable
    // store.
    CommittedEntries []pb.Entry

    // Messages specifies outbound messages to be sent AFTER Entries are
    // committed to stable storage.
    // If it contains a MsgSnap message, the application MUST report back to raft
    // when the snapshot has been received or has failed by calling ReportSnapshot.
    Messages []pb.Message

    // MustSync indicates whether the HardState and Entries must be synchronously
    // written to disk or if an asynchronous write is permissible.
    MustSync bool
}

Once the application gets this Ready, it needs to:

  1. Persist HardState, Entries, and Snapshot to storage.
  2. Broadcast Messages to the other nodes.
  3. Apply CommittedEntries (committed but not yet applied) to the state machine.
  4. If CommittedEntries contains an entry of the membership-change type, call node.ApplyConfChange() to let the node know.
  5. Finally, call node.Advance() to tell raft that this batch of state updates has been handled and it can hand over the next Ready. A sketch of this whole loop follows the list.
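
Putting the five steps together, the application-side loop looks roughly like the usage skeleton in raft/doc.go. saveToStorage, send, process, s, and n below are application-supplied placeholders, not library functions:

for {
    select {
    case <-s.Ticker:
        n.Tick()
    case rd := <-n.Ready():
        saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) // step 1
        send(rd.Messages)                                    // step 2
        for _, entry := range rd.CommittedEntries {
            process(entry)                            // step 3
            if entry.Type == raftpb.EntryConfChange { // step 4
                var cc raftpb.ConfChange
                cc.Unmarshal(entry.Data)
                n.ApplyConfChange(cc)
            }
        }
        n.Advance() // step 5
    case <-s.done:
        return
    }
}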

Life of a Request

Having walked through the structure of the package, let’s look at how raft handles a request in combination with specific code. I’ve always found that being able to trace the processing of a request at the code level is very helpful in understanding the system at both a macro and a micro level.

Life of a Vote Request

  1. First of all, within the node's big loop there is a tick channel that fires periodically and triggers the raft.tick() function. If the current node is a follower, its tick function points to tickElection. tickElection's logic is to send itself an internal MsgHup message; upon seeing this message, the Step function calls the campaign function and enters the election.

    // tickElection is run by followers and candidates after r.electionTimeout.
    func (r *raft) tickElection() {
        r.electionElapsed++
    
        if r.promotable() && r.pastElectionTimeout() {
            r.electionElapsed = 0
            r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
        }
    }
    
    func (r *raft) Step(m pb.Message) error {
        // ...
        switch m.Type {
        case pb.MsgHup:
            r.campaign(campaignElection)
        }
    }
  2. campaign then calls becomeCandidate to switch itself into candidate state and increments its Term. After that, it sends its term and log information to the other nodes to request their votes.

    func (r *raft) campaign(t CampaignType) {
        // ...
        r.becomeCandidate()
        // Get peer id from progress
        for id := range r.prs {
            // ...
            r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
        }
    }
  3. On the receiving side, a node that gets this request first checks whether the incoming Term is larger than its own and whether the candidate's log is at least as new as its own, and decides whether to grant the vote accordingly. This logic can be found in the Step function:

    func (r *raft) Step(m pb.Message) error {
        // ...
        switch m.Type {
        case pb.MsgVote, pb.MsgPreVote:
            // We can vote if this is a repeat of a vote we've already cast...
            canVote := r.Vote == m.From ||
                // ...we haven't voted and we don't think there's a leader yet in this term...
                (r.Vote == None && r.lead == None) ||
                // ...or this is a PreVote for a future term...
                (m.Type == pb.MsgPreVote && m.Term > r.Term)
            // ...and we believe the candidate is up to date.
            if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
                r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
            } else {
                r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
            }
        }
    }
  4. Finally, when the candidate receives a vote reply, it counts whether the votes in favor exceed half of all nodes. If so, it becomes the leader and announces itself to everyone; if more than half reject it, it falls back to follower:

    func (r *raft) Step(m pb.Message) error {
        // ...
        switch m.Type {
        case myVoteRespType:
            gr := r.poll(m.From, m.Type, !m.Reject)
            switch r.quorum() {
            case gr:
                if r.state == StatePreCandidate {
                    r.campaign(campaignElection)
                } else {
                    r.becomeLeader()
                    r.bcastAppend()
                }
            case len(r.votes) - gr:
                r.becomeFollower(r.Term, None)
            }
        }
    }

Life of a Write Request

  1. A write request is usually initiated by calling Node.Propose, which wraps the request in an MsgProp message and sends it to itself for processing.

  2. Instead of processing the message directly, the message handler Step calls the lower-case Step function to process the message based on the current state.

    • If it is currently a follower, it forwards this message to the leader.
    func stepFollower(r *raft, m pb.Message) error {
        switch m.Type {
        case pb.MsgProp:
            // ...
            m.To = r.lead
            r.send(m)
        }
    }
  3. After the Leader receives the message (either forwarded by the followers or generated internally), it performs two steps:

    1. Add this message to your log
    2. Broadcast this message to other followers
    func stepLeader(r *raft, m pb.Message) error {
        switch m.Type {
        case pb.MsgProp:
            // ...
            if !r.appendEntry(m.Entries...) {
                return ErrProposalDropped
            }
            r.bcastAppend()
            return nil
        }
    }
  4. After the follower receives this log entry, it replies with a MsgAppResp message.

  5. After the leader confirms that enough followers have accepted the log, it first commits the log and then broadcasts its commit status once more. The implementation here is a bit like a two-phase commit. A worked example of the quorum calculation follows the code below.

    func stepLeader(r *raft, m pb.Message) error {
        switch m.Type {
        case pb.MsgAppResp:
            // ...
            if r.maybeCommit() {
                r.bcastAppend()
            }
        }
    }

    // maybeCommit attempts to advance the commit index. Returns true if
    // the commit index changed (in which case the caller should call
    // r.bcastAppend).
    func (r *raft) maybeCommit() bool {
        // ...
        mis := r.matchBuf[:len(r.prs)]
        idx := 0
        for _, p := range r.prs {
            mis[idx] = p.Match
            idx++
        }
        sort.Sort(mis)
        mci := mis[len(mis)-r.quorum()]
        return r.raftLog.maybeCommit(mci, r.Term)
    }
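
To make the quorum arithmetic in maybeCommit concrete, here is a small self-contained example with made-up Match values for a five-node cluster:

package main

import (
    "fmt"
    "sort"
)

func main() {
    // Hypothetical highest replicated index (Match) per node, leader included.
    match := []uint64{10, 9, 8, 7, 5}
    quorum := len(match)/2 + 1 // 3 of 5, the same formula as r.quorum()

    sort.Slice(match, func(i, j int) bool { return match[i] < match[j] })
    // Sorted: [5 7 8 9 10]. The value at position len-quorum is the highest
    // index already replicated on at least a quorum of nodes.
    fmt.Println("new commit index:", match[len(match)-quorum]) // prints 8
}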

Conclusion

The raft module in etcd implements only the raft consensus algorithm itself; transporting messages over the network and storing data are left to the upper-layer application. This article first introduced the basic data structures and then the raft algorithm built on top of them, using a vote request and a write request as examples to walk through the complete path of a request, from arrival to reply.

There are still many details we haven't covered, such as linearizable reads, the snapshot mechanism, and WAL storage and replay. I hope this article can serve as a starting point for your own further digging.


  1. As of this writing, the latest version of etcd is v3.3.10, so the analysis in this article is based on v3.3.10. ↩︎

  2. Raft's PreVote implementation mechanism ↩︎

  3. Etcd Raft library parsing ↩︎ ↩︎

  4. Design spec for Raft Progress ↩︎