raft协议实现分析

前言

  之前也细读过raft协议论文,项目里也独立实现过,不过做了mit 6.824的lab2之后,才发现有很多细节没有注意到,论文里figure 2的每一句话真的都需要完全毫无偏差的实现,不然肯定有某些corner case没考虑到。本文就分析一下mit 6.824课程lab2里raft协议实现的思路和细节,以及总结一些注意事项,代码可见个人github仓库https://github.com/wjqwsp/MIT-6.824-2018 ,最新的名为“raft”的commit里,用golang实现。后续还会在这个仓库里不断补全课程的其他实验代码。本文不会详细阐述raft的原理和细节,只关注实现和注意事项。

整体结构

  本文主要针对raft论文的核心部分:leader选举以及数据复制、一致性检测与恢复两个内容的实现进行分析。

  每个raft实例包含3个常驻线程,分别执行发送心跳、发起选举、apply状态机3个任务。同时每个raft实例之间用rpc方式通信,每个实例都有两个rpc回调,分别是AppendEntries和RequestVote,处理数据复制、心跳以及选举的请求。主要有以下几个结构体:raft实例,AppendEntries请求,AppendEntries回复,RequestVote请求,RequestVote回复。主要还是按照论文里figure 2的定义确定结构体包含哪些数据域。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
state raftState
currentTerm int
votedFor int
votesNums int
leaderId int
lastHeatbeatTime int64
log []Entry
commitIndex int
lastApplied int
nextIndex []int
matchIndex []int
needApplyChan chan needApplyRange
applyChan chan ApplyMsg
debug int
}
type raftState string
const (
candidate raftState = "candidate"
follower = "follower"
leader = "leader"
//
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int
CandidateId int
LastLogIndex int
LastLogTerm int
}
//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
// Your data here (2A).
Term int
VoteGranted bool
}
type AppendEntriesArgs struct {
Term int
LeaderId int
PrevLogIndex int
PrevLogTerm int
Entries []Entry
LeaderCommit int
}
type AppendEntriesReply struct {
Term int
Success bool
ConflictTerm int
ConflictIndex int
}
type Entry struct {
Command interface{}
Term int
}

leader选举

  raft协议的Leader选举主要由term决定。term相当于逻辑时钟,时间轴按term分成长度不等的时间片段,每个term内最多只能有一个leader。如果不考虑数据复制的话,仅仅用term按照先到先得的投票逻辑就能决出一个leader。在每一个term内,每个server最多只能投出一票给某一个server,然后获得超过半数票数的server会当选为该term的leader。leader会通过心跳(heartbeat timeout)来维持自己的统治地位,当超过一定时间(election timeout)其他follower没有收到该Leader的心跳,则会转换为candidate状态,然后发起选举。如果加入了数据复制功能,那么选举逻辑就要加一点限制,不再是先到先得了,而是要看谁的log里的数据最新。因为raft协议是多数节点同步的,所以只要某个请求被commit了,只要多数节点在线,当选的leader就必然有最新commit的数据。

raft实例初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// Your initialization code here (2A, 2B, 2C).
rf.debug = Debug
rf.applyChan = applyCh
rf.needApplyChan = make(chan needApplyRange)
rf.state = follower
rf.lastHeartbeatTime = time.Now().UnixNano()
rf.votedFor = -1
rf.log = append(rf.log, Entry{Term: 0})
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))
for i := 0; i < len(rf.peers); i++ {
rf.nextIndex[i] = 1
rf.matchIndex[i] = 0
}
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
go rf.sendHeartbeat()
go rf.elect()
go rf.apply()
rf.DPrintf("server %v state %+v\n\n", rf.me, rf)
return rf
}

  初始化raft结构体,然后启动3个常驻线程。raft结构体里跟选举相关的最重要的几个字段是state,currentTerm, lastHeartbeatTime,log,votedFor和persister。其中votedFor,log和currentTerm是需要持久化的。在每次raft实例启动的时候,都要读取之前持久化的状态。因为raft协议的正确性保证的必要条件之一是每个term里每个server最多只能投出一票。设想一下这个情况,如果某个server投出了一票,就挂了,然后很快重启起来,如果没有保存之前投票的状态,就可能会重新投给别的server,很可能出现多个leader的脑裂情况。log的话不用说肯定是要持久化的,后续的数据恢复、同步以及leader重选都要用到。

  下面主要看一下sendHeartbeat和elect两个函数,分别处理发送心跳和发起选举两个任务。

