MIT6.824 Lab 2: Raft - Part 2A

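Part 2A asks for Raft leader election and heartbeats. A heartbeat is an AppendEntries RPC that carries no log entries, sent periodically by the Leader. A leader election is triggered when the election timeout expires.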

Heartbeat and election timeout design

const (
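	ElectionTick  = 5                      // base election timeout, in ticks
	HeartbeatTick = 1                      // heartbeat interval, in ticks
	Ticker        = 100 * time.Millisecond // duration of one tick of the main loop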
)
type Raft struct {
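	// ...
	electionElapsed  int // ticks since the election timer was last reset
	heartbeatElapsed int // ticks since the last heartbeat was sent
	electionTimeout  int // randomized election timeout, in ticks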
}

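Ticker is the interval of the main loop's periodic timer. electionTimeout is the election timeout measured in ticks, drawn at random from **[ElectionTick, 2*ElectionTick)**, which is 500 ms to 900 ms with the constants above. On every tick: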

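  • Leader: increment heartbeatElapsed; if heartbeatElapsed is at least HeartbeatTick, send heartbeat RPCs.
  • Candidate and Follower: increment electionElapsed; if electionElapsed is at least electionTimeout, the election has timed out and RequestVote RPCs must be sent.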

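This design follows raft-structure.txt: a single periodic timer is enough, and no timer ever has to be reset. When the heartbeat interval expires, set heartbeatElapsed back to 0; when the election times out, set electionElapsed back to 0 and pick a new random electionTimeout.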
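The counter idea can be tried out in isolation. The following is a minimal sketch, not the lab code: tickTimer and randomElectionTimeout are hypothetical names introduced here, and the constants are copied from above.

```go
package main

import (
	"fmt"
	"math/rand"
)

const (
	ElectionTick  = 5 // base election timeout, in ticks
	HeartbeatTick = 1 // heartbeat interval, in ticks
)

// tickTimer is a hypothetical helper (not part of the original code) that
// isolates the elapsed/timeout counter pair kept on the Raft struct.
type tickTimer struct {
	elapsed int
	timeout int
}

// randomElectionTimeout returns a value in [ElectionTick, 2*ElectionTick).
func randomElectionTimeout() int {
	return ElectionTick + rand.Intn(ElectionTick)
}

// tick advances the counter by one and reports whether the timeout fired.
// When it fires, the counter goes back to 0, so no real timer is re-armed.
func (t *tickTimer) tick() bool {
	t.elapsed++
	if t.elapsed >= t.timeout {
		t.elapsed = 0
		return true
	}
	return false
}

func main() {
	election := &tickTimer{timeout: randomElectionTimeout()}
	ticks := 1
	for !election.tick() {
		ticks++
	}
	fmt.Printf("election timeout fired after %d ticks\n", ticks)
}
```

With HeartbeatTick set to 1, a timer built this way fires on every tick, which is how the Leader ends up sending one round of heartbeats per Ticker interval.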

Setting the election timeout
func (rf *Raft) resetElectionTimeout() {
	rf.electionTimeout = ElectionTick + rand.Intn(ElectionTick)
}
Election timeout condition
func (rf *Raft) pastElectionTimeout() bool {
	return rf.electionElapsed >= rf.electionTimeout
}
Checking for an election timeout
func (rf *Raft) tickElection() {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	rf.electionElapsed++
	if rf.pastElectionTimeout() {
		switch rf.state {
		case Follower, Candidate:
			rf.becomeCandidate()
		}
	}
}
Heartbeat timeout condition
func (rf *Raft) pastHeartbeat() bool {
	return rf.heartbeatElapsed >= HeartbeatTick
}
Checking for a heartbeat timeout
func (rf *Raft) tickHeartbeat() {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	rf.heartbeatElapsed++
	if rf.pastHeartbeat() {
		if rf.state == Leader {
			rf.heartbeatElapsed = 0
			rf.broadcastHeartbeat()
		}
	}
}

Main loop design

func eventLoop(rf *Raft) {
	ticker := time.NewTicker(Ticker)
	defer ticker.Stop()
	for !rf.killed() {
		<-ticker.C // block until the next tick
		switch rf.GetCurState() {
		case Follower, Candidate:
			rf.tickElection()
		case Leader:
			rf.tickHeartbeat()
		}
	}
}

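The main loop runs in its own goroutine. It creates a periodic timer, ticker, and on every tick runs the timeout check that matches the node's current state. On a timeout, a Leader sends heartbeats, while a Follower or Candidate increments its Term and requests votes to start an election.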

Heartbeat implementation

AppendEntries RPC definitions
type AppendEntriesArgs struct {
	Term         int        //leader’s term
	LeaderId     int        //so follower can redirect clients
	PreLogIndex  int        //index of log entry immediately preceding new ones
	PreLogTerm   int        //term of prevLogIndex entry
	Entries      []LogEntry //log entries to store (empty for heartbeat; may send more than one for efficiency)
	LeaderCommit int        //leader’s commitIndex
}

type AppendEntriesReply struct {
	Term    int  //currentTerm, for leader to update itself
	Success bool //true if follower contained entry matching prevLogIndex and prevLogTerm
}
Leader sends heartbeat RPCs
func (rf *Raft) broadcastHeartbeat() {
	for i := 0; i < len(rf.peers); i++ {
		if i == rf.me {
			continue
		}
		go func(server int) {
			// this node may no longer be the leader by the time the goroutine runs
			rf.mu.Lock()
			if rf.state != Leader {
				rf.mu.Unlock()
				return
			}
			args := &AppendEntriesArgs{
				Term:     rf.currentTerm,
				LeaderId: rf.me,
			}
			rf.mu.Unlock()
			var reply AppendEntriesReply
			if !rf.sendAppendEntries(server, args, &reply) {
				return
			}

			rf.mu.Lock()
			defer rf.mu.Unlock()
			if reply.Term > rf.currentTerm {
				rf.becomeFollower(reply.Term)
			}
		}(i)
	}
}

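The Leader sends a heartbeat to every other node. Each heartbeat RPC is sent from its own goroutine so that the main goroutine never blocks. Before sending, each goroutine checks that the node is still Leader, since it may have lost leadership in the meantime. The RPC reply must also be handled: if the reply's Term is larger than the node's own Term, the node immediately converts to Follower.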
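The step-down rule can be exercised without a network. The sketch below is a simplified model under stated assumptions: sendFunc is a hypothetical stand-in for rf.sendAppendEntries, the leader struct replaces Raft, and the WaitGroup exists only so the example can inspect the final state (the original broadcast does not wait).

```go
package main

import (
	"fmt"
	"sync"
)

// sendFunc is a hypothetical transport. It returns the peer's term and
// whether the RPC was delivered at all.
type sendFunc func(server int, term int) (replyTerm int, ok bool)

// leader is a hypothetical, reduced stand-in for the Raft struct.
type leader struct {
	mu          sync.Mutex
	me          int
	currentTerm int
	isLeader    bool
}

// broadcast sends one heartbeat per peer, each from its own goroutine.
func (l *leader) broadcast(peers int, send sendFunc) {
	var wg sync.WaitGroup
	for i := 0; i < peers; i++ {
		if i == l.me {
			continue
		}
		wg.Add(1)
		go func(server int) {
			defer wg.Done()
			// Re-check leadership: an earlier reply may have demoted us.
			l.mu.Lock()
			if !l.isLeader {
				l.mu.Unlock()
				return
			}
			term := l.currentTerm
			l.mu.Unlock()

			// Never hold the lock across the (possibly slow) RPC.
			replyTerm, ok := send(server, term)
			if !ok {
				return
			}

			l.mu.Lock()
			defer l.mu.Unlock()
			if replyTerm > l.currentTerm {
				// A peer is in a newer term: adopt it and step down.
				l.currentTerm = replyTerm
				l.isLeader = false
			}
		}(i)
	}
	wg.Wait() // for the example only
}

func main() {
	l := &leader{me: 0, currentTerm: 1, isLeader: true}
	l.broadcast(3, func(server, term int) (int, bool) {
		if server == 2 {
			return 5, true // pretend server 2 has already moved to term 5
		}
		return term, true
	})
	fmt.Println("still leader:", l.isLeader, "term:", l.currentTerm)
}
```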

Receiving heartbeats
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

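	// Reject RPCs from a stale leader and report our newer term.
	if rf.currentTerm > args.Term {
		reply.Term = rf.currentTerm
		return
	}

	// A valid heartbeat: adopt the sender's term and (re)become a Follower.
	rf.becomeFollower(args.Term)
	reply.Term = rf.currentTerm
	reply.Success = true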
}

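The receiver compares its own Term with the Term in the RPC. If its own Term is larger, it rejects the request and returns right away. Otherwise it resets its election timeout and updates its Term. Here this is simplified to a conversion to Follower, because a Leader or Candidate that receives a valid heartbeat must become a Follower as well.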
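The receiver rule reduces to one comparison. Here is a minimal sketch of it as a plain method, assuming a hypothetical peer struct and handleHeartbeat function in place of Raft and the AppendEntries RPC handler:

```go
package main

import "fmt"

type State int

const (
	Follower State = iota
	Candidate
	Leader
)

// peer is a hypothetical, reduced stand-in for the Raft struct.
type peer struct {
	state           State
	currentTerm     int
	electionElapsed int
}

// handleHeartbeat applies the receiver rule: a heartbeat from an older
// term is rejected; anything else turns the peer into a Follower in the
// sender's term and restarts its election countdown.
func (p *peer) handleHeartbeat(leaderTerm int) (replyTerm int, success bool) {
	if p.currentTerm > leaderTerm {
		return p.currentTerm, false
	}
	p.state = Follower
	p.currentTerm = leaderTerm
	p.electionElapsed = 0
	return p.currentTerm, true
}

func main() {
	p := &peer{state: Candidate, currentTerm: 3, electionElapsed: 4}

	term, ok := p.handleHeartbeat(2) // stale leader
	fmt.Println(term, ok, p.state == Candidate)

	term, ok = p.handleHeartbeat(4) // legitimate leader in a newer term
	fmt.Println(term, ok, p.state == Follower)
}
```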

Election implementation

RequestVote RPC definitions
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	Term         int //candidate’s term
	CandidateId  int //candidate requesting vote
	LastLogIndex int //index of candidate’s last log entry
	LastLogTerm  int //term of candidate’s last log entry
}

type RequestVoteReply struct {
	// Your data here (2A).
	Term        int  //currentTerm, for candidate to update itself
	VoteGranted bool //true means candidate received vote
}
Becoming a Candidate
func (rf *Raft) becomeCandidate() {
	rf.state = Candidate
	rf.reset(rf.currentTerm + 1)
	rf.voteFor = rf.me // vote for itself immediately, so no other candidate can get this term's vote
	rf.resetElectionTimeout()
	go rf.startElection()
}

When the election timer expires, the node becomes a Candidate and starts an election:

  • Increment its own Term
  • Vote for itself
  • Reset the election timer
  • Send RequestVote RPCs to the other nodes
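The first three steps are pure state changes and can be sketched on their own. The node struct below is a hypothetical stand-in for Raft, and sending the RPCs is left out; the point of the sketch is that the self-vote is recorded in the same critical step as the term increment.

```go
package main

import (
	"fmt"
	"math/rand"
)

const ElectionTick = 5 // base election timeout, in ticks

type State int

const (
	Follower State = iota
	Candidate
	Leader
)

// node is a hypothetical, reduced stand-in for the Raft struct.
type node struct {
	me              int
	state           State
	currentTerm     int
	votedFor        int // -1 means "has not voted in currentTerm"
	electionElapsed int
	electionTimeout int
}

// becomeCandidate applies the state changes of a new election in one step:
// new term, vote for self, fresh randomized countdown.
func (n *node) becomeCandidate() {
	n.state = Candidate
	n.currentTerm++
	n.votedFor = n.me
	n.electionElapsed = 0
	n.electionTimeout = ElectionTick + rand.Intn(ElectionTick)
}

func main() {
	n := &node{me: 2, state: Follower, currentTerm: 7, votedFor: -1}
	n.becomeCandidate()
	fmt.Printf("term=%d votedFor=%d timeout=%d ticks\n",
		n.currentTerm, n.votedFor, n.electionTimeout)
}
```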
Candidate sends RequestVote RPCs
func (rf *Raft) startElection() {
	voteCh := make(chan RequestVoteReply, len(rf.peers))
	rf.mu.Lock()
	logLen := len(rf.log)
	voteArgs := &RequestVoteArgs{
		Term:         rf.currentTerm,
		CandidateId:  rf.me,
		LastLogIndex: rf.log[logLen-1].Index,
		LastLogTerm:  rf.log[logLen-1].Term,
	}
	rf.mu.Unlock()

	var wg sync.WaitGroup
	for i := 0; i < len(rf.peers); i++ {
		if i == rf.me {
			continue
		}
		wg.Add(1)
		go func(server int) {
			defer wg.Done()
			var reply RequestVoteReply
			if !rf.sendRequestVote(server, voteArgs, &reply) {
				return
			}
			voteCh <- reply
		}(i)
	}

	doneCh := make(chan struct{}, 1)
	go func() {
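		votes := 1 // the candidate's own vote
		rf.mu.Lock()
		// Record the self-vote only if this election is still the current one.
		if rf.state == Candidate && rf.currentTerm == voteArgs.Term {
			rf.voteFor = rf.me
		}
		rf.mu.Unlock()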
		for !rf.killed() {
			select {
			case reply := <-voteCh:
				rf.mu.Lock()
				if rf.state != Candidate {
					rf.mu.Unlock()
					return
				}
				if reply.Term > rf.currentTerm {
					rf.becomeFollower(reply.Term)
					rf.mu.Unlock()
					return
				}
				if reply.VoteGranted && rf.currentTerm == reply.Term {
					votes++
				}
				if rf.majority(votes) {
					rf.becomeLeader()
					rf.mu.Unlock()
					return
				}
				rf.mu.Unlock()
			case <-doneCh:
				return
			}
		}
	}()
	wg.Wait()
	doneCh <- struct{}{}
}

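First a channel, voteCh, is created to collect the result of every RequestVote RPC. Each RPC is sent to its peer from a separate goroutine, which forwards the reply to voteCh. Meanwhile, another goroutine waits for the replies: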

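  • If it collects votes from more than half of the nodes, the Candidate wins, converts to Leader, and stops waiting immediately.
  • If every RequestVote RPC has returned, it also stops waiting immediately.
  • If a reply carries a Term larger than the Candidate's Term, the Candidate converts to Follower and stops waiting.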
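The three outcomes can be modeled as a small tally function. This is a sketch under assumptions: rf.majority is not shown in the post, so the majority function here is one plausible definition (strictly more than half of all peers), and tally, feed, voteReply and outcome are hypothetical names. A closed channel stands in for "all RPCs have returned".

```go
package main

import "fmt"

// voteReply is a reduced form of RequestVoteReply.
type voteReply struct {
	Term    int
	Granted bool
}

type outcome int

const (
	Lost outcome = iota
	Won
	SteppedDown
)

// majority reports whether votes is strictly more than half of peers.
func majority(votes, peers int) bool {
	return votes > peers/2
}

// tally counts votes for an election in the given term. It starts at 1
// for the candidate's own vote and reads replies until the channel closes.
func tally(term, peers int, replies <-chan voteReply) outcome {
	votes := 1
	if majority(votes, peers) {
		return Won // single-node cluster
	}
	for r := range replies {
		if r.Term > term {
			return SteppedDown // someone is in a newer term
		}
		if r.Granted && r.Term == term {
			votes++
		}
		if majority(votes, peers) {
			return Won
		}
	}
	return Lost // every reply arrived without reaching a majority
}

// feed builds a closed channel preloaded with the given replies.
func feed(rs ...voteReply) <-chan voteReply {
	ch := make(chan voteReply, len(rs))
	for _, r := range rs {
		ch <- r
	}
	close(ch)
	return ch
}

func main() {
	replies := feed(voteReply{2, true}, voteReply{2, false}, voteReply{2, true})
	if tally(2, 5, replies) == Won {
		fmt.Println("candidate won the election")
	}
}
```

Closing the reply channel once all senders are done avoids having to choose between a pending reply and a separate "done" signal.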

Test results:

=== RUN   TestInitialElection2A
Test (2A): initial election ...
 ... Passed --   3.5  3   62   16400    0
--- PASS: TestInitialElection2A (3.51s)
=== RUN   TestReElection2A
Test (2A): election after network failure ...
 ... Passed --   5.0  3  122   25088    0
--- PASS: TestReElection2A (5.01s)
PASS

Running the TA's batch test script 10000 times also passed every time.