前言
之前也细读过raft协议论文,项目里也独立实现过,不过做了mit 6.824的lab2之后,才发现有很多细节没有注意到,论文里figure 2的每一句话真的都需要完全毫无偏差的实现,不然肯定有某些corner case没考虑到。本文就分析一下mit 6.824课程lab2里raft协议实现的思路和细节,以及总结一些注意事项,代码可见个人github仓库https://github.com/wjqwsp/MIT-6.824-2018 ,最新的名为“raft”的commit里,用golang实现。后续还会在这个仓库里不断补全课程的其他实验代码。本文不会详细阐述raft的原理和细节,只关注实现和注意事项。
整体结构
本文主要针对raft论文的核心部分:leader选举以及数据复制、一致性检测与恢复两个内容的实现进行分析。
每个raft实例包含3个常驻线程,分别执行发送心跳、发起选举、apply状态机3个任务。同时每个raft实例之间用rpc方式通信,每个实例都有两个rpc回调,分别是AppendEntries和RequestVote,处理数据复制、心跳以及选举的请求。主要有以下几个结构体:raft实例,AppendEntries请求,AppendEntries回复,RequestVote请求,RequestVote回复。主要还是按照论文里figure 2的定义确定结构体包含哪些数据域。
|
|
leader选举
raft协议的Leader选举主要由term决定。term相当于逻辑时钟,时间轴按term分成长度不等的时间片段,每个term内最多只能有一个leader。如果不考虑数据复制的话,仅仅用term按照先到先得的投票逻辑就能决出一个leader。在每一个term内,每个server最多只能投出一票给某一个server,然后获得超过半数票数的server会当选为该term的leader。leader会通过心跳(heartbeat timeout)来维持自己的统治地位,当超过一定时间(election timeout)其他follower没有收到该Leader的心跳,则会转换为candidate状态,然后发起选举。如果加入了数据复制功能,那么选举逻辑就要加一点限制,不再是先到先得了,而是要看谁的log里的数据最新。因为raft协议是多数节点同步的,所以只要某个请求被commit了,只要多数节点在线,当选的leader就必然有最新commit的数据。
raft实例初始化
|
|
初始化raft结构体,然后启动3个常驻线程。raft结构体里跟选举相关的最重要的几个字段是state,currentTerm, lastHeartbeatTime,log,votedFor和persister。其中votedFor,log和currentTerm是需要持久化的。在每次raft实例启动的时候,都要读取之前持久化的状态。因为raft协议的正确性保证的必要条件之一是每个term里每个server最多只能投出一票。设想一下这个情况,如果某个server投出了一票,就挂了,然后很快重启起来,如果没有保存之前投票的状态,就可能会重新投给别的server,很可能出现多个leader的脑裂情况。log的话不用说肯定是要持久化的,后续的数据恢复、同步以及leader重选都要用到。
下面主要看一下sendHeartbeat和elect两个函数,分别处理发送心跳和发起选举两个任务。
发送心跳
|
|
该函数执行一个无限循环,定期检查一下自己是不是leader,如果是leader的话需要给其他server发送AppendEntries请求以维持自己的统治地位。这里有一个需要注意的地方。我一开始实现的时候,认为心跳请求只需要发空的entries[]就行,不过这样会导致一个问题:如果长期没有新的请求到达,那么raft实例之间就永远不会主动进行数据同步了。在论文里,明确说了如果请求超时或失败,leader是需要无限尝试的。一旦集群发生大规模的失败和重启,那么之前的请求也可能永远得不到尝试的机会。所以,我认为心跳的实现可以和普通AppendEntries请求并没有太大的区别,在正常情况下,心跳请求携带的entries[]是空的,但如果有数据丢失,就可以携带上数据,马上进行数据恢复。
这种做法有一个缺点,就是可能会有很多重复的数据传输,不过raft协议里请求写Log是幂等的,所以并不会影响正确性。还有一个解决方法,就是在复制数据的时候一旦发生请求失败则无限重试,并且在每次leader选举成功之后commit一个空的请求。这种做法应该是论文推荐的做法,也可以解决线性一致性的问题,不过要过实验的测试用例的话还是用第一种比较合适,实际实现可以用第二种。
这里对其他每一个server的rpc请求都要新开一个线程来处理,因为所有rpc请求都有可能发生阻塞,多线程也有更好的并行性。
发起选举
|
|
也是一个无限循环,每隔一个heartbeat timeout,检查一下自己的状态,判断一下最近一次收到heartbeat的时间与当前时间有没有超过election timeout。如果超过的话则要重置计时器,开始一轮新的选举。这里为了实现方便,采用定期检查状态的方式判断有没有超时,也可以用channel或者条件变量等方法来获取通知。
上面的代码主要是投票是如何触发的,下面讲一下整个投票的过程是怎么实现的,主要涉及到两个函数,一个上面代码提到的sendRequestVote函数,主要执行rpc调用并处理返回结果。还有一个函数就是RequestVote rpc调用的回调函数。
选举过程
|
|
首先要注意的是,论文里figure 2里提到reply false的,都要马上返回,不要处理后续的逻辑。对于raft里的AppendEntries和RequestVote这两个rpc请求,如果发现请求的term或者返回的term大于自己的term,说明自己已经过时了,需要更新为更大的term,并且重置投票状态,自己变为follower。在选举过程中,把server及时变为follower可以防止多个server进入candidate状态,导致分票。还要注意当投出自己的一票之后,要重置计时器。试想一下这个情景,如果不重置计时器,那么当某个server收到投票请求,自己的term更新了,投了票之后,很快又超时了,自己又成为candidate,term又加1,很可能新选出来的leader又被迫step out,那么系统长期处于选举->step out->选举的状态,永远无法稳定提供服务。
如果没有投票,则不要重置计时器。否则拥有旧数据的server如果更早进入candidate状态,那么其他拥有更新数据的server就永远没有机会发起投票了。
投票的准则是当前term自己没有投票,并且对方的日志比自己更新。判断日志更新的标准是先判断term,term大的更新,term相等的话看最后的index,index大的更新。
在收到投票结果时,要判断当前的状态与发起请求的状态是否一样。因为网络不可靠,有可能发生请求的延时,可能当收到返回的时候自己已经不是candidate了,或者term已经不一样了,这样的话肯定不能接受投票结果。
当收到超过半数的票时,把自己的状态更新为leader,同时要按照论文figure 2的要求,初始化每个server的nextIndex和matchIndex。这两个字段在后面讲实现数据复制时候再说明。
数据复制、一致性检测、同步
实现上主要分成4部分,leader处理写请求并发起复制,follower执行AppendEntries rpc调用的回调,以及leader处理AppendEntries的返回,apply已经commit的请求到状态机。需要时刻注意的是,网络不可靠,rpc请求可能会超时重传,回调函数可能收到多个相同请求,以及可能收到过时的请求,必须能够处理这类情况。
leader处理写请求并发起复制
|
|
向所有follower发送请求,请求的时候带上nextIndex以后的所有数据。nextIndex表示对follower当前log进度的一个乐观判断,初始化为leader的log长度+1,表示和leader完全一致。记得更新leader自己的nextIndex和matchIndex,matchIndex是作为请求是否commit的判断,不能把leader自己的算漏了,否则请求可能一直无法commit。
AppendEntries回调
|
|
首先判断term是否满足要求,如果对方的term比自己小,则直接拒绝请求。
如果对方的prevLogIndex和prevLogTerm和自己相应index的term完全一致,就说明该位置的entry是一致的,并且之前所有的entries都是一致的,并且都保存着相同的command。这个一致性保证是由两方面原因保证的:
- 每个term只有一个leader,而该leader只会在一个index位置上处理一个请求,并且只要term不变(即leader不倒),永远不会被修改的,所以command是唯一的。
- 每次AppendEntries都会进行一致性检测,只有一致性检测成功才会接受写请求。由于一开始日志是一致的,因此只要每次一致性检测成功,可以由数学归纳法保证前面的entries都是一致的。
要注意请求有可能顺序会乱,也可能接受多次重复的请求,又或者接收到旧的请求(全部或部分数据已经写入log了),因此不能简单的append到log的末尾,而是从PrevLogIndex+1位置开始,逐个比较是否有不一致,或者是否有重复的entry已经接收过了。顺序更后的请求由于肯定携带有前面未收到回复的请求的数据,因此不会出现数据在log里乱序的问题。容易发现,这种方法处理log是幂等的。
append结束后,如果发现leaderCommit大于自己的commitIndex,需要更新commitIndex,然后apply到状态机。注意更新commitIndex的时候要取自己log和leaderCommit的最小值,如果leaderCommit大于自己的log的最末尾index,而不能简单的把自己的commitIndex设超过自身。因为之后可能会插入一些不是同一个leader发来的entries,把commitIndex设得过大,就会错误apply。
如果在append过程中发现不一致,则要把日志截断,然后append之后的数据。
如果一致性检测失败,则应该拒绝这个rpc请求。这里实现用了论文提到的优化方法,以避免多次拒绝rpc请求,影响性能。传统的方法应该是每次拒绝让leader的prevLogIndex减1,然后再次尝试。这里引入了conflictIndex和conflictTerm两个字段:
- 如果prevLogIndex超出自己的日志末尾,那么把conflictIndex设为日志长度,conflictTerm设为空
- 如果prevLogIndex没有超出日志末尾,而是term不一样,则找出该term的第一个index,设置conflictIndex和conflictTerm,以求一次越过整个term,这样虽然可能要传输多余的entries,但大大减少了网络交互的请求数。
处理AppendEntries的返回
|
|
首先注意请求的term和收到返回的term是否一致,否则代表该请求已过时,直接忽略。
如果请求不成功,说明一致性检测失败。需要减少nextIndex,重发请求。前面讲过,可以通过conflictIndex和conflictTerm加速。这里先把nextIndex置为conflictTerm的最后一个entry后面的index,这样prevLogIndex就带上了conflictTerm的最后一个entry的index。如果conflictTerm不存在,则把nextIndex置为conflictIndex。通过这样的方式,每次一致性检测失败至少可以skip一个term的entries,大大减少了交互数。
如果请求成功,则要更新matchIndex。由于当请求返回的时候,可能已经插入了更多的log,又或者该请求已经过时了。不能简单的把matchIndex设为最后一个index,而是要按照请求当时带的数据,把matchIndex设为args.PrevLogIndex + len(args.Entries)。
记当前请求更新后的matchIndex为N,遍历所有server,如果大于半数的matchIndex大于N,则更新commitIndex。这里还有一个限制条件,不能commit小于当前term的entries。这个原因论文里已经说得很清楚了,过时的entries有风险被覆盖,导致被错误apply。
apply状态机
|
|
所有被commit的entry都可以被apply到状态机里。这里的apply简单地用message发送到channel来表示,是一个异步的过程。
raft协议对状态机的实现没有明确要求,因此不一定是幂等的,所以相同的entry只能被apply一次。实际可以根据具体状态机的性质对apply的严格性进行调整。