发送心跳

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (rf *Raft) sendHeartbeat() {
for {
currentTerm, isleader := rf.GetState()
rf.mu.Lock()
if isleader {
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
rf.DPrintf("server %v prevlogindex %v\n\n", i, rf.nextIndex[i] - 1)
var entries []Entry
if rf.nextIndex[i] > len(rf.log) - 1 {
entries = []Entry{}
}else {
entries = rf.log[rf.nextIndex[i]:]
}
appendEntriesArgs := AppendEntriesArgs{Term: currentTerm, LeaderId: rf.me, Entries: entries,
PrevLogTerm: rf.log[rf.nextIndex[i] - 1].Term, PrevLogIndex: rf.nextIndex[i] - 1, LeaderCommit: rf.commitIndex}
appendEntriesReply := new(AppendEntriesReply)
rf.DPrintf("send heartbeat req: %+v from %v to %v, current state %+v\n\n", appendEntriesArgs, rf.me, i, rf)
go rf.sendAppendEntries(i, &appendEntriesArgs, appendEntriesReply)
}
}
}
rf.mu.Unlock()
time.Sleep(time.Duration(heartbeatTimeout) * time.Millisecond)
}
}

  该函数执行一个无限循环,定期检查一下自己是不是leader,如果是leader的话需要给其他server发送AppendEntries请求以维持自己的统治地位。这里有一个需要注意的地方。我一开始实现的时候,认为心跳请求只需要发空的entries[]就行,不过这样会导致一个问题:如果长期没有新的请求到达,那么raft实例之间就永远不会主动进行数据同步了。在论文里,明确说了如果请求超时或失败,leader是需要无限尝试的。一旦集群发生大规模的失败和重启,那么之前的请求也可能永远得不到尝试的机会。所以,我认为心跳的实现可以和普通AppendEntries请求并没有太大的区别,在正常情况下,心跳请求携带的entries[]是空的,但如果有数据丢失,就可以携带上数据,马上进行数据恢复。

  这种做法有一个缺点,就是可能会有很多重复的数据传输,不过raft协议里请求写Log是幂等的,所以并不会影响正确性。还有一个解决方法,就是在复制数据的时候一旦发生请求失败则无限重试,并且在每次leader选举成功之后commit一个空的请求。这种做法应该是论文推荐的做法,也可以解决线性一致性的问题,不过要过实验的测试用例的话还是用第一种比较合适,实际实现可以用第二种。

  这里对其他每一个server的rpc请求都要新开一个线程来处理,因为所有rpc请求都有可能发生阻塞,多线程也有更好的并行性。

发起选举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (rf *Raft) elect() {
rand.Seed(time.Now().UnixNano())
for {
electionTimeout := rand.Int63n(maxElectionTimeout-minElectionTimeout) + minElectionTimeout
rf.mu.Lock()
for rf.state == follower {
currentTime := time.Now().UnixNano()
if currentTime - rf.lastHeartbeatTime > electionTimeout * 1000000 {
rf.state = candidate
} else {
rf.mu.Unlock()
time.Sleep(time.Duration(heartbeatTimeout) * time.Millisecond)
rf.mu.Lock()
}
}
for rf.state == candidate {
currentTime := time.Now().UnixNano()
if currentTime - rf.lastHeartbeatTime > electionTimeout * 1000000 {
rf.currentTerm++
rf.votedFor = rf.me
rf.votesNums = 1
rf.lastHeartbeatTime = time.Now().UnixNano()
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
requestVoteArgs := RequestVoteArgs{Term: rf.currentTerm, CandidateId: rf.me,
LastLogTerm: rf.log[len(rf.log) - 1].Term, LastLogIndex: len(rf.log) - 1}
rf.DPrintf("send request vote req: %+v from %v to %v, current state %+v\n\n", requestVoteArgs, rf.me, i, rf)
requestVoteReply := new(RequestVoteReply)
go rf.sendRequestVote(i, &requestVoteArgs, requestVoteReply)
}
}
break
}
rf.mu.Unlock()
time.Sleep(time.Duration(heartbeatTimeout) * time.Millisecond)
rf.mu.Lock()
}
rf.mu.Unlock()
time.Sleep(time.Duration(heartbeatTimeout) * time.Millisecond)
}
}

  也是一个无限循环,每隔一个heartbeat timeout,检查一下自己的状态,判断一下最近一次收到heartbeat的时间与当前时间有没有超过election timeout。如果超过的话则要重置计时器,开始一轮新的选举。这里为了实现方便,采用定期检查状态的方式判断有没有超时,也可以用channel或者条件变量等方法来获取通知。

  上面的代码主要是投票是如何触发的,下面讲一下整个投票的过程是怎么实现的,主要涉及到两个函数,一个上面代码提到的sendRequestVote函数,主要执行rpc调用并处理返回结果。还有一个函数就是RequestVote rpc调用的回调函数。

