6.824 - Raft Implementation (Part 2): Log Replication

Preface

After finishing lab 2A, Raft leader election, last time, I was stuck on log replication for a long while. Then last night, after optimizing AppendEntries so that a mismatch on prevLog skips back over an entire term of log entries at once, the long-troubling TestBackup2B magically passed. I ran it twice and still could not quite believe it, so I deliberately reverted the change and watched it fail again before I felt relieved; the old version had simply been too slow and timed out.

While it’s still fresh, I might as well write down the blood, sweat, and tears of this period tonight.

Author: Muniao's Notes (https://www.qtmuniao.com); please credit the source when reprinting.

Lab Overview

6.824 is a distributed systems course at MIT. I followed the 2018 spring version. The second lab requires a simple implementation of a distributed consensus protocol — Raft.

Raft is a protocol designed specifically to be easy to teach and to implement. It breaks consensus down into several relatively independent modules: leader election, log replication, and safety guarantees. Figure 2 in the paper gives essentially all the implementation details of Raft, and every word of it matters. But precisely because it is so dense, some state transitions are scattered across different clauses, and if you implement strictly by following the figure it is easy to miss details.

Log Replication

This is the content of lab 2B. After more than a month of going back and forth, hitting pitfalls, clearing mines, giving up, and picking it back up, I finally completed the log replication part. The final code is just under seven hundred lines (694), and some implementation details were inspired by online resources. My mental state during this period was much like a comic I once saw poking fun at programmers: ah, a bug, why? fix fix fix; still wrong, fix fix fix; ah, finally correct, but wait, why?

I feel the final passing version still has room for improvement, and some places vaguely feel redundant. But let's be happy for now and get these thoughts down on paper; after all, the memory in this aging brain is limited and volatile.

The previous post on leader election mainly recorded some implementation tips; this post mainly explores the various logical details of the implementation.

General Flow

At first I more or less understood the paper and felt I had grasped all the details, yet I still could not play the whole process through in my head. Looking back now, after a Leader comes online, log replication goes roughly like this:

  1. After the Leader is elected, it initializes its state, mainly the nextIndex and matchIndex arrays. There are many reasonable choices here; what I did was: every nextIndex = len(rf.log), every matchIndex = 0 (a small initialization sketch follows this list).
  2. Then it immediately starts heartbeats, i.e., AppendEntries. The paper says an empty heartbeat is sufficient at the start, but in my implementation I send a heartbeat carrying a logEntry; this depends on my initialization above and the parameter construction strategy below.
  3. I split synchronization into a probing phase, which finds the match position, and a transfer phase after the match: first find a matching logEntry on the Follower by moving prevLogIndex (and its prevLogTerm) backwards round by round, then have the Leader send all entries after the matched logEntry to the Follower in one go.
  4. When constructing the RPC arguments, the other fields are straightforward; the ones that need thought are prevLogIndex, prevLogTerm, and entries. Since prevLogTerm is determined by prevLogIndex, that really leaves prevLogIndex and entries. For prevLogIndex, in the probing phase I set it to nextIndex-1, which avoids sending unnecessary entries while probing; in the transfer phase, prevLogIndex can be set to matchIndex, i.e., the last matched entry. For entries, I take the closed interval [prevLogIndex+1, min(nextIndex, len(rf.log)-1)] from the Leader's log.
  5. Thus no data is transferred during a probe (entries is [], because at that point prevLogIndex = nextIndex-1), while in the transfer phase all of the Leader's logEntries after the match point are sent in one go. If there are no new logs, the variables settle at: nextIndex = len(rf.log); prevLogIndex = matchIndex = len(rf.log)-1; entries is [].
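
Roughly, steps 1 and 2 can look like the sketch below; the helper names becomeLeader and broadcastAppendEntries and the constant heartbeatInterval are illustrative, not something the lab prescribes:

func (rf *Raft) becomeLeader() {
    rf.state = Leader
    for i := range rf.peers {
        rf.nextIndex[i] = len(rf.log) // optimistic: assume every follower is caught up
        rf.matchIndex[i] = 0          // pessimistic: nothing confirmed yet
    }
    rf.matchIndex[rf.me] = len(rf.log) - 1 // the leader always matches its own log

    // kick off periodic AppendEntries (heartbeat / sync) right away
    go func() {
        for {
            rf.mu.Lock()
            if rf.state != Leader {
                rf.mu.Unlock()
                return
            }
            rf.mu.Unlock()
            rf.broadcastAppendEntries() // send AppendEntries to every peer
            time.Sleep(heartbeatInterval)
        }
    }()
}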

Other points are some details attached to this process:

  1. Periodically check the majority-matched logEntry index to decide whether to commit, i.e., advance the Leader's commitIndex, and piggyback it to each Follower in subsequent AppendEntries RPCs.
  2. Meanwhile, another thread needs to check whether lastApplied has caught up with commitIndex, so that committed logs are promptly applied to the state machine (a sketch of this apply loop follows the list). I think the two variables are kept separate mainly for logical decoupling: on the Leader, commitIndex is advanced by the Leader's own counting; on a Follower, it is advanced passively from the Leader's messages.
  3. After the Leader accepts a new command, it must also update its own slot in the matchIndex array, because the Leader itself counts as one vote when tallying the majority.
  4. After a Follower disconnects, the Leader needs to stop executing the reply-handling callback for it promptly, rather than keep acting on a stale reply.
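
For point 2, a minimal sketch of the apply goroutine; the polling style, the 10 ms interval, keeping applyCh (the channel handed to Make) on the Raft struct, and the Command field on LogEntry are all assumptions of this sketch, and a condition variable would work just as well:

func (rf *Raft) applyLoop() {
    for {
        var msgs []ApplyMsg
        rf.mu.Lock()
        for rf.lastApplied < rf.commitIndex {
            rf.lastApplied++
            msgs = append(msgs, ApplyMsg{
                CommandValid: true,
                Command:      rf.log[rf.lastApplied].Command,
                CommandIndex: rf.lastApplied,
            })
        }
        rf.mu.Unlock()

        // send outside the lock so a slow consumer cannot block the RPC handlers
        for _, msg := range msgs {
            rf.applyCh <- msg
        }
        time.Sleep(10 * time.Millisecond)
    }
}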

AppendEntries Heartbeat & Sync

This mainly corresponds to the two phases mentioned above, the probing phase and the transfer phase; other things to note are the locking and state self-check mentioned in the previous post.

// need to handle the reply
if reply.Term > rf.currentTerm {
    rf.becomeFollower(reply.Term)
} else {
    if !reply.Success {
        // probe failed: roll nextIndex back a whole term at a time
        nextIndex := rf.nextIndex[server] - 1
        for nextIndex > 1 && rf.log[nextIndex].Term == args.PrevLogTerm {
            nextIndex--
        }
        rf.nextIndex[server] = nextIndex
    } else {
        // matched: record the match point and sync everything after it
        rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
        rf.nextIndex[server] = len(rf.log)
    }
}
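
For context, the snippet above runs in the goroutine that talks to a single peer, after the RPC returns. A stripped-down sketch of what that wrapper can look like; the name syncLogEntriesTo is illustrative, and sendAppendEntries stands for the usual labrpc Call wrapper:

func (rf *Raft) syncLogEntriesTo(server int) {
    rf.mu.Lock()
    if rf.state != Leader { // state self-check before doing anything
        rf.mu.Unlock()
        return
    }
    args := rf.constructAppendEntriesArg(server) // shown below
    rf.mu.Unlock()

    reply := &AppendEntriesReply{}
    if !rf.sendAppendEntries(server, args, reply) {
        return // RPC failed (peer unreachable), give up this round
    }

    rf.mu.Lock()
    defer rf.mu.Unlock()
    if rf.state != Leader || rf.currentTerm != args.Term {
        return // stale reply: we are no longer the leader of that term
    }
    // ... handle the reply exactly as in the snippet above ...
}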

Next comes parameter construction. prevLogIndex is built differently in the two phases, and then the appropriate window of entries is taken from the log.

func (rf *Raft) constructAppendEntriesArg(idx int) *AppendEntriesArgs {
    prevLogIndex := 0
    if rf.matchIndex[idx] == 0 {
        prevLogIndex = rf.nextIndex[idx] - 1 // probing: try to find a match
    } else {
        prevLogIndex = rf.matchIndex[idx] // matched: sync from the match point
    }
    prevLogTerm := rf.log[prevLogIndex].Term

    // the window of entries to replicate: [prevLogIndex+1, min(nextIndex, len(log)-1)]
    var entries []*LogEntry
    start := prevLogIndex + 1
    end := min(rf.nextIndex[idx], len(rf.log)-1)
    for i := start; i <= end; i++ {
        entries = append(entries, rf.log[i])
    }

    return &AppendEntriesArgs{rf.currentTerm, rf.me, prevLogIndex,
        prevLogTerm, entries, rf.commitIndex}
}
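
One small detail: Go (at least the version current when I did the lab) has no built-in min for ints, so the min used above and in the commit-index check below is just a tiny helper along these lines:

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}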

Finally, the AppendEntries callback on the receiving peer: when it sees a request from a stale term, it rejects it directly; nothing much to say there, just remember to send back your own term. Beyond that, whether args.Term is greater than or equal to rf.currentTerm, the peer receiving AppendEntries must become a Follower and reset its election timer. Also, when matching the logEntry at prevLogIndex, pay attention to truncating the conflicting entries.

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    reply.Term = rf.currentTerm
    // reject requests from a stale term
    if args.Term < rf.currentTerm {
        reply.Success = false
        return
    }

    rf.leaderId = args.LeaderId
    rf.becomeFollower(args.Term)

    if args.PrevLogIndex >= len(rf.log) {
        reply.Success = false
    } else if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
        reply.Success = false
        rf.log = rf.log[:args.PrevLogIndex] // truncate the conflicting suffix
    } else {
        reply.Success = true

        // delete unmatched entries and append the new ones
        if len(args.Entries) > 0 {
            rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...)
        }

        // advance commitIndex following the leader
        if args.LeaderCommit > rf.commitIndex {
            rf.commitIndex = min(args.LeaderCommit, len(rf.log)-1)
        }
    }
}

CommitIndex Update

This mainly relies on the Leader's matchIndex[]. The concrete approach is to sort it in ascending order and take the value at the median position (I came up with this on the spot, and it felt almost magical). Another important point the paper emphasizes is that only logEntries from the current term may be committed by counting replicas; this prevents already-replicated entries from being overwritten again and again as leadership changes hands (the Figure 8 scenario in the paper).

func (rf *Raft) checkCommitIndex() {
    peersCount := len(rf.peers)
    matchIndexList := make([]int, peersCount)
    copy(matchIndexList, rf.matchIndex)
    sort.Ints(matchIndexList)

    // at least a majority of peers have matchIndex >= matchIndexList[majority];
    // before advancing commitIndex, check that the entry's term matches currentTerm
    majority := peersCount / 2
    peerMatchIndex := matchIndexList[majority]
    if peerMatchIndex > rf.commitIndex && rf.log[peerMatchIndex].Term == rf.currentTerm {
        rf.commitIndex = peerMatchIndex
    }
}
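
checkCommitIndex must be called with the lock held. One way to drive it is a periodic loop on the leader; the sketch below is illustrative (the interval and the standalone goroutine are arbitrary choices, and it could just as well be folded into the heartbeat loop):

func (rf *Raft) commitChecker() {
    for {
        rf.mu.Lock()
        if rf.state == Leader {
            rf.checkCommitIndex()
        }
        rf.mu.Unlock()
        time.Sleep(10 * time.Millisecond)
    }
}

It can be started once with go rf.commitChecker(), for example in Make.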

Responding to Votes

Each peer can cast at most one vote per term, but whenever its term is updated, votedFor is reset to -1, meaning it may vote again. This matters when responding to a candidate's RequestVote: if the peer has already voted it would seem unable to vote again, but if it finds its own term is lower than the candidate's, it must immediately become a Follower (which resets votedFor) and can then grant its vote to that candidate, provided the candidate's log passes the up-to-date check.

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    reply.Term = rf.currentTerm

    // once we see a peer with a higher term, step down to follower
    if args.Term > rf.currentTerm {
        rf.becomeFollower(args.Term)
    }

    // reject stale terms, and reject if we have already voted this term
    if args.Term < rf.currentTerm || rf.votedFor != -1 {
        reply.VotedGranted = false
        return
    }

    // election restriction: the candidate's log must be at least as up-to-date as ours
    lastIndex := len(rf.log) - 1
    lastLogTerm := rf.log[lastIndex].Term
    if args.LastLogTerm > lastLogTerm ||
        args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastIndex {
        reply.VotedGranted = true

        // convert to follower and record the vote
        rf.becomeFollower(args.Term)
        rf.votedFor = args.CandidateId // do not forget this
    } else {
        reply.VotedGranted = false
    }
}

This relies on the implementation of becomeFollower: change the state, reset the election timer, and decide whether the peer may vote again based on whether the term actually increased.

func (rf *Raft) becomeFollower(term int) {
    DPrintf("%d[%d] become follower", rf.me, term)
    rf.resetElectionTimer()
    rf.state = Follower
    if term > rf.currentTerm {
        rf.votedFor = -1 // new term: allowed to vote again
    }
    rf.currentTerm = term
}

I am Qingteng Muniao (青藤木鸟), a programmer who enjoys photography and focuses on large-scale data systems. Follow my WeChat official account "木鸟杂记" for more articles on distributed systems, storage, and databases. After following, reply "资料" to get a set of distributed-database study materials I put together, or reply "优惠券" for a 20%-off coupon for my paid column on large-scale data systems, 《系统日知录》.

We also have chat groups on distributed systems and databases; add my WeChat ID qtmuniao and I will pull you in, and remember to note "分布式系统群" when adding me. If you would rather not join a group, there is also a forum for distributed systems and databases (click here); feel free to drop by.
