First, read the Raft paper. It is available here: https://github.com/maemual/ra…

The core idea of Raft is majority voting. Using an odd number of servers breaks the symmetry of a network partition: at most one side of the partition can contain a majority. Since any action needs the approval of more than half of the servers, the other partition cannot do anything. Raft uses term numbers to identify leaders. There is at most one leader per term, and followers only need to know the current term.
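The majority rule fits in one line of code. The following is only a minimal sketch to make the partition argument concrete; `hasMajority` is a hypothetical helper, not something from the lab skeleton.

```go
package main

// hasMajority reports whether votes is a strict majority of a cluster with
// clusterSize servers. (Hypothetical helper, for illustration only.)
func hasMajority(votes, clusterSize int) bool {
	return votes > clusterSize/2
}
```

With 5 servers split 3/2, `hasMajority(3, 5)` is true and `hasMajority(2, 5)` is false, so only one side can act. With 4 servers split 2/2, neither side has a majority, which is why an even cluster size gains nothing.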

Pay attention to Raft's volatile state, and make sure that the values of those fields can be derived from Raft's persistent state.

Lab 2a

Design the structure of a Raft node according to the paper:

type Raft struct {
    mu        sync.Mutex          // Lock to protect shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *Persister          // Object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]
    dead      int32               // set by Kill()

    currentTerm    int
    votedFor       int
    heartbeatTimer *time.Timer
    electionTimer  *time.Timer
    state          NodeState
    logs           []LogEntry
}

The node needs two timers: a heartbeat timer and an election timer. state records its role: leader, follower, or candidate. The node also has to store the log entries. Note that the log entries here are the ones shown in Figure 6 of the paper. Instead of storing an explicit index in each entry, we use the array index, with the following structure:

type LogEntry struct {
    Term    int
    Command interface{}
}

Next comes a function that switches roles. Before that, two utility functions are needed: randTimeDuration and resetTimer. randTimeDuration produces a random duration within a given range, which is used for the election timeout. To avoid split votes, Raft picks the election timeout at random.

The lower bound of the election timeout must be larger than the heartbeat interval; otherwise an election is triggered before a normal heartbeat can arrive. Allowing for packet loss, the lower bound should be a multiple of the heartbeat interval. The random spread should also be likely to exceed the RPC round-trip time (probably about 10 ms), so that the first node to time out can finish its round of voting. The upper bound should therefore be large enough. The problem with an upper bound that is too large is that an election may take a very long time. Set the bounds sensibly; the upper bound may need to be lowered according to test results to make sure a leader is recovered quickly.

The following code sets the heartbeat interval and the lower and upper bounds of the election timeout:

const (
    HeartbeatInterval    = time.Duration(120) * time.Millisecond
    ElectionTimeoutLower = time.Duration(300) * time.Millisecond
    ElectionTimeoutUpper = time.Duration(400) * time.Millisecond
)

The following function switches roles. Because it touches the node state, the caller must hold the lock.

func (rf *Raft) convertTo(s NodeState) {
    if s == rf.state {
        return
    }
    DPrintf("Term %d:server %d convert from %v to %v\n",
        rf.currentTerm, rf.me, rf.state, s)
    pres := rf.state
    rf.state = s
    switch s {
    case Follower:
        if pres == Leader {
            rf.heartbeatTimer.Stop()
        }
        resetTimer(rf.electionTimer, randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))
        rf.votedFor = -1
    case Leader:
        rf.electionTimer.Stop()
        rf.broadcastHeartbeat()
        resetTimer(rf.heartbeatTimer, HeartbeatInterval)
    case Candidate:
        rf.startElection()
    }
}
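The NodeState type is used above without being shown. A minimal sketch of one possible definition follows; the constant order and the String method are my assumptions. The String method is what lets the `%v` verb in DPrintf print a readable role name.

```go
package main

// NodeState is the role a Raft node currently plays.
type NodeState int

const (
	Follower NodeState = iota
	Candidate
	Leader
)

// String makes the state readable in log output.
func (s NodeState) String() string {
	switch s {
	case Follower:
		return "Follower"
	case Candidate:
		return "Candidate"
	case Leader:
		return "Leader"
	}
	return "Unknown"
}
```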

The next step is to implement the main logic. Any state change in Raft must happen under the lock to avoid race conditions. The following is the implementation of startElection:

// startElection must be called with rf.mu held.
func (rf *Raft) startElection() {
    var count int64
    rf.currentTerm += 1
    args := &RequestVoteArgs{
        Term:        rf.currentTerm,
        CandidateId: rf.me,
    }
    for i := range rf.peers {
        if i == rf.me {
            rf.votedFor = rf.me // vote for ourselves
            atomic.AddInt64(&count, 1)
            continue
        }
        go func(server int) {
            reply := &RequestVoteReply{}
            if rf.sendRequestVote(server, args, reply) {
                rf.mu.Lock() // must lock after rpc
                DPrintf("%+v state %v got RequestVote response from node %d, VoteGranted=%v, Term=%d",
                    rf, rf.state, server, reply.VoteGranted, reply.Term)
                if reply.VoteGranted && rf.state == Candidate {
                    atomic.AddInt64(&count, 1)
                    if atomic.LoadInt64(&count) > int64(len(rf.peers)/2) {
                        rf.convertTo(Leader)
                    }
                } else {
                    if rf.currentTerm < reply.Term {
                        rf.currentTerm = reply.Term
                        rf.convertTo(Follower)
                    }
                }
                rf.mu.Unlock()
            }
        }(i)
    }
}

When taking a lock, try to unlock at the same nesting level, and avoid locking and unlocking repeatedly. I used to like deferring the unlock, but that is easy to get wrong when the locking logic becomes complicated. When writing the code, be as careful as possible about exactly when each lock is taken and released.
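The pattern I try to follow around an RPC can be shown in isolation. This is a minimal sketch with a stand-in `node` type and a hypothetical `askPeer` method, not the lab code: snapshot the state under the lock, release the lock for the slow call, then lock again and check that the snapshot is still current before acting on the reply.

```go
package main

import "sync"

// node is a stand-in for the Raft struct with only the fields needed here.
type node struct {
	mu   sync.Mutex
	term int
}

// askPeer sends the current term through rpc (a placeholder for a real RPC)
// and applies the reply only if the node's term did not change meanwhile.
// It reports whether the reply was applied.
func (n *node) askPeer(rpc func(term int) (replyTerm int)) bool {
	n.mu.Lock()
	sentTerm := n.term
	n.mu.Unlock() // never hold the lock across a blocking call

	replyTerm := rpc(sentTerm)

	n.mu.Lock()
	stale := n.term != sentTerm // state moved on while we were waiting
	if !stale && replyTerm > n.term {
		n.term = replyTerm
	}
	n.mu.Unlock() // lock and unlock sit at the same nesting level
	return !stale
}
```

Each critical section here is short and has a single unlock at the same level as its lock, which keeps the lock timing easy to reason about.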

The corresponding RPC function is:

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {

   rf.mu.Lock()
   defer rf.mu.Unlock()
   DPrintf("RequestVote raft %+v args %+v", rf, args)
   if rf.currentTerm > args.Term {
      reply.Term = rf.currentTerm
      reply.VoteGranted = false
      return
   }

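   // A higher term means our own information is stale: adopt the term and
   // forget any vote cast in the old term. convertTo alone is not enough,
   // because it returns early when the node is already a Follower.
   if args.Term > rf.currentTerm {
      rf.currentTerm = args.Term
      rf.convertTo(Follower)
      rf.votedFor = -1
   }

   reply.Term = rf.currentTerm
   if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
      rf.votedFor = args.CandidateId
      reply.VoteGranted = true
      // reset the election timer only after granting a vote
      resetTimer(rf.electionTimer,
         randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))
   }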
}

The election timer is reset after a vote is granted.

Here is the code for broadcastHeartbeat:

func (rf *Raft) broadcastHeartbeat() {
    for i := range rf.peers {
        if i == rf.me {
            continue
        }
        go func(server int) {
            rf.mu.Lock()
            if rf.state != Leader {
                rf.mu.Unlock()
                return
            }
            args := &AppendEntriesArgs{
                Term:     rf.currentTerm,
                LeaderId: rf.me,
            }
            rf.mu.Unlock()
            reply := &AppendEntriesReply{}
            if rf.sendAppendEntries(server, args, reply) {
                rf.mu.Lock()
                if rf.state != Leader {
                    rf.mu.Unlock()
                    return
                }
                DPrintf("%+v state %v got appendEntries response from node %d, success=%v, Term=%d",
                    rf, rf.state, server, reply.Success, reply.Term)
                // nothing to do on success yet
                if !reply.Success && reply.Term > rf.currentTerm {
                    rf.currentTerm = reply.Term
                    rf.convertTo(Follower)
                }
                rf.mu.Unlock()
            }
        }(i)
    }
}

The corresponding handler is:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
   // The handler reads and writes shared state, so it must hold the lock.
   rf.mu.Lock()
   defer rf.mu.Unlock()
   if args.Term < rf.currentTerm {
      reply.Success = false
      reply.Term = rf.currentTerm
      return
   }
   if args.Term > rf.currentTerm {
      rf.currentTerm = args.Term //update its own term
      rf.convertTo(Follower)
      // do not return here.
   }
   resetTimer(rf.electionTimer,
      randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))

   reply.Success = true
   reply.Term = rf.currentTerm //for debug

}

The election timer needs to be reset after receiving the heartbeat.

Finally, the Make function for Raft:

func Make(peers []*labrpc.ClientEnd, me int,
    persister *Persister, applyCh chan ApplyMsg) *Raft {
    rf := &Raft{}
    rf.peers = peers
    rf.persister = persister
    rf.me = me

    // Your initialization code here (2A, 2B, 2C).
    rf.currentTerm = 0
    rf.votedFor = -1
    rf.heartbeatTimer = time.NewTimer(HeartbeatInterval)
    rf.electionTimer = time.NewTimer(randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))
    rf.state = Follower

    go func(node *Raft) {
        for {
            select {
            case <-node.electionTimer.C:
                node.mu.Lock()
                // need to be reset because election may fail
                node.electionTimer.Reset(randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))
                if node.state == Follower {
                    node.convertTo(Candidate)
                } else {
                    node.startElection() // the previous election failed, start a new round
                }
                node.mu.Unlock()
            case <-node.heartbeatTimer.C:
                node.mu.Lock()
                if node.state == Leader {
                    node.broadcastHeartbeat()
                    node.heartbeatTimer.Reset(HeartbeatInterval)
                }
                node.mu.Unlock()
            }
        }
    }(rf)

    // initialize from state persisted before a crash
    rf.readPersist(persister.ReadRaftState())

    return rf
}

Note that votedFor must be initialized to -1, and that the election timer must be reset after it expires. At that point a leader may already have been elected whose heartbeat has not arrived yet, or the election may have failed. To cover the failure case, a new round of election is started, and the node changes back to follower when a heartbeat arrives.

Lab 2b

Next comes Lab 2b, which requires replicating log entries to the other nodes. Add the following fields to the Raft struct:

applyCh     chan ApplyMsg
nextIndex   []int
matchIndex  []int
commitIndex int
lastApplied int

Notice that nextIndex is an optimistic estimate, and matchIndex is a conservative estimate.
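How the two estimates start out when a node becomes leader can be sketched as follows. `initLeaderState` is a hypothetical helper, and it assumes the log slice keeps a dummy entry at index 0, so that the log length equals the last log index plus one.

```go
package main

// initLeaderState returns fresh nextIndex and matchIndex slices for a newly
// elected leader. logLen is the length of the leader's log slice.
// Assumption: index 0 holds a dummy entry, so logLen == last index + 1.
func initLeaderState(peerCount, logLen int) (nextIndex, matchIndex []int) {
	nextIndex = make([]int, peerCount)
	matchIndex = make([]int, peerCount)
	for i := range nextIndex {
		// Optimistic: assume every follower already has the whole log.
		nextIndex[i] = logLen
		// matchIndex[i] stays 0. Conservative: nothing is confirmed yet.
	}
	return nextIndex, matchIndex
}
```

nextIndex only moves down when a follower rejects an AppendEntries, while matchIndex only moves up when a follower confirms a match.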

Points to note

How to choose prevLogIndex: prevLogIndex is the position up to which the leader believes the follower is already in sync. It is therefore fine to set it to the index just before nextIndex[follower], that is nextIndex[follower] - 1, which uses the optimistic estimate.

Do not forget applyCh.

When a RequestVote arrives from another candidate whose term is higher, the node must update its own term so that the vote can be granted. If this rule is not implemented faithfully, nodes rarely turn into followers and reset their votedFor. They keep voting for themselves and starting elections, so a new leader clearly cannot be elected quickly.

When the LogEntries carried by a heartbeat are encoded, the lock is not held during the RPC, and a slice is a reference type, so the entries may be modified elsewhere at the same time. The leader's log can then be read and written concurrently. To avoid this, copy the entries before they are encoded.

prevLogIndex := rf.nextIndex[server] - 1

clogs := make([]LogEntry, len(rf.logs[prevLogIndex+1:]))

copy(clogs, rf.logs[prevLogIndex+1:])
args := &AppendEntriesArgs{
   Term:         rf.currentTerm,
   LeaderId:     rf.me,
   PrevLogIndex: prevLogIndex,
   PrevLogTerm:  rf.logs[prevLogIndex].Term,
   LogEntries:   clogs,
   LeaderCommit: rf.commitIndex,
}
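Why the copy matters can be seen in isolation. In this minimal sketch, `tailShared` and `tailCopied` are hypothetical helpers: a plain sub-slice shares its backing array with the log, so a later write to the log shows through, while a copied slice does not change.

```go
package main

// LogEntry mirrors the structure used in these notes.
type LogEntry struct {
	Term    int
	Command interface{}
}

// tailShared returns logs[from:] as a sub-slice. It shares the backing
// array with logs, so later writes to logs are visible through it.
func tailShared(logs []LogEntry, from int) []LogEntry {
	return logs[from:]
}

// tailCopied returns an independent copy of logs[from:]. The copy is
// shallow: the Command values themselves are not duplicated.
func tailCopied(logs []LogEntry, from int) []LogEntry {
	out := make([]LogEntry, len(logs)-from)
	copy(out, logs[from:])
	return out
}
```

If `logs[1].Term` is overwritten after both calls with `from = 1`, the shared slice sees the new term at position 0 and the copied slice still holds the old one.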

Logs are brought into sync step by step. The paper says that having the follower return a conflict term to the leader is an optimization that may not be necessary in practice, so a conflict is handled here with a single line of code:

rf.nextIndex[server] -= 1
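One detail worth guarding is the lower bound of the back-off. The following is a minimal sketch with a hypothetical `backOff` helper. It assumes index 0 of the log holds a dummy entry that always matches, so nextIndex never needs to drop below 1.

```go
package main

// backOff returns the nextIndex to try after a follower rejected an
// AppendEntries because of a log mismatch.
// Assumption: index 0 is a dummy entry that every log shares.
func backOff(nextIndex int) int {
	if nextIndex > 1 {
		return nextIndex - 1
	}
	return 1
}
```

Each rejected AppendEntries moves the probe back by one entry until the logs agree.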

Committed entries are delivered to the service on top (the KV server) as ApplyMsg values on applyCh.

Lab 2c

This is the easy part. To implement persistence, simply call persist() whenever the persistent state of rf changes.
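What gets saved can be sketched as an encode and decode round trip. The lab provides its own labgob wrapper and Persister; this minimal sketch uses the standard library's encoding/gob as a stand-in, and `persistentState`, `encodeState` and `decodeState` are hypothetical names. The three fields are the ones the paper lists as persistent state.

```go
package main

import (
	"bytes"
	"encoding/gob"
)

// LogEntry mirrors the structure used in these notes.
type LogEntry struct {
	Term    int
	Command interface{}
}

// persistentState groups the fields that must survive a crash.
type persistentState struct {
	CurrentTerm int
	VotedFor    int
	Logs        []LogEntry
}

// encodeState serializes the state into bytes that could be handed to a
// persister.
func encodeState(s persistentState) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(s); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeState restores the state from bytes produced by encodeState.
func decodeState(data []byte) (persistentState, error) {
	var s persistentState
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&s)
	return s, err
}
```

Command is an interface value, so any custom command type stored in it would first have to be registered with `gob.Register`; basic types such as strings work without registration.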

Lab 3a

This part implements a distributed key-value store on top of Raft.

What does lab3 do

In Lab 2 we built a Raft that maintains a consistent log. Lab 3 implements a simple fault-tolerant key-value storage service on top of it. It uses the following interfaces that Raft exposes:

  • Start() begins replicating an operation through Raft
  • applyCh delivers the operations once they have been replicated

Operations that arrive on applyCh have already been replicated, so they are ready to be executed. The operations themselves are defined by the service built on top of Raft. For example, Lab 3 defines three operations: PUT, GET, and APPEND.

The relationship between client, server, and raft

Here’s a nice picture to look at:

Basic architecture:

  • Each server is also a Raft node
  • Multiple clients send requests to the server that is currently the leader
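The client side is not shown in these notes. As a rough sketch of how a client might find the current leader, it can try the servers in turn, starting from the last known leader, until one of them accepts the request. `callLeader` and its parameters are hypothetical, and a real client may simply retry forever instead of giving up.

```go
package main

// callLeader tries servers round-robin, starting at lastLeader. try sends
// the request to one server and returns true if that server answered
// WrongLeader. callLeader returns the index of the server that accepted
// the request, or -1 after maxRounds full passes without success.
func callLeader(servers, lastLeader, maxRounds int, try func(server int) bool) int {
	for i := 0; i < servers*maxRounds; i++ {
		s := (lastLeader + i) % servers
		if !try(s) {
			return s // remember this as the new lastLeader
		}
	}
	return -1
}
```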

The basic process is as follows:

  1. The client sends a PUT, GET, or APPEND request to the server
     • Each client sends only one request at a time, so it needs no synchronization
  2. When the server receives the request, it calls Start() to begin replicating the operation to all Raft nodes
     • The server must be able to process concurrent requests from different clients in parallel
  3. When replication completes, the Raft node notifies its host server via applyCh
  4. The server sees that replication has succeeded and executes the operation the client requested

How do I design Server Threads

First, the handler for each request blocks until its operation has finished. Handlers run in parallel, so multiple requests can be processed at the same time. To process a request, the handler submits the operation to Raft via Start() and then waits.

In addition, a goroutine is started at initialization time to continuously read messages in the applyCh and perform operations in those messages. The corresponding waiting handler is notified when the execution completes.

It is worth noting that non-leader servers also need to perform these operations, but they do not need to notify a handler that the operation has finished.

go func() {
    for msg := range kv.applyCh {
        if msg.CommandValid == false {
            continue
        }
        op := msg.Command.(Op)
        DPrintf("kvserver %d applied command %s at index %d", kv.me, op.Name, msg.CommandIndex)
        kv.mu.Lock()
        lastAppliedRequestId, ok := kv.lastAppliedRequestId[op.ClientId]
        if ok == false || lastAppliedRequestId < op.RequestId {
            switch op.Name {
            case "Put":
                kv.db[op.Key] = op.Value
            case "Append":
                kv.db[op.Key] += op.Value
                // Get() does not need to modify db, skip
            }
            kv.lastAppliedRequestId[op.ClientId] = op.RequestId
        }
        ch, ok := kv.dispatcher[msg.CommandIndex]
        kv.mu.Unlock()
        if ok {
            notify := Notification{
                ClientId:  op.ClientId,
                RequestId: op.RequestId,
            }
            ch <- notify
        }
    }
}()

To support responding to multiple requests at the same time, we introduce a dispatcher that delivers end-of-execution notifications to the corresponding waiting handlers. Its key is the index of the log entry in Raft, and its value is the channel on which the waiting handler is notified. We create a new channel every time we start waiting, and when replication completes, we take the notification from the channel and delete the channel.

func (kv *KVServer) waitApplying(op Op, timeout time.Duration) bool {
    // return common part of GetReply and PutAppendReply
    // i.e., WrongLeader
    index, _, isLeader := kv.rf.Start(op)
    if isLeader == false {
        return true
    }

    var wrongLeader bool
    // Log through a closure so that the final value of wrongLeader is printed;
    // a plain deferred DPrintf would evaluate its arguments right here.
    defer func() {
        DPrintf("kvserver %d got %s() RPC, insert op %+v at %d, reply WrongLeader = %v",
            kv.me, op.Name, op, index, wrongLeader)
    }()

    kv.mu.Lock()
    if _, ok := kv.dispatcher[index]; !ok {
        kv.dispatcher[index] = make(chan Notification, 1)
    }
    ch := kv.dispatcher[index]
    kv.mu.Unlock()
    select {
    case notify := <-ch:
        kv.mu.Lock()
        delete(kv.dispatcher, index)
        kv.mu.Unlock()
        if notify.ClientId != op.ClientId || notify.RequestId != op.RequestId {
            // leader has changed
            wrongLeader = true
        } else {
            wrongLeader = false
        }
    case <-time.After(timeout):
        wrongLeader = true
    }
    return wrongLeader
}

How do I avoid performing repeated operations

Because the leader can change and the client then retries the request, we need to make sure that a retried request is not executed as a new request. We therefore maintain a sequence ID for each client so that every request is uniquely identified. A retried request does not get a new sequence ID. Instead, the client ID and sequence ID are passed into Raft through Start() as part of the operation, and the server uses them to filter out duplicate operations when it finally applies them.
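The duplicate check can be shown on its own. This minimal sketch uses the Op fields that appear in the server code; the `store` type and its `apply` method are hypothetical stand-ins for the KV server. It assumes request IDs increase per client and that each client has only one request outstanding at a time.

```go
package main

// Op carries one client operation through Raft.
type Op struct {
	Name      string // "Put", "Append" or "Get"
	Key       string
	Value     string
	ClientId  int64
	RequestId int
}

// store is a stand-in for the KV server state.
type store struct {
	db          map[string]string
	lastApplied map[int64]int // highest RequestId applied per client
}

func newStore() *store {
	return &store{db: map[string]string{}, lastApplied: map[int64]int{}}
}

// apply executes op unless this client's request was already applied.
// It reports whether the operation was executed.
func (s *store) apply(op Op) bool {
	if last, ok := s.lastApplied[op.ClientId]; ok && op.RequestId <= last {
		return false // a retry of something already applied
	}
	switch op.Name {
	case "Put":
		s.db[op.Key] = op.Value
	case "Append":
		s.db[op.Key] += op.Value
	}
	s.lastApplied[op.ClientId] = op.RequestId
	return true
}
```

Applying the same Append twice, as happens when a retry is committed a second time, changes the value only once.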

How do I validate request returns

As the lab description points out, we need to handle the case where the log entry written for a request is overwritten because the leader changed after the request was made. In that case the request would return the result of a different operation.

We maintain a sequence ID for each operation for each client. For the server, a unique client number and a unique sequence ID correspond to a unique operation.

Therefore, we only need to check the Client ID and Sequence ID to verify that the return is the result of the current request.
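The check itself is a single comparison. This is a minimal sketch with the same field names as the server code; the `matches` helper is hypothetical.

```go
package main

// Op carries one client operation through Raft (only the ID fields matter here).
type Op struct {
	Name      string
	Key       string
	Value     string
	ClientId  int64
	RequestId int
}

// Notification identifies the operation that was actually applied at a
// given log index.
type Notification struct {
	ClientId  int64
	RequestId int
}

// matches reports whether the applied operation is the one the handler
// submitted. A mismatch means another operation took over that log index,
// for example after a leader change.
func matches(n Notification, op Op) bool {
	return n.ClientId == op.ClientId && n.RequestId == op.RequestId
}
```

When `matches` returns false, the handler replies WrongLeader so that the client retries.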

Reference: double-free/MIT6.824-2018-Chinese: a Chinese version of MIT 6.824 (Distributed Systems) (https://github.com/double-fre…