选举过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
if args.Term < rf.currentTerm {
reply.VoteGranted = false
reply.Term = rf.currentTerm
rf.mu.Unlock()
return
} else if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votesNums = 0
rf.state = follower
rf.votedFor = -1
rf.persist()
}
if (args.LastLogTerm > (rf.log)[len(rf.log) - 1].Term ||
(args.LastLogTerm == (rf.log)[len(rf.log) - 1].Term && args.LastLogIndex >= len(rf.log) - 1)) &&
(rf.votedFor == -1 || rf.votedFor == args.CandidateId) {
reply.VoteGranted = true
rf.lastHeartbeatTime = time.Now().UnixNano()
rf.votedFor = args.CandidateId
rf.persist()
} else {
reply.VoteGranted = false
}
reply.Term = rf.currentTerm
rf.DPrintf("request vote reply: %+v from %v to %v, current state %+v\n\n", *reply, rf.me, args.CandidateId, rf)
rf.mu.Unlock()
}
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
rf.mu.Lock()
//DPrintf("reply vote res %v, request term %v, current term %v, current vote num %v, server id %v, state %v",
// reply.VoteGranted, args.Term, rf.currentTerm, rf.votesNums, rf.me, rf.state)
if reply.VoteGranted && args.Term == rf.currentTerm && rf.state == candidate {
rf.votesNums++
if rf.votesNums > len(rf.peers) / 2 {
rf.state = leader
for i := 0; i < len(rf.peers); i++ {
rf.nextIndex[i] = len(rf.log)
rf.matchIndex[i] = 0
}
}
}
if reply.Term > rf.currentTerm {
rf.state = follower
rf.currentTerm = reply.Term
rf.votesNums = 0
rf.votedFor = -1
rf.persist()
}
rf.mu.Unlock()
return ok
}

  首先要注意的是,论文里figure 2里提到reply false的,都要马上返回,不要处理后续的逻辑。对于raft里的AppendEntries和RequestVote这两个rpc请求,如果发现请求的term或者返回的term大于自己的term,说明自己已经过时了,需要更新为更大的term,并且重置投票状态,自己变为follower。在选举过程中,把server及时变为follower可以防止多个server进入candidate状态,导致分票。还要注意当投出自己的一票之后,要重置计时器。试想一下这个情景,如果不重置计时器,那么当某个server收到投票请求,自己的term更新了,投了票之后,很快又超时了,自己又成为candidate,term又加1,很可能新选出来的leader又被迫step out,那么系统长期处于选举->step out->选举的状态,永远无法稳定提供服务。

  如果没有投票,则不要重置计时器。否则拥有旧数据的server如果更早进入candidate状态,那么其他拥有更新数据的server就永远没有机会发起投票了。

  投票的准则是当前term自己没有投票,并且对方的日志比自己更新。判断日志更新的标准是先判断term,term大的更新,term相等的话看最后的index,index大的更新。

  在收到投票结果时,要判断当前的状态与发起请求的状态是否一样。因为网络不可靠,有可能发生请求的延时,可能当收到返回的时候自己已经不是candidate了,或者term已经不一样了,这样的话肯定不能接受投票结果。

  当收到超过半数的票时,把自己的状态更新为leader,同时要按照论文figure 2的要求,初始化每个server的nextIndex和matchIndex。这两个字段在后面讲实现数据复制时候再说明。

数据复制、一致性检测、同步

  实现上主要分成4部分,leader处理写请求并发起复制,follower执行AppendEntries rpc调用的回调,以及leader处理AppendEntries的返回,apply已经commit的请求到状态机。需要时刻注意的是,网络不可靠,rpc请求可能会超时重传,回调函数可能收到多个相同请求,以及可能收到过时的请求,必须能够处理这类情况。

