文章

mit6.824 Lab2 通关总结

mit6.824 Lab2 通关总结

笔者在这个学期,也就是大三下学期开始学习这门课程,本来我打算找在这学期找实习的,不过实习也不是一下子就能找到,在面试实习的时候面试官问过我有关分布式系统的内容,奈何本人之前实在没有学习过,答不上来,于是下定决心要学习这门课程。这星期收到了实习的offer,想着尽量在上班前干完lab2,花费了大概满一周终于做出来了,趁热打铁,这里记录一下自己做lab的过程和总结。

mit6.824 Lab2 Raft 实验

这篇文章我假设读者已经对Raft协议有一定的了解,在做或者尝试做这个实验,对于一些名词大家应该都是熟悉的,我就不解释了。这个实验的目的是实现一个Raft协议的分布式系统,根据论文的指引,一步步实现,最终实现一个能够通过测试的Raft系统。课程的教授和助教们都提及到figure2是最重要的,知道这一点,我们的实现的压力也会小很多,理清figure2的所有细节,按照figure2的要求实现,就能够通过所有测试。

0. 小技巧/前期准备

做这个实验的时候,我使用了一些小技巧,大部分我都在实验指导中见到,在这里我列出一些,使用这些技巧可以帮助我们更好的完成实验:

0.1 打印信息

使用Dprintf,我们可以开关调试信息,在进行测试的时候关闭它,就能节省磁盘空间和运行时间。

0.2 锁的使用

在实验中,可以为上锁和释放锁的操作写一个函数,在这个函数里面可以打印一些信息帮助我们debug死锁:

1
2
3
4
5
6
7
8
9
10
11
func (rf *Raft) lock() {
	DPrintf("%d try lock", rf.me)
	rf.mu.Lock()
	DPrintf("%d locked", rf.me)
}s
s
func (rf *Raft) unlock() {
	DPrintf("%d try unlock", rf.me)
	rf.mu.Unlock()
	DPrintf("%d unlocked", rf.me)
}

0.3 测试脚本

有一个助教写的测试脚本,我一直就在用,每一次测试都会保存log,这里推荐给大家:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/bin/bash
#
# Script for running `go test` a bunch of times, in parallel, storing the test
# output as you go, and showing a nice status output telling you how you're
# doing.
#
# Normally, you should be able to execute this script with
#
#   ./go-test-many.sh
#
# and it should do The Right Thing(tm) by default. However, it does take some
# arguments so that you can tweak it for your testing setup. To understand
# them, we should first go quickly through what exactly this script does.
#
# First, it compiles your Go program (using go test -c) to ensure that all the
# tests are run on the same codebase, and to speed up the testing. Then, it
# runs the tester some number of times. It will run some number of testers in
# parallel, and when that number of running testers has been reached, it will
# wait for the oldest one it spawned to finish before spawning another. The
# output from each test i is stored in test-$i.log and test-$i.err (STDOUT and
# STDERR respectively).
#
# The options you can specify on the command line are:
#
#   1) how many times to run the tester (defaults to 100)
#   2) how many testers to run in parallel (defaults to the number of CPUs)
#   3) which subset of the tests to run (default to all tests)
#
# 3) is simply a regex that is passed to the tester under -test.run; any tests
# matching the regex will be run.
#
# The script is smart enough to clean up after itself if you kill it
# (in-progress tests are killed, their output is discarded, and no failure
# message is printed), and will automatically continue from where it left off
# if you kill it and then start it again.
#
# By now, you know everything that happens below.
# If you still want to read the code, go ahead.

