MIT6.824 Lab 2: Raft - Part 2A
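Part 2A asks for Raft leader election and heartbeats. Heartbeats are AppendEntries RPCs that carry no log entries and are sent periodically; a leader election is triggered when the election timeout expires.
Heartbeat and election timeout design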
    const (
        ElectionTick  = 5
        HeartbeatTick = 1
        Ticker        = 100 * time.Millisecond
    )

    type Raft struct {
        ...
        electionElapsed  int
        heartbeatElapsed int
        electionTimeout  int
    }
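Ticker is the interval of the main loop's periodic timer. electionTimeout is the election timeout, measured in ticks and drawn at random from **[ElectionTick, 2*ElectionTick)**. With the constants above, that gives a heartbeat every 100 ms and an election timeout between 500 ms and 900 ms. Each time the ticker fires:
- Leader: increment heartbeatElapsed; if heartbeatElapsed is greater than or equal to HeartbeatTick, send heartbeat RPCs.
- Candidate and Follower: increment electionElapsed; if electionElapsed is greater than or equal to electionTimeout, the election has timed out and RequestVote RPCs must be sent.
This design is modeled on raft-structure.txt: a single periodic ticker is enough, and no timer has to be reset over and over. On a heartbeat timeout, reset heartbeatElapsed to 0; on an election timeout, reset electionElapsed to 0 and draw a new random electionTimeout.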
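The counter scheme described above can be tried out without any RPCs or goroutines. The following is a minimal sketch, not the lab code: `tickNode`, `tick`, and `ticksUntil` are hypothetical names, and the election timeout is fixed rather than random so that the behavior is reproducible.

```go
package main

import "fmt"

const (
	electionTick  = 5 // same values as the constants above
	heartbeatTick = 1
)

// tickNode is a hypothetical stand-in for the Raft struct: only the counters.
type tickNode struct {
	leader           bool
	electionElapsed  int
	heartbeatElapsed int
	electionTimeout  int
}

// tick advances the node by one ticker interval and reports which event,
// if any, fired: "heartbeat", "election", or "".
func (n *tickNode) tick() string {
	if n.leader {
		n.heartbeatElapsed++
		if n.heartbeatElapsed >= heartbeatTick {
			n.heartbeatElapsed = 0
			return "heartbeat"
		}
		return ""
	}
	n.electionElapsed++
	if n.electionElapsed >= n.electionTimeout {
		n.electionElapsed = 0
		return "election"
	}
	return ""
}

// ticksUntil counts the ticks needed before the given event fires,
// or returns -1 if it does not fire within limit ticks.
func ticksUntil(n *tickNode, event string, limit int) int {
	for i := 1; i <= limit; i++ {
		if n.tick() == event {
			return i
		}
	}
	return -1
}

func main() {
	// Fixed timeout of 7 ticks instead of a random one (assumption for the demo).
	follower := &tickNode{electionTimeout: electionTick + 2}
	fmt.Println(ticksUntil(follower, "election", 100))

	leader := &tickNode{leader: true}
	fmt.Println(ticksUntil(leader, "heartbeat", 100))
}
```

Because every deadline is just an integer compared against a counter, changing role only requires zeroing the relevant counter; no timer object has to be stopped or drained.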
Setting the election timeout
    // resetElectionTimeout draws a new timeout in [ElectionTick, 2*ElectionTick) ticks.
    func (rf *Raft) resetElectionTimeout() {
        rf.electionTimeout = ElectionTick + rand.Intn(ElectionTick)
    }
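To see what range this formula produces in wall-clock terms, here is a small sketch that samples it many times. `randomElectionTimeout` and `sampleTicks` are hypothetical helpers written for this illustration; the constants are copied from the post.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	ElectionTick = 5
	Ticker       = 100 * time.Millisecond
)

// randomElectionTimeout mirrors resetElectionTimeout but returns the value
// instead of storing it on a Raft struct.
func randomElectionTimeout(r *rand.Rand) int {
	return ElectionTick + r.Intn(ElectionTick)
}

// sampleTicks draws n timeouts from a generator seeded with seed and
// returns the smallest and largest value seen, in ticks.
func sampleTicks(seed int64, n int) (lo, hi int) {
	r := rand.New(rand.NewSource(seed))
	lo, hi = 2*ElectionTick, 0
	for i := 0; i < n; i++ {
		t := randomElectionTimeout(r)
		if t < lo {
			lo = t
		}
		if t > hi {
			hi = t
		}
	}
	return lo, hi
}

func main() {
	lo, hi := sampleTicks(1, 10000)
	// Convert ticks to wall-clock time using the ticker interval.
	fmt.Println(time.Duration(lo)*Ticker, time.Duration(hi)*Ticker)
}
```

Since `Intn(ElectionTick)` returns a value in [0, ElectionTick), the sampled timeouts can never fall below 5 ticks or reach 10 ticks.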
Election timeout condition
    func (rf *Raft) pastElectionTimeout() bool {
        return rf.electionElapsed >= rf.electionTimeout
    }
Checking for an election timeout
    func (rf *Raft) tickElection() {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        rf.electionElapsed++
        if rf.pastElectionTimeout() {
            switch rf.state {
            case Follower, Candidate:
                rf.becomeCandidate()
            }
        }
    }
Heartbeat timeout condition
    func (rf *Raft) pastHeartbeat() bool {
        return rf.heartbeatElapsed >= HeartbeatTick
    }
Checking for a heartbeat timeout
    func (rf *Raft) tickHeartbeat() {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        rf.heartbeatElapsed++
        if rf.pastHeartbeat() {
            if rf.state == Leader {
                rf.heartbeatElapsed = 0
                rf.broadcastHeartbeat()
            }
        }
    }
Main loop design
    func eventLoop(rf *Raft) {
        ticker := time.NewTicker(Ticker)
        defer ticker.Stop()
        for !rf.killed() {
            // Block until the next tick, then run the check for the current state.
            <-ticker.C
            switch rf.GetCurState() {
            case Follower, Candidate:
                rf.tickElection()
            case Leader:
                rf.tickHeartbeat()
            }
        }
    }
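The main loop runs in its own goroutine. It creates a periodic ticker and, every time the ticker fires, runs the timeout check that matches the node's current state. On a timeout, a Leader sends heartbeats, while a Follower or Candidate increments its Term and starts an election by requesting votes.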
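The loop relies on two helpers that the post does not show, `killed` and `GetCurState`. The sketch below is one plausible shape for them, under the assumption that the kill flag is an atomic integer (as in the lab skeleton) and that the state is read under the mutex. The `node` type, the tick counters, and `runAndKill` are made up for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type State int

const (
	Follower State = iota
	Candidate
	Leader
)

// node is a hypothetical stand-in for the Raft struct.
type node struct {
	mu             sync.Mutex
	state          State
	dead           int32 // set to 1 by kill, read atomically by killed
	electionTicks  int
	heartbeatTicks int
}

func (n *node) kill()        { atomic.StoreInt32(&n.dead, 1) }
func (n *node) killed() bool { return atomic.LoadInt32(&n.dead) == 1 }

// getCurState reads the state under the lock so the loop never sees a torn value.
func (n *node) getCurState() State {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.state
}

// eventLoop ticks until the node is killed, then closes done.
func (n *node) eventLoop(interval time.Duration, done chan<- struct{}) {
	defer close(done)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for !n.killed() {
		<-ticker.C
		switch n.getCurState() {
		case Follower, Candidate:
			n.mu.Lock()
			n.electionTicks++
			n.mu.Unlock()
		case Leader:
			n.mu.Lock()
			n.heartbeatTicks++
			n.mu.Unlock()
		}
	}
}

// runAndKill starts a loop, lets it run briefly, kills the node, and
// reports whether the loop goroutine exited within one second.
func runAndKill() bool {
	n := &node{state: Leader}
	done := make(chan struct{})
	go n.eventLoop(time.Millisecond, done)
	time.Sleep(20 * time.Millisecond)
	n.kill()
	select {
	case <-done:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("loop exited after kill:", runAndKill())
}
```

Because the kill flag is only checked between ticks, the loop exits at most one ticker interval after `kill` is called.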
Heartbeat implementation
AppendEntries RPC definitions
    type AppendEntriesArgs struct {
        Term         int        // leader's term
        LeaderId     int        // so follower can redirect clients
        PreLogIndex  int        // index of log entry immediately preceding new ones
        PreLogTerm   int        // term of prevLogIndex entry
        Entries      []LogEntry // log entries to store (empty for heartbeat; may send more than one for efficiency)
        LeaderCommit int        // leader's commitIndex
    }

    type AppendEntriesReply struct {
        Term    int  // currentTerm, for leader to update itself
        Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
    }
Leader sending heartbeat RPCs
    func (rf *Raft) broadcastHeartbeat() {
        for i := 0; i < len(rf.peers); i++ {
            if i == rf.me {
                continue
            }
            go func(server int) {
                // This node may no longer be the leader by the time the goroutine runs.
                rf.mu.Lock()
                if rf.state != Leader {
                    rf.mu.Unlock()
                    return
                }
                args := &AppendEntriesArgs{
                    Term:     rf.currentTerm,
                    LeaderId: rf.me,
                }
                rf.mu.Unlock()
                // Send without holding the lock; the RPC may block.
                var reply AppendEntriesReply
                if !rf.sendAppendEntries(server, args, &reply) {
                    return
                }
                rf.mu.Lock()
                defer rf.mu.Unlock()
                // A higher term in the reply means this leader is stale.
                if reply.Term > rf.currentTerm {
                    rf.becomeFollower(reply.Term)
                }
            }(i)
        }
    }
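The Leader sends a heartbeat to every other node, each in its own goroutine so that the main goroutine is never blocked. Before sending, each goroutine checks that the node is still Leader, since it may have stepped down in the meantime. The RPC reply must be handled as well: if the reply's Term is greater than the node's own Term, the node immediately converts to Follower.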
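The reply handling can be isolated as a small pure function. The sketch below is an illustration under assumptions, not the post's code: `peer`, `heartbeatReply`, and `onHeartbeatReply` are hypothetical names, and the extra "stale" case (ignoring a reply that belongs to an earlier term or arrives after the node stopped being Leader) is an addition that the post's code does not perform.

```go
package main

import "fmt"

type role int

const (
	follower role = iota
	candidate
	leader
)

// peer holds only the fields needed to judge a heartbeat reply.
type peer struct {
	role        role
	currentTerm int
}

type heartbeatReply struct {
	Term    int
	Success bool
}

// onHeartbeatReply applies the reply to a heartbeat that was sent in sentTerm.
// It returns "step-down" if the node converted to follower, "stale" if the
// reply no longer applies to the node's current term or role, and "ok" otherwise.
func (p *peer) onHeartbeatReply(sentTerm int, reply heartbeatReply) string {
	if reply.Term > p.currentTerm {
		// Someone has a newer term: adopt it and stop acting as leader.
		p.currentTerm = reply.Term
		p.role = follower
		return "step-down"
	}
	if p.role != leader || p.currentTerm != sentTerm {
		// The lock was released while the RPC was in flight, so the node
		// may have changed role or term since the request was built.
		return "stale"
	}
	return "ok"
}

func main() {
	p := &peer{role: leader, currentTerm: 3}
	fmt.Println(p.onHeartbeatReply(3, heartbeatReply{Term: 3, Success: true}))
	fmt.Println(p.onHeartbeatReply(3, heartbeatReply{Term: 5}))
	fmt.Println(p.onHeartbeatReply(3, heartbeatReply{Term: 3, Success: true}))
}
```

In Part 2A nothing further happens in the "ok" case; the distinction starts to matter once replies also drive log replication.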
Receiving heartbeats
    func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        // Reject requests from a leader whose term is behind ours.
        if rf.currentTerm > args.Term {
            reply.Term = rf.currentTerm
            return
        }
        rf.becomeFollower(args.Term)
        reply.Success = true
    }
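The receiver compares its own Term with the Term in the RPC. If its own Term is larger, it rejects the request and returns immediately. Otherwise it resets its election timeout and updates its Term. This is simplified here to a conversion to Follower, because a Leader or Candidate that receives a heartbeat must become a Follower anyway.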
Election implementation
RequestVote RPC definitions
    type RequestVoteArgs struct {
        // Your data here (2A, 2B).
        Term         int // candidate's term
        CandidateId  int // candidate requesting vote
        LastLogIndex int // index of candidate's last log entry
        LastLogTerm  int // term of candidate's last log entry
    }

    type RequestVoteReply struct {
        // Your data here (2A).
        Term        int  // currentTerm, for candidate to update itself
        VoteGranted bool // true means candidate received vote
    }
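The post does not include the receiving side of RequestVote. As a rough sketch of how a handler might use these fields, following the rules in the Raft paper: `voter`, `voteArgs`, `voteReply`, and `requestVote` are hypothetical names, and the role change to Follower on a higher term is left out to keep the example short.

```go
package main

import "fmt"

type voteArgs struct {
	Term, CandidateId, LastLogIndex, LastLogTerm int
}

type voteReply struct {
	Term        int
	VoteGranted bool
}

// voter holds the receiver-side state that decides a vote.
type voter struct {
	currentTerm     int
	votedFor        int // -1 means no vote cast in currentTerm
	lastLogIndex    int
	lastLogTerm     int
	electionElapsed int
}

// requestVote decides whether to grant a vote to the candidate in args.
func (v *voter) requestVote(args voteArgs) voteReply {
	if args.Term < v.currentTerm {
		return voteReply{Term: v.currentTerm} // stale candidate
	}
	if args.Term > v.currentTerm {
		// New term: adopt it and forget any earlier vote.
		// A full implementation would also step down to Follower here.
		v.currentTerm = args.Term
		v.votedFor = -1
	}
	// The candidate's log must be at least as up to date as ours.
	upToDate := args.LastLogTerm > v.lastLogTerm ||
		(args.LastLogTerm == v.lastLogTerm && args.LastLogIndex >= v.lastLogIndex)
	if (v.votedFor == -1 || v.votedFor == args.CandidateId) && upToDate {
		v.votedFor = args.CandidateId
		v.electionElapsed = 0 // granting a vote restarts the election timeout
		return voteReply{Term: v.currentTerm, VoteGranted: true}
	}
	return voteReply{Term: v.currentTerm}
}

func main() {
	v := &voter{currentTerm: 1, votedFor: -1}
	fmt.Println(v.requestVote(voteArgs{Term: 2, CandidateId: 1})) // first request in term 2
	fmt.Println(v.requestVote(voteArgs{Term: 2, CandidateId: 2})) // second candidate, same term
	fmt.Println(v.requestVote(voteArgs{Term: 1, CandidateId: 2})) // stale term
}
```

Each node grants at most one vote per term, which is what makes it impossible for two candidates to both collect a majority in the same term.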
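Becoming a Candidate
    func (rf *Raft) becomeCandidate() {
        rf.state = Candidate
        rf.reset(rf.currentTerm + 1)
        // Vote for ourselves while rf.mu is still held, so that no other
        // candidate can be granted this node's vote for the new term first.
        rf.voteFor = rf.me
        rf.resetElectionTimeout()
        go rf.startElection()
    }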
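When the election timer expires, the node becomes a Candidate and starts an election:
- Increment its own Term
- Vote for itself
- Reset the election timer
- Send RequestVote RPCs to the other nodes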
Candidate sending RequestVote RPCs
    func (rf *Raft) startElection() {
        // Buffered so that senders never block, even after the collector has returned.
        voteCh := make(chan RequestVoteReply, len(rf.peers))
        rf.mu.Lock()
        logLen := len(rf.log)
        voteArgs := &RequestVoteArgs{
            Term:         rf.currentTerm,
            CandidateId:  rf.me,
            LastLogIndex: rf.log[logLen-1].Index,
            LastLogTerm:  rf.log[logLen-1].Term,
        }
        rf.mu.Unlock()
        // One goroutine per peer sends the RPC and forwards the reply.
        var wg sync.WaitGroup
        for i := 0; i < len(rf.peers); i++ {
            if i == rf.me {
                continue
            }
            wg.Add(1)
            go func(server int) {
                defer wg.Done()
                var reply RequestVoteReply
                if !rf.sendRequestVote(server, voteArgs, &reply) {
                    return
                }
                voteCh <- reply
            }(i)
        }
        doneCh := make(chan struct{}, 1)
        // Collector goroutine: counts votes until the election is decided.
        go func() {
            votes := 1 // the candidate's own vote
            rf.mu.Lock()
            rf.voteFor = rf.me
            rf.mu.Unlock()
            for !rf.killed() {
                select {
                case reply := <-voteCh:
                    rf.mu.Lock()
                    if rf.state != Candidate {
                        rf.mu.Unlock()
                        return
                    }
                    if reply.Term > rf.currentTerm {
                        rf.becomeFollower(reply.Term)
                        rf.mu.Unlock()
                        return
                    }
                    // Only count votes granted for the current term.
                    if reply.VoteGranted && rf.currentTerm == reply.Term {
                        votes++
                    }
                    if rf.majority(votes) {
                        rf.becomeLeader()
                        rf.mu.Unlock()
                        return
                    }
                    rf.mu.Unlock()
                case <-doneCh:
                    return
                }
            }
        }()
        // Once every RPC has returned or failed, tell the collector to stop.
        wg.Wait()
        doneCh <- struct{}{}
    }
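First, a channel voteCh is created to collect the result of each RequestVote RPC. Each peer gets its own goroutine that sends the RPC and pushes the reply into voteCh. One more goroutine waits for the replies:
- If votes from more than half of the nodes are received, the Candidate wins the election, converts to Leader, and stops waiting immediately.
- If every RequestVote RPC has returned, it also stops waiting.
- If a reply carries a Term greater than the Candidate's Term, the Candidate converts to Follower and stops waiting.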
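The three outcomes above can be expressed as a pure function over the replies, which makes the counting rule easy to check in isolation. This is only a sketch: `voteResult`, `majority`, and `tally` are hypothetical, and `majority` is assumed to mean "strictly more than half of the cluster", since the post does not show its own rf.majority.

```go
package main

import "fmt"

type voteResult struct {
	Term        int
	VoteGranted bool
}

// majority reports whether votes is strictly more than half of clusterSize.
func majority(votes, clusterSize int) bool {
	return votes > clusterSize/2
}

// tally processes replies in arrival order for a candidate in currentTerm.
// It returns "leader" once a majority is reached, "follower" if a reply
// carries a higher term, and "undecided" if the replies run out first.
func tally(clusterSize, currentTerm int, replies []voteResult) string {
	votes := 1 // the candidate always votes for itself
	for _, r := range replies {
		if r.Term > currentTerm {
			return "follower"
		}
		if r.VoteGranted && r.Term == currentTerm {
			votes++
		}
		if majority(votes, clusterSize) {
			return "leader"
		}
	}
	return "undecided"
}

func main() {
	// Three nodes: one granted vote plus the candidate's own is a majority.
	fmt.Println(tally(3, 2, []voteResult{{Term: 2, VoteGranted: true}}))
	// Five nodes: two votes out of five is not enough.
	fmt.Println(tally(5, 2, []voteResult{{Term: 2, VoteGranted: true}, {Term: 2}}))
	// A reply from a newer term ends the election at once.
	fmt.Println(tally(5, 2, []voteResult{{Term: 3}, {Term: 2, VoteGranted: true}}))
}
```

An "undecided" result corresponds to the case where all RPCs have returned without a winner; the node stays a Candidate until its election timeout fires again.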
Test results:
    === RUN TestInitialElection2A
    Test (2A): initial election ...
    ... Passed -- 3.5 3 62 16400 0
    --- PASS: TestInitialElection2A (3.51s)
    === RUN TestReElection2A
    Test (2A): election after network failure ...
    ... Passed -- 5.0 3 122 25088 0
    --- PASS: TestReElection2A (5.01s)
    PASS
Running the TA's batch test script 10,000 times also passed on every run.