leader处理写请求并发起复制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := true
// Your code here (2B).
rf.mu.Lock()
index, term, isLeader = len(rf.log), rf.currentTerm, rf.state == leader
if isLeader {
entry := Entry{Term: term, Command: command}
rf.log = append(rf.log, entry)
rf.persist()
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
args := new(AppendEntriesArgs)
args.Term = rf.currentTerm
args.PrevLogIndex = rf.nextIndex[i] - 1
args.PrevLogTerm = rf.log[args.PrevLogIndex].Term
args.LeaderCommit = rf.commitIndex
args.LeaderId = rf.me
args.Entries = rf.log[rf.nextIndex[i]:]
rf.matchIndex[rf.me] = index
rf.nextIndex[rf.me] = index + 1
rf.DPrintf("append entries req: %+v from %v to %v, current state %+v\n\n", args, rf.me, i, rf)
go rf.sendAppendEntries(i, args, &AppendEntriesReply{})
}
}
}
rf.mu.Unlock()
return index, term, isLeader
}

  向所有follower发送请求,请求的时候带上nextIndex以后的所有数据。nextIndex表示对follower当前log进度的一个乐观判断,初始化为leader的log长度+1,表示和leader完全一致。记得更新leader自己的nextIndex和matchIndex,matchIndex是作为请求是否commit的判断,不能把leader自己的算漏了,否则请求可能一直无法commit。

AppendEntries回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
if args.Term < rf.currentTerm {
reply.Success = false
reply.Term = rf.currentTerm
}else{
rf.currentTerm = args.Term
rf.leaderId = args.LeaderId
rf.state = follower
rf.lastHeartbeatTime = time.Now().UnixNano()
rf.votedFor = -1
reply.Term = rf.currentTerm
rf.persist()
if len(rf.log) - 1 < args.PrevLogIndex || rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.Success = false
if len(rf.log) - 1 < args.PrevLogIndex {
reply.ConflictTerm = -1
reply.ConflictIndex = len(rf.log)
}else {
reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
var conflictIndex int
for conflictIndex = args.PrevLogIndex; conflictIndex >= 0 && rf.log[conflictIndex].Term == reply.ConflictTerm;
conflictIndex-- {}
reply.ConflictIndex = conflictIndex + 1
}
}else {
reply.Success = true
if len(rf.log) - 1 > args.PrevLogIndex {
for i := 1; i <= len(args.Entries); i++ {
if i + args.PrevLogIndex < len(rf.log) {
if rf.log[i + args.PrevLogIndex].Term != args.Entries[i - 1].Term {
rf.log = rf.log[:(i + args.PrevLogIndex)]
rf.log = append(rf.log, args.Entries[i - 1])
}else {
continue
}
}else{
rf.log = append(rf.log, args.Entries[i - 1])
}
}
}else {
rf.log = append(rf.log, args.Entries...)
}
if len(args.Entries) > 0 {
rf.persist()
}
if args.LeaderCommit > rf.commitIndex {
if args.LeaderCommit > len(rf.log) - 1 {
rf.commitIndex = len(rf.log) - 1
}else {
rf.commitIndex = args.LeaderCommit
}
if rf.commitIndex > rf.lastApplied {
//DPrintf("apply command\n")
go func() {
applyRange := needApplyRange{start: rf.lastApplied + 1, end: rf.commitIndex}
rf.needApplyChan <- applyRange
}()
}
}
}
}
rf.DPrintf("append entry reply: %+v from %v to %v, current state %+v\n\n", *reply, rf.me, args.LeaderId, rf)
rf.mu.Unlock()
}

  首先判断term是否满足要求,如果对方的term比自己小,则直接拒绝请求。

  如果对方的prevLogIndex和prevLogTerm和自己相应index的term完全一致,就说明该位置的entry是一致的,并且之前所有的entries都是一致的,并且都保存着相同的command。这个一致性保证是由两方面原因保证的:

  1. 每个term只有一个leader,而该leader只会在一个index位置上处理一个请求,并且只要term不变(即leader不倒),永远不会被修改的,所以command是唯一的。
  2. 每次AppendEntries都会进行一致性检测,只有一致性检测成功才会接受写请求。由于一开始日志是一致的,因此只要每次一致性检测成功,可以由数学归纳法保证前面的entries都是一致的。

  要注意请求有可能顺序会乱,也可能接受多次重复的请求,又或者接收到旧的请求(全部或部分数据已经写入log了),因此不能简单的append到log的末尾,而是从PrevLogIndex+1位置开始,逐个比较是否有不一致,或者是否有重复的entry已经接收过了。顺序更后的请求由于肯定携带有前面未收到回复的请求的数据,因此不会出现数据在log里乱序的问题。容易发现,这种方法处理log是幂等的。

  append结束后,如果发现leaderCommit大于自己的commitIndex,需要更新commitIndex,然后apply到状态机。注意更新commitIndex的时候要取自己log和leaderCommit的最小值,如果leaderCommit大于自己的log的最末尾index,而不能简单的把自己的commitIndex设超过自身。因为之后可能会插入一些不是同一个leader发来的entries,把commitIndex设得过大,就会错误apply。

  如果在append过程中发现不一致,则要把日志截断,然后append之后的数据。

  如果一致性检测失败,则应该拒绝这个rpc请求。这里实现用了论文提到的优化方法,以避免多次拒绝rpc请求,影响性能。传统的方法应该是每次拒绝让leader的prevLogIndex减1,然后再次尝试。这里引入了conflictIndex和conflictTerm两个字段:

  1. 如果prevLogIndex超出自己的日志末尾,那么把conflictIndex设为日志长度,conflictTerm设为空
  2. 如果prevLogIndex没有超出日志末尾,而是term不一样,则找出该term的第一个index,设置conflictIndex和conflictTerm,以求一次越过整个term,这样虽然可能要传输多余的entries,但大大减少了网络交互的请求数。