if [ $# -eq 1 ] && [ "$1" = "--help" ]; then
	echo "Usage: $0 [RUNS=100] [PARALLELISM=#cpus] [TESTPATTERN='']"
	exit 1
fi

# If the tests don't even build, don't bother. Also, this gives us a static
# tester binary for higher performance and higher reproducability.
if ! go test -c -o tester; then
	echo -e "\e[1;31mERROR: Build failed\e[0m"
	exit 1
fi

# Default to 100 runs unless otherwise specified
runs=100
if [ $# -gt 0 ]; then
	runs="$1"
fi

# Default to one tester per CPU unless otherwise specified
parallelism=$(grep -c processor /proc/cpuinfo)
if [ $# -gt 1 ]; then
	parallelism="$2"
fi

# Default to no test filtering unless otherwise specified
test=""
if [ $# -gt 2 ]; then
	test="$3"
fi

# Figure out where we left off
logs=$(find . -maxdepth 1 -name 'test-*.log' -type f -printf '.' | wc -c)
success=$(grep -E '^PASS$' test-*.log | wc -l)
((failed = logs - success))

# Finish checks the exit status of the tester with the given PID, updates the
# success/failed counters appropriately, and prints a pretty message.
finish() {
	if ! wait "$1"; then
		if command -v notify-send >/dev/null 2>&1 &&((failed == 0)); then
			notify-send -i weather-storm "Tests started failing" \
				"$(pwd)\n$(grep FAIL: -- *.log | sed -e 's/.*FAIL: / - /' -e 's/ (.*)//' | sort -u)"
		fi
		((failed += 1))
	else
		((success += 1))
	fi

	if [ "$failed" -eq 0 ]; then
		printf "\e[1;32m";
	else
		printf "\e[1;31m";
	fi

	printf "Done %03d/%d; %d ok, %d failed\n\e[0m" \
		$((success+failed)) \
		"$runs" \
		"$success" \
		"$failed"
}

waits=() # which tester PIDs are we waiting on?
is=()    # and which iteration does each one correspond to?

# Cleanup is called when the process is killed.
# It kills any remaining tests and removes their output files before exiting.
cleanup() {
	for pid in "${waits[@]}"; do
		kill "$pid"
		wait "$pid"
		rm -rf "test-${is[0]}.err" "test-${is[0]}.log"
		is=("${is[@]:1}")
	done
	exit 0
}
trap cleanup SIGHUP SIGINT SIGTERM

# Run remaining iterations (we may already have run some)
for i in $(seq "$((success+failed+1))" "$runs"); do
	# If we have already spawned the max # of testers, wait for one to
	# finish. We'll wait for the oldest one beause it's easy.
	if [[ ${#waits[@]} -eq "$parallelism" ]]; then
		finish "${waits[0]}"
		waits=("${waits[@]:1}") # this funky syntax removes the first
		is=("${is[@]:1}")       # element from the array
	fi

	# Store this tester's iteration index
	# It's important that this happens before appending to waits(),
	# otherwise we could get an out-of-bounds in cleanup()
	is=("${is[@]}" $i)

	# Run the tester, passing -test.run if necessary
	if [[ -z "$test" ]]; then
		./tester -test.v 2> "test-${i}.err" > "test-${i}.log" &
		pid=$!
	else
		./tester -test.run "$test" -test.v 2> "test-${i}.err" > "test-${i}.log" &
		pid=$!
	fi

	# Remember the tester's PID so we can wait on it later
	waits=("${waits[@]}" $pid)
done

# Wait for remaining testers
for pid in "${waits[@]}"; do
	finish "$pid"
done

if ((failed>0)); then
	exit 1
fi
exit 0

1. Leader election

这个部分要实现Leader election,有关选举的内容在论文中有详细的描述,这里就不再赘述了。这个部分有几个细节是可以讨论的:

1.1 选举超时时间

在做到2C的时候,测试总是出错,后来发现是因为选举超时时间设置的太短了,导致选举一直在进行,AppendEntries RPC还没来得及发几个,选举就开始了,后面看了别人的文章,设置长了一点,总算能通过2C,我的选举超时时间设置成了500-550ms,可以参考一下。

1.2 RequestVote RPC

follower在转成candidate之后,要向其他节点发送RequestVote RPC,这个RPC是必须要发一次的,存在这种可能,就是在对某一个server发送RequestVote RPC之前candidate已经收到大部分follower的投票转成leader,此时你也许会判断一下,只有在candidate的阶段才发送RequestVote RPC,若不发送这个RPC,那么只有等到AppendEntries RPC发送的时候follower才会更新term,这个时候可能会导致2A的第一个测试点不通过:当一个candidate成为leader后,测试点一休眠了50ms,在这50ms内,如果一个follower没有收到AppendEntries RPC,那它不会更新term,这个时候会认为选举失败,因为follower和leader的term不一样。

不过这是小问题,只要在成为candidate的时候无论如何发送一次RequestVote RPC就可以了。

1.3 timer的实现

Raft要求我们实现timer,我觉得实现这个timer是有必要的,一定程度上简化了代码的逻辑,论文让重置计时器的时候就调用一下resetTicker就行了,我看过一篇其他的文章说不需要这个,不过我还是按照论文的要求实现了,这个timer的实现是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type Raft struct {
    // ...
	round          int                 // current round of the election timer
    // ...
}

// should be called with rf.mu held
func (rf *Raft) resetTicker() {
	rf.round++
	go rf.ticker(rf.round)
}

// should be called with rf.mu held
func (rf *Raft) stopTicker() {
	rf.round++
}

func (rf *Raft) ticker(rnd int) {
    time.Sleep(electionTimeout)
    rf.mu.Lock()
    if rnd != rf.round {
        rf.mu.Unlock()
        return
    }
    // election timeout
    // ...
}

1.4 收到投票的处理

收到投票的时候,要判断follower投票的term是不是rf.currentTerm,这个小细节我在做的时候没有注意到,导致了bug。

2. Log replication

按照论文的要求实现就可以了,这个部分我没有遇到什么bug,主要debug阶段在2C

2.1 figure8 corner case

在commit的时候,我们只能够commit entryTerm == rf.currentTermLogEntry,具体的说明在论文上面有,我们只需要实现就可以了。

2.2 心跳包和非心跳包

在follower接受AppendEntries RPC的时候,对于一个包的前置判断是一样的,无论是心跳包还是非心跳包,我们都要判断term,重置计时器等等。在发送函数中,我们尽量不将发送心跳包和非心跳包作为两个不同的函数分开,因为他们收到回复的时候要进行的操作是一样的,至少在我的实现中是这样。

2.3 applyLoop

这个函数是用来应用log的,我们要在这个函数里面实现apply到状态机的操作,函数是在Start函数里面启动,在这里我们可以用到条件变量,当commitIndex更新的时候,我们唤醒这个函数,然后进行apply操作。

2.4 发送速度

在我的实现是如果nextIndex < lastLogIndex,那就每隔20ms发一次,如果要发送心跳包,那就每隔100ms发一次,可以参考一下。

3. Persist

这个部分要实现持久化,持久化是比较简单的,难的是他加强了log replication的测试,在这里需要将election timeout设置的长一点,实现实验指导里面加快达成一致的方法,不然会出现测试不通过的情况。

4. Log compaction

这个部分就比较复杂了,因为不是核心的内容,所以针对一些情景,论文中没有给出处理的方法,而且要考虑与figure2协作,这里列几点需要注意的细节:

4.1 snapshot是什么

snapshot 是客户端调用Snapshot时要保存的状态,给定一个lastIncludedIndexlastIncludedIndex及之前的log经过客户端处理后,客户端会将状态保存到snapshot中,然后调用Snapshot这个函数,我们要将lastIncludedIndex及之前的log删除,然后保存这个snapshot,以及对应的lastIncludedIndexlastIncludedTerm。这三个参数是需要持久化的。

4.2 snapshot的处理

对于snapshot,我们可以将它看成一个特殊的LogEntry,它不保存在Log数组里面,当nextIndex <= lastIncludedIndex,我们需要调用InstallSnapshot RPC,成功以后修改matchIndex,并进行与AppendEntries RPC一样的更新commitIndex的操作,在applyLoop里面,如果发现lastApplied+1 <= lastIncludedIndex,apply这个snapshot,并将lastApplied更新成lastIncludedIndex。这样子就完成了与figure2的统一。

4.3 关于log的偏移

我们可以将lastIncludedIndex看成一个偏移量,一开始偏移量是0,我们可以使用几个函数来避免手算偏移量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (rf *Raft) getLastEntry() LogEntry {
	return rf.Log[len(rf.Log)-1]
}

func (rf *Raft) getLastEntryTerm() int {
	if len(rf.Log) == 0 {
		return rf.SnapshotTerm

	}
	return rf.Log[len(rf.Log)-1].Term
}
func (rf *Raft) getLastEntryIndex() int {
	if len(rf.Log) == 0 {
		return rf.SnapshotIndex
	}
	return rf.Log[len(rf.Log)-1].Index
}

func (rf *Raft) getEntry(index int) LogEntry {
	if index == 0 {
		return LogEntry{0, 0, nil, true}
	}

	return rf.Log[index-1-rf.SnapshotIndex]
}

func (rf *Raft) getEntryTerm(index int) int {
	if index-1-rf.SnapshotIndex == -1 {
		return rf.SnapshotTerm
	}

	return rf.Log[index-1-rf.SnapshotIndex].Term
}

func (rf *Raft) getEntryPos(index int) int {
	return index - 1 - rf.SnapshotIndex
}

4.4 InstallSnapshot RPC与AppendEntries RPC冲突的处理

会发生这一种情况:

  1. nextIndex > lastIncludedIndex && nextIndex <= lastLogIndex,此时我们应该发送AppendEntries RPC
  2. 在发送AppendEntries RPC的过程中,lastIncludedIndex因为客户端调用Snapshot而发生了变化,此时nextIndex <= lastIncludedIndex,我们应该发送InstallSnapshot RPC,注意,刚刚发送的AppendEntries RPC还没有收到回复,nextIndex还未更新。
  3. follower收到InstallSnapshot RPC,更新lastIncludedIndex,并且将lastIncludedIndex前的log删除,然后保存这个snapshot,以及对应的lastIncludedIndexlastIncludedTerm,然后回复InstallSnapshot RPC,leader收到回复,更新nextIndexmatchIndex
  4. 接着follower收到了AppendEntries RPC,这个时候prevLogIndex < lastIncludedIndex,应该特殊处理这一种情况,我们不做更新操作,返回false,follower的lastEntryIndex给leader,这个的值应该是lastIncludedIndex
  5. leader收到了AppendEntries RPC的回复,这时,可能会发现lastEntryIndex < matchIndex,意思是follower的最后一个LogEntry已经match了,这时不做更新nextIndex的操作。

总结

这个lab2的实验是比较复杂的,我在做的时候也遇到了很多问题,不过通过查阅资料,看别人的文章,最终还是做出来了,这个实验的难点在于细节,不过只要按照论文的要求实现,理清几个RPC的细节和逻辑,还是能够做出来的,剩下的就是调试了,这个实验的调试是比较困难的,因为要考虑到很多情况,要对着输出的信息慢慢看,需要很大的耐心,我们要相信自己能调试出来,一步步打log,看输出,修改代码,这样能达到很好的训练目的。

关于剩下的lab3和lab4我可能看情况再写了,不过论文,课程我还是会坚持看完的,写完这个实验,我对raft的理解更加深刻了,这个实验还是非常有意义的,它能让你掌握不使用debuger,只用log的调试技巧,在调试并发代码中,我感觉debuger几乎是没有使用场景的。学会在log中找出错误,这个技巧对以后的工作也会是非常有用的。

我现在还在测试lab2,已经aq 6800多次无fail

本文由作者按照 CC BY 4.0 进行授权