Original article: www.inlighting.org/archives/mi…

It took me 25 days to complete the four labs of MIT 6.824 (2021). The pitfalls I hit and my design ideas are recorded here for the reference of those who come after.

Difficulty-wise: Lab 2 > Lab 4 >> Lab 3 = Lab 1.

This post simply records my own design ideas and the pitfalls I ran into.

If you want a more concrete code implementation, check out github.com/LebronAl/MI… .

Lab 1 MapReduce

Lab 1 asks us to implement a MapReduce; it is mostly a warm-up to get familiar with Go syntax. The shuffle temporary files in MapReduce can be stored on the local disk rather than in a distributed file system.

My design: after a worker starts, it repeatedly sends a heartbeat to the coordinator to request a task. The coordinator internally maintains the current phase of the job (MapPhase or ReducePhase) and the task pool of that phase. When a worker asks for a task, one task is taken out of the pool and handed to the worker, and at the same time the coordinator starts an asynchronous goroutine to check whether the task completes. If the task is not finished within 10 seconds, it is put back into the task pool. This handles both straggler workers and crashed workers.
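A minimal sketch of that task pool plus 10-second watchdog (the type and method names Task, Assign, Finish are my own illustration, not the lab's RPC interface; imports of sync and time are omitted to match the other snippets here):

type Task struct {
	ID   int
	Done bool
}

// Coordinator holds the task pool of the current phase.
type Coordinator struct {
	mu    sync.Mutex
	pool  []int         // IDs of tasks waiting to be handed out
	tasks map[int]*Task // every task of the current phase
}

// Assign hands a task to a worker and starts a watchdog goroutine that
// recycles the task if it is not finished within 10 seconds.
func (c *Coordinator) Assign() (int, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.pool) == 0 {
		return 0, false // nothing to do: the worker should sleep and retry
	}
	id := c.pool[0]
	c.pool = c.pool[1:]

	go func() {
		time.Sleep(10 * time.Second)
		c.mu.Lock()
		defer c.mu.Unlock()
		if !c.tasks[id].Done {
			// Straggler or crashed worker: put the task back into the pool.
			c.pool = append(c.pool, id)
		}
	}()
	return id, true
}

// Finish is what the completion RPC from the worker ends up calling.
func (c *Coordinator) Finish(id int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.tasks[id].Done = true
}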

When a worker obtains a task through the heartbeat, it executes it. After the task finishes, the worker sends an RPC to notify the coordinator that the task is complete and then asks for the next one. If no task is available (the coordinator's task pool is empty), the worker sleeps for a while and requests again.

Once the coordinator has seen all tasks complete, the heartbeat reply tells the worker that the whole job is done and lets the worker exit.

For the format of the generated Shuffle temporary files, refer to the official Hints.

Lab 2 Raft

Lab 2 asks us to implement Raft. Many people get discouraged and give up right after the leader election part, but it is actually not as hard as it looks.

Before starting Raft, do your homework on locking in Go, and keep the official guidance and Figure 2 of the paper next to you while writing code. In particular, nothing in Figure 2 is optional: if you skip even one rule in Figure 2, you will definitely fail some test.

Two Raft visualization sites are listed below for reference. Honestly, they only really illustrate leader election and are not very detailed.

thesecretlivesofdata.com/raft/

raft.github.io/

Background threads

In my Raft implementation, two background goroutines are maintained: ticker() and applier(). ticker() drives the timeout actions, such as becoming a candidate when no heartbeat arrives within the expected time. applier() watches commitIndex and lastApplied: it blocks on a condition variable, and every time commitIndex advances it is woken up so it can push lastApplied forward and deliver applyMsg.
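A minimal sketch of my applier(), assuming fields from my own structure (applyCond is a sync.Cond tied to rf.mu, LogEntry is my own entry type with Index/Term/Command) and ignoring the snapshot offset of rf.log for brevity:

func (rf *Raft) applier() {
	for !rf.killed() {
		rf.mu.Lock()
		// Sleep until advancing commitIndex wakes us via rf.applyCond.Signal().
		for rf.lastApplied >= rf.commitIndex {
			rf.applyCond.Wait()
		}
		// Copy the entries first so the (possibly slow) channel send
		// happens without holding the lock.
		start, end := rf.lastApplied+1, rf.commitIndex
		entries := make([]LogEntry, end-start+1)
		copy(entries, rf.log[start:end+1])
		rf.mu.Unlock()

		for _, e := range entries {
			rf.applyCh <- ApplyMsg{
				CommandValid: true,
				Command:      e.Command,
				CommandIndex: e.Index,
			}
		}

		rf.mu.Lock()
		if end > rf.lastApplied {
			rf.lastApplied = end
		}
		rf.mu.Unlock()
	}
}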

Inside ticker() I use a Go timer (time.Timer); you can also use the time.Ticker that comes with Go.

Timing

Timing is very important. Raft needs two kinds of intervals. One is the heartbeat interval, a fixed value, which I set to 100 ms. The other is the election timeout, a random value; my implementation fluctuates between 500 and 900 ms (admittedly a bit long). Be careful to seed the random number generator when producing the random timeout: I did not seed it at first, and in the tests the candidates kept splitting votes and elections took far too long.

At first I had the misconception that followers and candidates use two separate timeouts. In fact, both of them share the same election timeout.
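A small sketch of the two intervals. The constant and helper names are my own choices; the important part is seeding the random number generator once per process:

const HeartbeatInterval = 100 * time.Millisecond

// Seed once; without this every peer produces the same "random"
// sequence and split votes keep happening.
func init() {
	rand.Seed(time.Now().UnixNano())
}

// randomElectionTimeout is shared by followers and candidates.
func randomElectionTimeout() time.Duration {
	return time.Duration(500+rand.Intn(400)) * time.Millisecond
}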

Tips

Deadlocks: careless code easily leads to deadlocks, and I use a few tricks to avoid them as much as possible. The code below acquires a lock while already holding it. This is very common, especially when your code is not written in one sitting: by the time you get to the end you may forget that incTerm() itself acquires the lock, and you end up locking twice. So I name such methods xxxxWithoutLock to remind myself that the method does not lock by itself, but must be called with the lock already held in order to execute correctly.

func (rf *Raft) startElection() {
	// xxxxx
	rf.mu.Lock()
	// xxxxx
	rf.incTerm(term)
	// xxxxx
	rf.mu.Unlock()
}

func (rf *Raft) incTerm(targetTerm int) {
	rf.mu.Lock() // deadlock: the caller already holds rf.mu
	defer rf.mu.Unlock()
	rf.currentTerm = targetTerm
	rf.votedFor = NONE
}

The fixed code looks like this:

func (rf *Raft) startElection() {
	// xxxxx
	rf.mu.Lock()
	// xxxxx
	rf.incTermWithoutLock(term)
	// xxxxx
	rf.mu.Unlock()
}

func (rf *Raft) incTermWithoutLock(targetTerm int) {
	rf.currentTerm = targetTerm
	rf.votedFor = NONE
}

Binding operations: take the code above as an example. From the paper we know that every time we increase the term we must also reset votedFor; the two operations belong together. When you implement Raft there are many places where the term is increased, and it is easy to bump the term and forget to reset votedFor. That is bad, because such bugs are hard to find and show up with random frequency. So bind these two steps together inside one method.

Binding can be applied in many places, for example resetting nextIndex, matchIndex and so on right after being elected leader.

Don’t use len(rf.log)

Using len(rf.log) to track the latest log index works in 2A, 2B and 2C, but once you implement snapshots you will be in deep trouble: you will have to start over and bugs will fly everywhere. Instead, define a variable logIndex int from the start to keep track of the current log index. rf.log[0] stores lastLogIndex, lastLogTerm and related information (this matters for the snapshot part) and is just an empty dummy entry before 2D.
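A sketch of this layout with my own names: an explicit logIndex, a dummy entry at rf.log[0], and a helper that converts a global log index into a slice offset:

type LogEntry struct {
	Index   int
	Term    int
	Command interface{}
}

// lastLogIndex is maintained explicitly instead of deriving it from len(rf.log).
func (rf *Raft) lastLogIndex() int {
	return rf.logIndex
}

// entryAt maps a global log index to a position in rf.log; rf.log[0] is the
// dummy entry that later holds the snapshot's lastLogIndex/lastLogTerm.
func (rf *Raft) entryAt(index int) LogEntry {
	return rf.log[index-rf.log[0].Index]
}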

Check for outdated replies

Every time you write code that handles an RPC reply, think twice about what happens if the reply is stale. For example, you are already the leader, and then a belated reply arrives agreeing to vote for you. At that point you should simply ignore it and do nothing.
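A sketch of the check I run on every RequestVote reply. sentTerm is the term the RPC was sent with; state and votes are fields from my own structure:

func (rf *Raft) handleRequestVoteReply(sentTerm int, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// Stale reply: the election it belongs to is already over, either because
	// the term moved on or because I am no longer a candidate.
	if rf.currentTerm != sentTerm || rf.state != Candidate {
		return
	}
	if reply.Term > rf.currentTerm {
		// Someone is ahead of me: step down.
		rf.incTermWithoutLock(reply.Term)
		rf.state = Follower
		return
	}
	if reply.VoteGranted {
		rf.votes++
		// ... become leader once rf.votes reaches a majority
	}
}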

killed()

We often write loops like this:

for {
  // xxxxxx
}

Make sure such loops check rf.killed() and exit once the Raft instance is killed. At first I forgot the check in one place, which caused high CPU usage during the tests and eventually a timeout.

Explaining Figure 8 of the paper

Figure 8 in the paper has puzzled many people. Its core idea is to show, through a timeline, why a leader may only commit log entries of its current term (and never directly commit entries left over from previous terms); otherwise already-committed entries can be overwritten.

The horizontal axis at the top is the log index, the number inside each square is the term of that entry, and the rows are the 5 nodes.

  • (a): S1 is the leader and replicates the entry at index=2 (term 2) to S2.
  • (b): S1 crashes and S5 becomes leader; because the term has increased, the entry S5 writes at index=2 has term 3.
  • (c): S5 crashes and S1 comes back as leader, continuing to replicate its index=2 entry to more than half of the nodes. Now there are two cases. First, we ignore the rule and commit the index=2 entry. Second, we obey the rule: since a leader may only commit entries of its current term, the leader does not commit even though the index=2 entry is already on a majority.
  • (d): This figure assumes we ignored the rule in (c). S5 comes back and can win votes from S2, S3 and S4, because to them S5's log is the most up to date (highest term); S5 then replicates its own index=2 entry to the other nodes. On S1, S2 and S3 the committed index=2 entry is overwritten by S5's entry (term=3), violating the guarantee that committed entries are never modified.
  • (e): This figure assumes we obeyed the rule in (c): the index=2 entry was never committed, so even if it were overwritten as in (d), nothing committed is lost. Now S1 replicates an entry at index=3 with term 4 to a majority. Since this entry is from S1's current term (4) and sits on more than half of the nodes, it can be committed, and by the Log Matching Property the earlier entry at index=2 is committed along with it. S5 can no longer overwrite the committed entries because it can no longer win an election.

2A Leader Election

To start an election, send the RequestVote() RPCs asynchronously to each node with go func() {}; be sure not to send them serially!
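A sketch of broadcasting the votes asynchronously, reusing the reply handler sketched earlier; buildRequestVoteArgsWithoutLock is my own helper:

// broadcastRequestVote is called from startElection, after the term has been
// incremented and votedFor set to myself.
func (rf *Raft) broadcastRequestVote() {
	rf.mu.Lock()
	args := rf.buildRequestVoteArgsWithoutLock() // term, candidateId, lastLogIndex/Term
	sentTerm := rf.currentTerm
	rf.mu.Unlock()

	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		// One goroutine per peer; never send the RPCs serially.
		go func(peer int) {
			reply := &RequestVoteReply{}
			if rf.sendRequestVote(peer, args, reply) {
				rf.handleRequestVoteReply(sentTerm, reply)
			}
		}(peer)
	}
}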

Once you have more than half of the votes, become leader and broadcast a heartbeat immediately (do not wait for the heartbeat timer to fire), so that candidates in the same term become followers right away (otherwise they time out, start a new election with a term higher than yours, and you lose your newly won leadership).

When you become leader, remember to reset matchIndex and nextIndex.

2B Log

You need to implement the optimized AppendEntries() that quickly locates the conflict index (the guidance in the paper is rough; refer to the official guidance), otherwise one of the tests will fail.
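A sketch of the follower side of that optimization. ConflictIndex and ConflictTerm are extra fields I add to AppendEntriesReply, and entryAt/lastLogIndex are the helpers from the log section above:

func (rf *Raft) fillConflictInfoWithoutLock(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	if args.PrevLogIndex > rf.lastLogIndex() {
		// My log is too short: tell the leader to back up to my end directly.
		reply.ConflictTerm = -1
		reply.ConflictIndex = rf.lastLogIndex() + 1
		return
	}
	// Term mismatch at PrevLogIndex: report the conflicting term and the first
	// index I store for that term, so the leader can skip the whole term at once.
	reply.ConflictTerm = rf.entryAt(args.PrevLogIndex).Term
	index := args.PrevLogIndex
	for index-1 > rf.log[0].Index && rf.entryAt(index-1).Term == reply.ConflictTerm {
		index--
	}
	reply.ConflictIndex = index
}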

In my implementation, the leader maintains a replicator for each peer: whenever the leader synchronizes with a peer, only that one replicator sends AppendEntries() (no concurrent senders). This greatly reduces the complexity of the code logic, since each replicator is a single thread and you never have to worry about concurrent replication to the same peer. The replicator is driven by a condition variable: when Start() is called or a heartbeat is broadcast, the condition variable is signalled and the replicator synchronizes asynchronously.
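A sketch of one replicator goroutine under this design. replicatorCond[peer] is a sync.Cond with its own mutex, and needReplicate / replicateOneRound (which lock rf.mu internally) are my own helpers:

func (rf *Raft) replicator(peer int) {
	rf.replicatorCond[peer].L.Lock()
	defer rf.replicatorCond[peer].L.Unlock()
	for !rf.killed() {
		// Sleep until Start() or the heartbeat broadcaster signals this cond.
		for !rf.needReplicate(peer) {
			rf.replicatorCond[peer].Wait()
		}
		// Only this goroutine ever sends AppendEntries to this peer,
		// so there is no concurrent replication to the same follower.
		rf.replicateOneRound(peer)
	}
}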

When synchronizing, do not send entries one at a time; that is far too slow. For example, once you have found the conflict index and conflict term, send the entries in one batch: either package all entries belonging to the conflicting term and send them at once, or simply send everything from the conflict index onward.

Note that when advancing commitIndex you may only commit entries belonging to your own term. The paper states this clearly. I did not take care of it at first, could not get 2B sorted out, and then 2C had all sorts of problems.

Even when 2B passes consistently, do not be complacent and assume everything is correct; 2C will expose whatever is left.

2C Persistence

The persistence itself is easy; the hard part is that you now have to fix the bugs you buried in 2B.

For persistence, just keep in mind that every change to votedFor, log, logIndex or currentTerm needs to be saved immediately: persist on every modification.
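A minimal persist sketch, assuming the lab's labgob encoder and Persister (rf.persister.SaveRaftState) and a bytes.Buffer; call it, with the lock held, after every change to these fields:

func (rf *Raft) persistWithoutLock() {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.logIndex)
	e.Encode(rf.log)
	rf.persister.SaveRaftState(w.Bytes())
}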

Pay attention to the TestFigure8Unreliable2C() test case; I was stuck on it for a week.

2D Snapshot

I did not understand the snapshot part at first. The paper says Raft takes a snapshot by itself when it notices its log is too large, but in the lab Snapshot() is called by the test case; it took me a long detour before the logic became clear.

The upper-layer service notices that the log is too large (in Lab 2 the test case plays the role of the service) and calls the Snapshot() method to save a snapshot.

rf.log[0] can be used to store the lastLogIndex and lastLogTerm of the latest snapshot.

Suppose a node keeps crashing and then comes back. The leader finds that it has fallen too far behind and sends it an InstallSnapshot() RPC. After the lagging node receives the snapshot in InstallSnapshot(), it hands the snapshot to the upper-layer service through rf.applyCh. When the upper-layer service receives the snapshot, it calls the node's CondInstallSnapshot() method. If something has been committed after the snapshot point, the node refuses to install it and the service aborts the installation; otherwise the node installs the snapshot and returns true, and the service installs it at the same time. It has a bit of a 2PC flavor.

Don’t forget persistence when installing Snapshot.

Lab 3 KV Raft

This lab asks us to build our own service on top of Raft. Here the service is a K/V database, and the client (Clerk) interacts with the service to perform CRUD operations.

Before starting, it is worth reading Chapter 6 of the Raft PhD thesis, which describes how clients interact with the service and how the RPCs are designed.

Part A

In the official code skeleton you can see that we are expected to split Get and PutAppend into two RPCs. I do not think that is necessary: I merge them, so the service only needs one method to handle all of the Clerk's requests.

The service simply stores the database in a map; a real system would of course not be this simple.

When the service starts, it launches a goroutine that listens for the applyMsg committed by Raft. Every database operation is submitted through raft.Start() and then waits for the underlying Raft to apply it.

Deduplication: deduplicating client requests is very important, as 6.824 itself points out. I use the (clientId, sequenceNum) pair to uniquely identify a request. clientId is generated randomly when the Clerk is created; sequenceNum starts at 1 and is incremented only after a command executes successfully. The service also keeps a lastClientOperations map[int64]ClientOperation structure that stores each client's latest request (note that every node needs to store it, not just the leader, otherwise the table is lost when the leader goes down). So the table is only updated when Raft delivers the applyMsg and the KV operation is actually executed. As the number of clients grows, lastClientOperations will certainly get large; I did not manage or trim it here, and the lab does not ask us to. Concrete approaches are discussed in the PhD thesis.

Note: a request is checked against the deduplication table twice: once right after the service receives the RPC from the Clerk, and once more when the service receives the applyMsg from the Raft layer, just before it writes to the database.
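A sketch of the deduplication table with my own names; both checks described above go through the same helper:

type ClientOperation struct {
	SequenceNum int64
	Reply       string // cached reply of the last executed request
}

// kv.lastClientOperations maps clientId -> the client's latest executed request.
// Every replica maintains it, and it is only updated when an applyMsg is
// actually executed against the database.
func (kv *KVServer) isDuplicateWithoutLock(clientId, sequenceNum int64) (string, bool) {
	last, ok := kv.lastClientOperations[clientId]
	if ok && last.SequenceNum >= sequenceNum {
		return last.Reply, true // already executed: return the cached reply, do not re-apply
	}
	return "", false
}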

Part B

One thing to note about log compaction: do not forget to include the lastClientOperations deduplication table in the snapshot.

Lab 4 Sharded KV

Lab 4 asks us to implement a sharded K/V database, which is essentially multi-raft: running multiple Raft groups in one cluster. A single traditional Raft cluster inevitably has limited performance, because only one leader interacts with clients. So we shard the data (taking the simplest scheme): for example, keys that map to shard 1 go to one replica group and keys that map to shard 2 go to another. This spreads requests across groups and lets multiple Raft leaders serve requests at the same time, solving the performance problem.
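The lab skeleton already ships a key2shard() along these lines (first byte of the key modulo the number of shards); it is shown here just to make the sharding idea concrete:

func key2shard(key string) int {
	shard := 0
	if len(key) > 0 {
		shard = int(key[0])
	}
	shard %= shardctrler.NShards // 10 shards in the lab
	return shard
}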

4A Shard Controller

Implementing the shard controller is relatively easy: you can copy your Lab 3 code directly, and you do not even need to implement snapshots, because the number of config changes is small, the Raft log barely grows, and there is no need for one.

In the shard controller the main problem to solve is distributing shards evenly across groups. I use a simple algorithm with a for loop. For a Join: repeatedly move one shard from the group managing the most shards to the group managing the fewest. For a Leave: each round, pick an unassigned shard and give it to the group managing the fewest shards. The loop ends when abs(maxShard - minShard) <= 1.
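A sketch of the balancing loop; gidShards and the two helpers are my own names. One extra pitfall worth a comment: iterate over group IDs in a deterministic (sorted) order inside those helpers, because Go map iteration order is random and every controller replica must compute the same assignment.

func rebalance(shards *[shardctrler.NShards]int, gidShards map[int][]int) {
	for {
		// Helpers must break ties deterministically (e.g. by sorted gid).
		maxGid, minGid := gidWithMostShards(gidShards), gidWithFewestShards(gidShards)
		if len(gidShards[maxGid])-len(gidShards[minGid]) <= 1 {
			return // abs(maxShard - minShard) <= 1: balanced
		}
		// Move one shard from the most loaded group to the least loaded one.
		moved := gidShards[maxGid][0]
		gidShards[maxGid] = gidShards[maxGid][1:]
		gidShards[minGid] = append(gidShards[minGid], moved)
		shards[moved] = minGid
	}
}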

4B Shard K/V Server

This part really is a bit more involved, so let me sketch my approach.

There is not much to say about the Clerk: it mainly maps keys to shards and routes requests to the right group, and the official code template already makes this clear.

Most of the work is on the service side, where there are more details:

Independent shards: in my design every shard is independent; each shard contains its data map and its own client deduplication table, so when a shard is transferred the deduplication table travels with it. A shard has three states: serving, pulling and offering. Serving means the shard handles Clerk requests normally, pulling means the shard still has to be fetched from another group, and offering means the shard is here but is about to be handed off to a remote group. Note that a shard serves requests only while it is in the serving state.
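A sketch of the per-shard structure; the names are mine, and ClientOperation is the same deduplication record as in Lab 3:

type ShardStatus int

const (
	Serving  ShardStatus = iota // handles Clerk requests normally
	Pulling                     // still has to be fetched from another group
	Offering                    // owned here but about to be handed off to a remote group
)

type Shard struct {
	Status               ShardStatus
	Data                 map[string]string
	LastClientOperations map[int64]ClientOperation // the dedup table migrates with the shard
}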

Monitoring the config: after the service starts, the leader runs a dedicated goroutine that keeps reading the next config (relative to the current one) from the controller. Note: do not fetch the latest config. For example, if your current config is 2, fetch 3 even if the latest config is 10, because config changes must be applied one step at a time; you cannot jump straight from 2 to 10. The official hints mention this as well. When a new config is retrieved, mark the shards you manage accordingly (which shards keep serving normally, which shards need pulling, and so on).

Note that you may fetch the next config only when every locally owned shard is in the serving state. This is easy to understand: you cannot move on to the next config while the shards of the current config have not yet been fully synchronized.
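A sketch of the config-monitoring goroutine on the leader, assuming the shard controller clerk's Query(num); currentConfig, mck and allShardsServingWithoutLock are my own names:

func (kv *ShardKV) configMonitor() {
	for {
		if _, isLeader := kv.rf.GetState(); isLeader {
			kv.mu.Lock()
			canAdvance := kv.allShardsServingWithoutLock() // nothing mid-migration
			next := kv.currentConfig.Num + 1
			kv.mu.Unlock()
			if canAdvance {
				// Ask for the NEXT config, never the latest one.
				newConfig := kv.mck.Query(next)
				if newConfig.Num == next {
					kv.rf.Start(newConfig) // replicate the config change through Raft
				}
			}
		}
		time.Sleep(100 * time.Millisecond)
	}
}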

Shard migration: I also use a dedicated goroutine on the leader for shard migration. At intervals, the leader scans the shards of the current group; if a shard's state is not serving, migration starts. (Shard state changes themselves are handled by the config-monitoring goroutine.)

When migrating we need to know where a shard comes from and where it goes, so I actually keep two configs, the current one and the previous one; together they tell us the shard's origin and destination.

For migration I use push: if I find that my shard 2 is in the offering state and should be installed on another group, then the current group sends an InstallShard() RPC to that group. If the other group receives and saves it successfully (reply = OK), I delete the local shard; otherwise the attempt failed and I retry InstallShard() in the next round until it succeeds. Compared with the pull approach this greatly reduces the workload, and it also guarantees that useless shards are collected promptly.

Shard insert and delete: when we need to install a new shard or delete a shard, go through raft.Start() instead of just modifying the local state directly.

Note that only the leader runs the config-monitoring and shard-migration goroutines, so check that you are still the leader on every loop iteration; likewise, when a node becomes leader, start both goroutines.

Livelock and committing empty entries: consider a situation where a node has just become leader but clients are not submitting any requests. The Raft layer cannot advance commitIndex, the nodes cannot synchronize, and you get a livelock. The solution is simple: run another goroutine that checks whether the newest entry in rf.log belongs to the current term, and if not, submits an empty entry to push the whole Raft forward.
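A sketch of that detector; HasLogInCurrentTerm is a small method I add to Raft myself, and EmptyOp is just an entry the apply loop ignores:

func (kv *ShardKV) emptyEntryDetector() {
	for {
		if _, isLeader := kv.rf.GetState(); isLeader && !kv.rf.HasLogInCurrentTerm() {
			kv.rf.Start(EmptyOp{}) // push commitIndex forward; the apply loop skips it
		}
		time.Sleep(200 * time.Millisecond)
	}
}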