处理AppendEntries的返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
rf.mu.Lock()
if args.Term != rf.currentTerm || !ok {
rf.mu.Unlock()
return ok
}
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.votesNums = 0
rf.state = follower
rf.persist()
}else if !reply.Success {
if reply.ConflictTerm == -1 {
rf.nextIndex[server] = reply.ConflictIndex
}else {
var index int
for index = args.PrevLogIndex - 1; index >= 0 && rf.log[index].Term != reply.ConflictTerm; index-- {}
if index < 0 {
rf.nextIndex[server] = reply.ConflictIndex
}else {
rf.nextIndex[server] = index + 1
}
}
args.PrevLogIndex = rf.nextIndex[server] - 1
args.PrevLogTerm = rf.log[args.PrevLogIndex].Term
args.Entries = rf.log[rf.nextIndex[server]:]
rf.DPrintf("resent append entries req: %+v from %v to %v, current state %+v\n\n", args, rf.me, server, rf)
go rf.sendAppendEntries(server, args, &AppendEntriesReply{})
}else {
if rf.matchIndex[server] < args.PrevLogIndex + len(args.Entries) {
rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[server] = rf.matchIndex[server] + 1
}
N := rf.matchIndex[server]
if N > rf.commitIndex && rf.log[N].Term == rf.currentTerm {
cnt := 0
for i := 0; i < len(rf.peers); i++ {
if rf.matchIndex[i] >= N {
cnt++
}
}
if cnt > len(rf.peers) / 2 {
rf.commitIndex = N
if rf.commitIndex > rf.lastApplied {
go func() {
applyRange := needApplyRange{start: rf.lastApplied + 1, end: rf.commitIndex}
rf.needApplyChan <- applyRange
}()
}
}
}
}
rf.mu.Unlock()
return ok
}

  首先注意请求的term和收到返回的term是否一致,否则代表该请求已过时,直接忽略。

  如果请求不成功,说明一致性检测失败。需要减少nextIndex,重发请求。前面讲过,可以通过conflictIndex和conflictTerm加速。这里先把nextIndex置为conflictTerm的最后一个entry后面的index,这样prevLogIndex就带上了conflictTerm的最后一个entry的index。如果conflictTerm不存在,则把nextIndex置为conflictIndex。通过这样的方式,每次一致性检测失败至少可以skip一个term的entries,大大减少了交互数。

  如果请求成功,则要更新matchIndex。由于当请求返回的时候,可能已经插入了更多的log,又或者该请求已经过时了。不能简单的把matchIndex设为最后一个index,而是要按照请求当时带的数据,把matchIndex设为args.PrevLogIndex + len(args.Entries)。

  记当前请求更新后的matchIndex为N,遍历所有server,如果大于半数的matchIndex大于N,则更新commitIndex。这里还有一个限制条件,不能commit小于当前term的entries。这个原因论文里已经说得很清楚了,过时的entries有风险被覆盖,导致被错误apply。

apply状态机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (rf *Raft) apply() {
for {
applyRange := <- rf.needApplyChan
msg := ApplyMsg{}
rf.mu.Lock()
lastApply := rf.lastApplied
rf.mu.Unlock()
var start int
if applyRange.start < lastApply {
start = lastApply
}else {
start = applyRange.start
}
for i := start; i <= applyRange.end; i++ {
msg.Command = rf.log[i].Command
msg.CommandIndex = i
msg.CommandValid = true
rf.applyChan <- msg
rf.mu.Lock()
rf.lastApplied = i
rf.mu.Unlock()
}
}
}

  所有被commit的entry都可以被apply到状态机里。这里的apply简单地用message发送到channel来表示,是一个异步的过程。

  raft协议对状态机的实现没有明确要求,因此不一定是幂等的,所以相同的entry只能被apply一次。实际可以根据具体状态机的性质对apply的严格性进行调整。