Raft is a consensus algorithm for replicating state across the machines of a distributed system, and it provides strong consistency. It was designed with Paxos as its point of comparison, with the goals of being easier to understand and easier to implement in production. Lab 2A (MIT 6.824) is mainly about implementing Raft's leader election. Below is my implementation, followed by a few points I learned along the way.

Here is a brief summary of how I understand the election part of Raft.

- Each server in a cluster is in one of three states: Follower, Candidate, or Leader.
- Each server uses a randomized election timeout. This makes it unlikely that several servers time out at once and split the vote.
- When its timeout expires, a Follower becomes a Candidate and starts an election. It increments its term, votes for itself, and sends vote requests to all other peers.
- If any reply carries a Term larger than the Candidate's own, the Candidate reverts to Follower.
- If a majority of peers grant their votes, the Candidate becomes Leader. It then sends heartbeats, which are AppendEntries calls with an empty list of log entries.

A server that receives a vote request or a heartbeat decides how to respond based on its own term and state (details below).
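To make these transitions concrete, here is a minimal Go model of how a Candidate reacts to vote replies. It is a simplification, not the lab code: RPCs, timers and locking are left out, and the names `State`, `majority` and `afterVotes` are hypothetical.

```go
package main

import "fmt"

// State is the role a server plays (hypothetical stand-in for StateFollower etc.).
type State int

const (
	Follower State = iota
	Candidate
	Leader
)

func (s State) String() string {
	return [...]string{"Follower", "Candidate", "Leader"}[s]
}

// majority returns how many votes are needed to win among n servers.
func majority(n int) int { return n/2 + 1 }

// afterVotes models a candidate in term myTerm processing vote replies in order.
// replyTerms[i] is the term carried by reply i; granted[i] says whether it voted yes.
// It returns the resulting state and term.
func afterVotes(myTerm int, replyTerms []int, granted []bool, n int) (State, int) {
	votes := 1 // the candidate always votes for itself
	for i, t := range replyTerms {
		if t > myTerm {
			return Follower, t // a newer term exists: step down and adopt it
		}
		if granted[i] {
			votes++
			if votes >= majority(n) {
				return Leader, myTerm
			}
		}
	}
	return Candidate, myTerm // no majority yet: wait for more replies or a new timeout
}

func main() {
	s, term := afterVotes(3, []int{3, 3}, []bool{true, false}, 3)
	fmt.Println(s, term) // Leader 3
	s, term = afterVotes(3, []int{5}, []bool{false}, 3)
	fmt.Println(s, term) // Follower 5
}
```

The function returns as soon as a majority is reached, which mirrors the early `break` in the election code. A reply with a newer term overrides any vote count collected so far.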

The first step is to build the overall framework:

- Two channels signal "start an election" and "send a heartbeat".
- Background goroutines run an election timer and a heartbeat timer. Each signals its channel when its time is up.
- A main loop receives from the channels and starts the corresponding work.

A timer can be implemented with a for loop plus time.Sleep, or with a time.Ticker. Each timer must be restarted at the right moments (listed in the conclusion), so each one gets a reset method that records when it was last reset.

The timer checks are then:

- If the server is a Follower or a Candidate and the time since the last reset (lastResetElectionTimer) exceeds its randomized election timeout, it starts an election. A Follower first becomes a Candidate.
- If the server is the Leader and the heartbeat interval has elapsed, it sends a heartbeat.

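```go
// mainLoop waits for a timer to fire and runs the matching action in a new goroutine.
func (rf *Raft) mainLoop() {
	for {
		select {
		case <-rf.timerElectionChan:
			go rf.startElection()
		case <-rf.timerHeartbeatChan:
			go rf.broadcastHeartbeat()
		}
	}
}

// timerElection polls every 10ms; a non-leader whose election timeout has
// elapsed since the last reset triggers an election.
func (rf *Raft) timerElection() {
	for {
		rf.mu.Lock()
		if rf.state != StateLeader {
			timeElapsed := (time.Now().UnixNano() - rf.lastResetElectionTimer) / 1e6 // ms
			if timeElapsed > rf.timeoutElection {
				rf.timerElectionChan <- true
			}
		}
		rf.mu.Unlock()
		time.Sleep(time.Millisecond * 10)
	}
}

// timerHeartbeat polls every 10ms; the leader triggers a heartbeat once the
// heartbeat interval has elapsed since the last broadcast.
func (rf *Raft) timerHeartbeat() {
	for {
		rf.mu.Lock()
		if rf.state == StateLeader {
			timeElapsed := (time.Now().UnixNano() - rf.lastResetHeartbeatTimer) / 1e6 // ms
			if timeElapsed > rf.timeoutHeartbeat {
				rf.timerHeartbeatChan <- true
			}
		}
		rf.mu.Unlock()
		time.Sleep(time.Millisecond * 10)
	}
}

// resetTimerElection picks a new randomized timeout (4 heartbeats plus 0-149ms).
// The shared global rand source is not re-seeded on every reset: that is
// unnecessary (rand.Seed is deprecated since Go 1.20, which seeds automatically);
// seed at most once at startup.
func (rf *Raft) resetTimerElection() {
	rf.timeoutElection = rf.timeoutHeartbeat*4 + rand.Int63n(150)
	rf.lastResetElectionTimer = time.Now().UnixNano()
}

// resetTimerHeartbeat sets the 100ms heartbeat interval and restarts it.
func (rf *Raft) resetTimerHeartbeat() {
	rf.timeoutHeartbeat = 100
	rf.lastResetHeartbeatTimer = time.Now().UnixNano()
}
```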

Next is the election itself. The server converts to Candidate, votes for itself, and sends a vote request to every peer in parallel. Each granted vote increments the count. If any reply carries a higher term, the server falls back to Follower. Once a majority is reached, the server becomes Leader and immediately sends heartbeats to all peers.

```go
func (rf *Raft) startElection() {
	rf.mu.Lock()
	// convertTo is not shown in this post; it is assumed to bump currentTerm,
	// vote for ourselves and reset the election timer.
	rf.convertTo(StateCandidate)
	// Build the request while still holding the lock so that currentTerm and
	// the log are read consistently.
	var lastLogIndex, lastLogTerm int
	if len(rf.log) == 0 {
		lastLogIndex = -1
		lastLogTerm = 0
	} else {
		lastLogIndex = len(rf.log) - 1
		lastLogTerm = rf.log[len(rf.log)-1].Term
	}
	reqVoteArgs := &RequestVoteArgs{
		Term:         rf.currentTerm,
		CandidateID:  rf.me,
		LastLogIndex: lastLogIndex,
		LastLogTerm:  lastLogTerm,
	}
	rf.mu.Unlock()

	votedNum := 1 // our own vote
	var wg sync.WaitGroup
	wg.Add(len(rf.peers))
	// Buffered so that senders never block after we stop reading.
	replyCh := make(chan *RequestVoteReply, len(rf.peers))
	for idx := range rf.peers {
		go func(idx int) {
			defer wg.Done()
			rf.mu.Lock()
			if idx == rf.me || rf.state != StateCandidate {
				rf.mu.Unlock()
				return
			}
			rf.mu.Unlock()
			// Send the RPC without holding the lock.
			reply := &RequestVoteReply{}
			if rf.sendRequestVote(idx, reqVoteArgs, reply) {
				replyCh <- reply
			}
		}(idx)
	}
	// Close replyCh once every RPC has returned, so the loop below ends.
	go func() {
		wg.Wait()
		close(replyCh)
	}()

	// If a majority agrees, become leader.
	for reply := range replyCh {
		rf.mu.Lock()
		if reply.Term > rf.currentTerm {
			// Someone has a newer term: give up and become a follower.
			rf.currentTerm = reply.Term
			rf.convertTo(StateFollower)
			rf.mu.Unlock()
			return
		}
		rf.mu.Unlock()
		if reply.VoteGranted {
			votedNum++
		}
		if votedNum > len(rf.peers)/2 {
			break
		}
	}
	if votedNum > len(rf.peers)/2 {
		rf.mu.Lock()
		// Only take office if we are still a candidate in the term we ran in.
		if rf.state == StateCandidate && rf.currentTerm == reqVoteArgs.Term {
			rf.convertTo(StateLeader)
		}
		rf.mu.Unlock()
		// broadcastHeartbeat returns immediately unless we are now leader.
		rf.broadcastHeartbeat()
	}
}
```
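The fan-out and vote-counting pattern used in `startElection` can be tried in isolation. In this sketch, `askPeer` is a hypothetical stand-in for `sendRequestVote`. `voteReply` carries only the two reply fields that the tally needs.

```go
package main

import (
	"fmt"
	"sync"
)

// voteReply mirrors the two RequestVoteReply fields the tally needs.
type voteReply struct {
	Term        int
	VoteGranted bool
}

// collectVotes asks every other peer in parallel, gathers replies on a
// buffered channel, and stops at a majority or at a reply with a newer term.
// askPeer returns ok=false to model a dropped RPC.
func collectVotes(n, me, term int, askPeer func(peer int) (voteReply, bool)) (won bool, newTerm int) {
	replyCh := make(chan voteReply, n) // room for every reply: late senders never block
	var wg sync.WaitGroup
	for p := 0; p < n; p++ {
		if p == me {
			continue
		}
		wg.Add(1)
		go func(peer int) {
			defer wg.Done()
			if r, ok := askPeer(peer); ok {
				replyCh <- r
			}
		}(p)
	}
	go func() { wg.Wait(); close(replyCh) }() // ends the range loop if votes are missing

	votes := 1 // vote for self
	for r := range replyCh {
		if r.Term > term {
			return false, r.Term
		}
		if r.VoteGranted {
			votes++
		}
		if votes > n/2 {
			return true, term
		}
	}
	return false, term
}

func main() {
	// Five servers; peer 4 is unreachable and peer 3 refuses.
	ask := func(p int) (voteReply, bool) {
		switch p {
		case 4:
			return voteReply{}, false
		case 3:
			return voteReply{Term: 2, VoteGranted: false}, true
		default:
			return voteReply{Term: 2, VoteGranted: true}, true
		}
	}
	won, term := collectVotes(5, 0, 2, ask)
	fmt.Println(won, term) // true 2
}
```

The channel has room for one reply per peer. Goroutines that finish after the majority is reached can therefore still deliver their reply and exit, instead of leaking.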

A server that receives a vote request handles it as follows:

- If the request's Term is larger than its own, it becomes a Follower and adopts that term.
- If the request's Term is smaller than its own, it rejects the vote.
- Otherwise, it grants the vote if it has not yet voted in this term, or if it has already voted for this same candidate. When it grants a vote, it also resets its election timer.

```go
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// Only a strictly higher term demotes us; on an equal term a Leader must
	// not step down, and our vote for this term must be kept. A new term
	// starts with no vote cast.
	if args.Term > rf.currentTerm {
		rf.convertTo(StateFollower)
		rf.currentTerm = args.Term
		rf.votedFor = -1
	}
	reply.Term = rf.currentTerm
	// Reject stale candidates, and never vote for two candidates in one term.
	if args.Term < rf.currentTerm || (rf.votedFor != -1 && rf.votedFor != args.CandidateID) {
		reply.VoteGranted = false
		return
	}
	// (Lab 2B adds the "candidate's log is at least as up-to-date" check here.)
	reply.VoteGranted = true
	rf.votedFor = args.CandidateID
	rf.resetTimerElection()
}
```
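The voting rules can be checked without any RPC machinery. `voter` and `handleRequestVote` are a hypothetical model of the handler's decision. It covers 2A only, with no log comparison, and the election-timer reset is left to a comment.

```go
package main

import "fmt"

// voter holds just the fields the vote decision depends on.
type voter struct {
	currentTerm int
	votedFor    int // -1 means "no vote cast in currentTerm"
}

// handleRequestVote returns the reply term and whether the vote is granted.
func (v *voter) handleRequestVote(term, candidate int) (int, bool) {
	if term > v.currentTerm { // newer term: adopt it and forget the old vote
		v.currentTerm = term
		v.votedFor = -1
	}
	if term < v.currentTerm {
		return v.currentTerm, false // stale candidate
	}
	if v.votedFor != -1 && v.votedFor != candidate {
		return v.currentTerm, false // already voted for someone else this term
	}
	v.votedFor = candidate // the real handler also resets its election timer here
	return v.currentTerm, true
}

func main() {
	v := &voter{currentTerm: 1, votedFor: -1}
	fmt.Println(v.handleRequestVote(2, 7)) // 2 true
	fmt.Println(v.handleRequestVote(2, 8)) // 2 false: one vote per term
	fmt.Println(v.handleRequestVote(2, 7)) // 2 true: same candidate asks again
	fmt.Println(v.handleRequestVote(1, 8)) // 2 false: stale term
	fmt.Println(v.handleRequestVote(3, 8)) // 3 true: new term clears the old vote
}
```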

The leader resets its heartbeat timer and sends a heartbeat to every peer. If a reply reports a term larger than the leader's own, the leader converts to Follower.

```go
func (rf *Raft) broadcastHeartbeat() {
	rf.mu.Lock()
	if rf.state != StateLeader {
		rf.mu.Unlock()
		return
	}
	rf.resetTimerHeartbeat()
	// Capture the term once so that every peer receives the same heartbeat.
	args := AppendEntriesArgs{
		Term: rf.currentTerm,
	}
	rf.mu.Unlock()
	for i := 0; i < len(rf.peers); i++ {
		if i == rf.me {
			continue
		}
		go func(id int) {
			var reply AppendEntriesReply
			// No lock is held during the RPC.
			if !rf.sendAppendEntries(id, &args, &reply) {
				return
			}
			rf.mu.Lock()
			defer rf.mu.Unlock()
			// Ignore replies that arrive after we stopped being leader
			// or moved on to a newer term.
			if rf.state != StateLeader || rf.currentTerm != args.Term {
				return
			}
			if !reply.Success && reply.Term > rf.currentTerm {
				// A peer knows a newer term: step down.
				rf.convertTo(StateFollower)
				rf.currentTerm = reply.Term
			}
		}(i)
	}
}
```
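The reply handling in the broadcast boils down to one rule. The hypothetical `leaderView` model below shows why the term captured when the heartbeat was sent matters. A late reply to a heartbeat from an older term must not change the current leader's state.

```go
package main

import "fmt"

type role int

const (
	follower role = iota
	leader
)

// leaderView is a hypothetical slice of the state that heartbeat replies touch.
type leaderView struct {
	role        role
	currentTerm int
}

// onHeartbeatReply applies one reply to a heartbeat that was sent in sentTerm.
func (l *leaderView) onHeartbeatReply(sentTerm, replyTerm int, success bool) {
	if l.role != leader || l.currentTerm != sentTerm {
		return // stale reply: sent in an earlier term or an earlier leadership
	}
	if !success && replyTerm > l.currentTerm {
		l.role = follower // someone has a newer term: step down
		l.currentTerm = replyTerm
	}
}

func main() {
	l := &leaderView{role: leader, currentTerm: 4}
	l.onHeartbeatReply(4, 4, true)
	fmt.Println(l.role == leader, l.currentTerm) // true 4
	l.onHeartbeatReply(3, 9, false)              // sent in old term 3: ignored
	fmt.Println(l.role == leader, l.currentTerm) // true 4
	l.onHeartbeatReply(4, 6, false)
	fmt.Println(l.role == leader, l.currentTerm) // false 6
}
```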

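Receiving a heartbeat works as follows. A server rejects a heartbeat from a leader whose term is older than its own. Otherwise, it resets its election timer. If its term is behind, or if it is not already a Follower, it also becomes a Follower in the leader's term.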

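```go
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// Reject a heartbeat from a leader with an older term.
	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		reply.Success = false
		return
	}
	// A current leader exists: postpone our own election.
	rf.resetTimerElection()
	// Step down if the leader's term is newer, or if we are a Candidate
	// (or a stale Leader) in the same term.
	if args.Term > rf.currentTerm || rf.state != StateFollower {
		rf.convertTo(StateFollower)
		rf.currentTerm = args.Term
	}
	reply.Term = rf.currentTerm
	reply.Success = true
}
```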
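A similar RPC-free model of the heartbeat receiver follows. A counter stands in for `resetTimerElection()`, and `peer` and `handleHeartbeat` are hypothetical names.

```go
package main

import "fmt"

type state int

const (
	stFollower state = iota
	stCandidate
	stLeader
)

// peer is a hypothetical model of the fields the heartbeat handler touches.
type peer struct {
	st          state
	currentTerm int
	resets      int // counts election-timer resets
}

// handleHeartbeat returns the reply term and Success, following the handler above.
func (p *peer) handleHeartbeat(leaderTerm int) (int, bool) {
	if leaderTerm < p.currentTerm {
		return p.currentTerm, false // stale leader: the timer is NOT reset
	}
	p.resets++ // a valid leader exists, so postpone our own election
	if leaderTerm > p.currentTerm || p.st != stFollower {
		p.st = stFollower // a same-term candidate (or old leader) yields
		p.currentTerm = leaderTerm
	}
	return p.currentTerm, true
}

func main() {
	c := &peer{st: stCandidate, currentTerm: 5}
	fmt.Println(c.handleHeartbeat(5))         // 5 true: candidate steps down
	fmt.Println(c.handleHeartbeat(4))         // 5 false: stale leader rejected
	fmt.Println(c.st == stFollower, c.resets) // true 1
}
```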

Conclusion:

1. The election timer is reset in four situations: when the server starts (in Make), when it grants a vote, when it becomes a Candidate, and when it receives a valid heartbeat. The heartbeat timer is reset when the server starts (in Make) and whenever the leader broadcasts a heartbeat.

2. Be careful with locks: do not hold the lock while sending an RPC, or deadlock comes easily. Two peers that each hold their own lock while calling the other's handler will wait for each other forever.

3. "A follower that receives no heartbeat for a while becomes a candidate and starts an election" is implemented by resetting the election timer whenever a heartbeat arrives. It is not implemented by setting a flag and checking whether a heartbeat was received.

References: www.ulunwen.com/archives/22…; nil.csail.mit.edu/6.824/2020/…; zhuanlan.zhihu.com/p/152236946