MIT 6.824 2020 Video Notes 5: Go Concurrency

MIT finally released the 6.824 lecture videos on YouTube this year. I had previously followed about half of the course; this year I plan to watch the videos and write up lecture notes. The course is organized around the fundamentals of distributed systems (fault tolerance, replication, and consistency), uses carefully selected papers on industrial-grade systems as its main thread, and supplements them with detailed reading materials and well-designed labs that bridge academic theory and industrial practice. It is a rare gem of a distributed systems course. Course videos: YouTube, Bilibili. Course materials: 6.824 homepage.

This is the fifth set of lecture notes, in two parts. In the first part, a TA covers Go primitives, design patterns, and practical techniques used in Lab 2: the memory model, goroutines and closures, the time library, locks, condition variables, channels, signals, parallelism, and some common tools. In the second part, two other TAs walk through common Raft bugs and debugging methods.

Author: Woodpecker Notes https://www.qtmuniao.com/2020/04/27/6-824-video-notes-5-go-concurrency/, please cite the source when reposting

Note: A goroutine is a lightweight thread. The terms goroutine and thread are used interchangeably below, both referring to threads in Go, although they are not completely equivalent.

Memory Model

Using threads serves two purposes:

  1. Improve performance by utilizing multiple cores.
  2. Structure code more elegantly.

In the labs for this course, we do not require using threads to maximize performance; we only require program correctness.

The same applies to the use of locks: we do not require fine-grained locking to boost performance; you may use coarse-grained locking to simplify code.

The memory model describes which ordering guarantees hold when multiple threads run. The main conclusion: within a single thread, the observable effects are consistent with the sequential order of the statements in the code; across threads, without explicit synchronization (via locks or channels), neither the order in which statements execute nor the visibility of their writes is guaranteed. That is roughly what the memory model covers. The focus of this lecture is on typical code patterns that may be used in the labs.
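
As a small illustration of the cross-thread rule, the sketch below uses a channel as the explicit synchronization point. The function name publish and its variables are hypothetical; the only guarantee relied on is that, in Go's memory model, closing a channel is synchronized before a receive that returns because the channel is closed.

```go
package main

import "fmt"

// publish writes msg from a child goroutine and returns the value the
// caller observes after synchronizing on a channel.
func publish(msg string) string {
	var shared string
	ready := make(chan struct{})

	go func() {
		shared = msg // ordinary write, sequenced before the close below
		close(ready) // synchronization point
	}()

	<-ready       // returns only after close(ready) has happened
	return shared // therefore guaranteed to observe the write
}

func main() {
	fmt.Println(publish("hello"))
}
```

Without the channel operation, the read of shared in the caller would be a data race and could observe the empty string.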

Goroutine and Closure

A for loop plus goroutines naturally expresses a Leader sending messages to its Followers in parallel. Note that when a child goroutine references the loop variable (i in the example below), it is best to make a copy first, usually by passing it as a function argument or by copying it inside the loop body; before Go 1.22 all iterations shared a single loop variable, so a closure could otherwise observe a later value. In addition, the parent goroutine often uses a WaitGroup to block until a group of child goroutines has finished. Example:

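package main

import "sync"

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(x int) { // i is passed by value, so each goroutine gets its own copy
			defer wg.Done()
			sendRPC(x)
		}(i)
	}
	wg.Wait() // block until all five goroutines have called Done
}

func sendRPC(i int) {
	println(i)
}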
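
The other way to copy the loop variable, shadowing it inside the loop body, can be sketched as follows. The helper collectIDs is hypothetical and exists only so the result can be inspected; the extra mutex protects the shared slice that the goroutines append to.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collectIDs starts n goroutines, each recording its own loop index,
// and returns the recorded indices in sorted order.
func collectIDs(n int) []int {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		ids []int
	)
	for i := 0; i < n; i++ {
		i := i // per-iteration copy; redundant but harmless on Go 1.22+
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			ids = append(ids, i)
		}()
	}
	wg.Wait()
	sort.Ints(ids) // goroutines finish in arbitrary order
	return ids
}

func main() {
	fmt.Println(collectIDs(5)) // [0 1 2 3 4]
}
```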

Time Library

With Go’s standard time package, it is easy to do something periodically, such as the heartbeat logic in Raft.

func periodic() {
	for {
		println("heartbeat")
		time.Sleep(1 * time.Second)
	}
}

When Raft is killed, you may want to terminate all background threads. You can use a shared variable as the loop exit condition to achieve this.

package main

import (
	"sync"
	"time"
)

var done bool
var mu sync.Mutex

func main() {
	println("started")
	go periodic()
	time.Sleep(5 * time.Second) // wait a while so we can observe the ticks
	mu.Lock()
	done = true
	mu.Unlock()
	println("cancelled")
	time.Sleep(3 * time.Second) // observe that there is no further output
}

func periodic() {
	for {
		println("tick")
		time.Sleep(1 * time.Second)
		mu.Lock()
		if done {
			mu.Unlock() // release the lock before exiting
			return
		}
		mu.Unlock()
	}
}

Note that both the write and the read of the shared variable done must be wrapped in the sync.Mutex lock. The mutex forces synchronization between the threads (one goroutine’s mu.Unlock happens before another goroutine’s subsequent mu.Lock returns), which guarantees that the child thread sees the main thread’s write to done. Without any synchronization, Go’s memory model does not guarantee that a write to a shared variable becomes visible to other threads, for example because of caching or compiler reordering.
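
The same idea can be packaged so that the lock never leaks into the loop body. The sketch below is one possible arrangement, not the lab's code: the worker type and the kill, killed, and runAndKill names are hypothetical, and the intervals are shortened so the program finishes quickly.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for a Raft peer's background loop.
type worker struct {
	mu    sync.Mutex
	dead  bool
	ticks int
}

// kill asks the background loop to stop.
func (w *worker) kill() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.dead = true
}

// killed reads the flag under the lock, so the write in kill is visible.
func (w *worker) killed() bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.dead
}

// run ticks once per interval until kill is called, then closes exited.
func (w *worker) run(interval time.Duration, exited chan<- struct{}) {
	defer close(exited)
	for {
		w.mu.Lock()
		w.ticks++
		w.mu.Unlock()
		time.Sleep(interval) // sleep without holding the lock
		if w.killed() {
			return
		}
	}
}

// runAndKill starts a worker, kills it after totalMs milliseconds,
// waits for the loop to exit, and returns the number of ticks.
func runAndKill(intervalMs, totalMs int) int {
	w := &worker{}
	exited := make(chan struct{})
	go w.run(time.Duration(intervalMs)*time.Millisecond, exited)
	time.Sleep(time.Duration(totalMs) * time.Millisecond)
	w.kill()
	<-exited
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.ticks
}

func main() {
	fmt.Println("ticks before exit:", runAndKill(10, 100))
}
```

Because killed always releases the lock before returning, the loop cannot exit while still holding it.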

Mutex

There are mainly two situations where locks need to be used:

  1. Ensure visibility of shared variables among multiple threads.
  2. Ensure atomicity of a code block (it won’t be interleaved with statements in other goroutines).

Of course, these two situations are often one and the same.

Because Go’s memory model is subtle, it is best to protect every access to a shared variable with a lock; otherwise your multithreaded code is likely to exhibit bugs that are hard to find and debug. Below is an example of multiple threads incrementing a shared variable without locking:

package main

import "time"

func main() {
	counter := 0
	for i := 0; i < 1000; i++ {
		go func() {
			counter = counter + 1 // data race: unsynchronized read-modify-write
		}()
	}

	time.Sleep(1 * time.Second)
	println(counter) // not guaranteed to print 1000
}

There are two reasons the final counter value may not be 1000:

  1. counter = counter + 1 is not atomic after being compiled into CPU instructions; parallel execution may produce interleaving.
  2. After counter is modified, it may not be seen by other threads in time, thus incrementing the old value.

Modified version:

package main

import (
	"sync"
	"time"
)

func main() {
	counter := 0
	var mu sync.Mutex
	for i := 0; i < 1000; i++ {
		go func() {
			mu.Lock()
			defer mu.Unlock()
			counter = counter + 1
		}()
	}

	// A WaitGroup would be more reliable here; sleeping for 1s only
	// makes it very likely that all goroutines have finished.
	time.Sleep(1 * time.Second)
	mu.Lock()
	println(counter)
	mu.Unlock()
}
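
The code comment above suggests a WaitGroup instead of sleeping. A minimal sketch of that variant follows; the function name countWithWaitGroup is hypothetical and is introduced only so the result can be returned and checked.

```go
package main

import (
	"fmt"
	"sync"
)

// countWithWaitGroup increments a shared counter from n goroutines and
// returns only after every goroutine has finished.
func countWithWaitGroup(n int) int {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		counter int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done() // runs after mu.Unlock (deferred calls run last-in, first-out)
			mu.Lock()
			defer mu.Unlock()
			counter++
		}()
	}
	wg.Wait() // every Done has happened, so every increment is visible
	return counter
}

func main() {
	fmt.Println(countWithWaitGroup(1000)) // 1000
}
```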

Let’s look at an example where Alice and Bob borrow money from each other and try to keep the total amount unchanged. We use one thread each to represent Alice lending money to Bob and Bob lending money to Alice.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	alice, bob := 10000, 10000
	var mu sync.Mutex

	total := alice + bob

	go func() {
		for i := 0; i < 1000; i++ {
			mu.Lock()
			alice -= 1
			mu.Unlock()
			// the lock is released here while the money is still in flight
			mu.Lock()
			bob += 1
			mu.Unlock()
		}
	}()
	go func() {
		for i := 0; i < 1000; i++ {
			mu.Lock()
			bob -= 1
			mu.Unlock()
			mu.Lock()
			alice += 1
			mu.Unlock()
		}
	}()

	start := time.Now()
	for time.Since(start) < 1*time.Second {
		mu.Lock()
		if alice+bob != total {
			fmt.Printf("observed violation, alice = %v, bob = %v, sum = %v\n", alice, bob, alice+bob)
		}
		mu.Unlock()
	}
}

Will the above code print a violation? Yes: the observer thread may check the sum at a moment when one person has lent out money but the other has not yet received it. The problem can be understood from two angles:

  1. Atomicity. Lending and borrowing should be an atomic operation, so the entire process needs to be wrapped with a lock. Otherwise, if observed at some intermediate moment, inconsistency will occur: the money has been lent out but not yet received, i.e., it’s “in flight”.
  2. Invariant. Locks can guard invariants: after acquiring the lock and entering the critical section, you may break the invariant (lending money; at this point the money is “in the air”, and observation at this time will show one dollar missing), but restore it before releasing the lock (receiving the money, moving it from “in the air” into another person’s account, keeping the sum of both accounts unchanged).

Therefore, the transaction process needs to be changed to:

go func() {
	for i := 0; i < 1000; i++ {
		mu.Lock()
		alice -= 1
		bob += 1
		mu.Unlock()
	}
}()
// modify the other thread accordingly
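
Put together, the corrected program might look like the sketch below. The simulate function and its return values are hypothetical additions so that the absence of violations can be checked; the auditor loop runs until both transfer goroutines have finished rather than for a fixed second.

```go
package main

import (
	"fmt"
	"sync"
)

// simulate runs two opposing transfer loops of the given length while an
// auditor checks the invariant, and returns the number of violations seen
// together with the final sum of both accounts.
func simulate(rounds int) (violations, finalSum int) {
	alice, bob := 10000, 10000
	total := alice + bob
	var mu sync.Mutex
	var wg sync.WaitGroup

	transfer := func(from, to *int) {
		defer wg.Done()
		for i := 0; i < rounds; i++ {
			mu.Lock()
			*from -= 1 // invariant is broken here...
			*to += 1   // ...and restored before the lock is released
			mu.Unlock()
		}
	}
	wg.Add(2)
	go transfer(&alice, &bob)
	go transfer(&bob, &alice)

	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	for {
		select {
		case <-done:
			mu.Lock()
			defer mu.Unlock()
			return violations, alice + bob
		default:
		}
		mu.Lock()
		if alice+bob != total {
			violations++
		}
		mu.Unlock()
	}
}

func main() {
	v, sum := simulate(1000)
	fmt.Println("violations:", v, "sum:", sum)
}
```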

Condition

In Raft, there is a scenario where a Candidate asks all Followers for votes, then decides whether to become a Leader based on the collected votes.

package main

import (
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	count := 0
	finished := 0
	var mu sync.Mutex

	for i := 0; i < 10; i++ {
		go func() {
			vote := requestVote()
			mu.Lock()
			defer mu.Unlock()
			if vote {
				count++
			}
			finished++
		}()
	}

	for { // busy wait
		mu.Lock()
		if count >= 5 || finished == 10 {
			break // leave the loop with mu still held
		}
		mu.Unlock()
	}

	// mu is still held here; locking it again would deadlock
	if count >= 5 {
		println("received 5+ votes!")
	} else {
		println("lost")
	}
	mu.Unlock()
}

func requestVote() bool {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return rand.Int()%2 == 0
}

This approach has busy waiting, and the continuous locking and unlocking during the loop causes extremely high CPU usage.

A very simple performance improvement is to add time.Sleep(50 * time.Millisecond) in the busy waiting loop, which can greatly reduce CPU usage.
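
A sketch of this polling variant, under some simplifying assumptions: the hypothetical pollElection function replaces the random votes with a fixed number of granted votes so the outcome is predictable, and a 1ms sleep stands in for RPC latency. The important detail is that the sleep happens after the lock has been released.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pollElection simulates total vote replies, of which yes are granted,
// and polls the shared counters every 50ms. It reports whether at least
// needed votes were collected.
func pollElection(yes, total, needed int) bool {
	var mu sync.Mutex
	count, finished := 0, 0

	for i := 0; i < total; i++ {
		go func(granted bool) {
			time.Sleep(time.Millisecond) // stand-in for RPC latency
			mu.Lock()
			defer mu.Unlock()
			if granted {
				count++
			}
			finished++
		}(i < yes)
	}

	for {
		mu.Lock()
		won, allIn := count >= needed, finished == total
		mu.Unlock()
		if won || allIn {
			return won
		}
		time.Sleep(50 * time.Millisecond) // sleep with the lock released
	}
}

func main() {
	fmt.Println(pollElection(6, 10, 5)) // true
	fmt.Println(pollElection(3, 10, 5)) // false
}
```

The cost of this approach is latency: the result can be noticed up to one polling interval late.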

A more efficient way is to use a condition variable (sync.Cond). A condition variable is bound to a lock. A thread that holds the lock calls Wait, which atomically releases the lock and suspends the thread; another thread can then acquire the lock, change the shared state, and call Broadcast to wake all threads suspended in Wait on that condition variable. Each woken thread reacquires the lock before Wait returns. In other words, threads are notified of state changes through a signaling mechanism instead of polling.

Modified code using Condition:

package main

import (
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	count := 0
	finished := 0
	var mu sync.Mutex
	cond := sync.NewCond(&mu)

	for i := 0; i < 10; i++ {
		go func() {
			vote := requestVote()
			mu.Lock()
			defer mu.Unlock()
			if vote {
				count++
			}
			finished++
			cond.Broadcast() // wake the waiter so it can re-check the condition
		}()
	}

	mu.Lock()
	for count < 5 && finished != 10 {
		cond.Wait() // 1. release mu  2. wait for Broadcast()  3. reacquire mu
	}
	if count >= 5 {
		println("received 5+ votes!")
	} else {
		println("lost")
	}
	mu.Unlock()
}

func requestVote() bool {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return rand.Int()%2 == 0
}

Note the usage pattern: cond.Wait must be called with the corresponding lock held, and in this pattern cond.Broadcast is also called inside the critical section, right after changing the shared state (Go permits calling Broadcast without holding the lock, but holding it keeps the state change and the notification together). After calling cond.Broadcast, release the lock promptly; the woken threads cannot return from Wait until they reacquire it.

mu.Lock()
// do something that might affect the condition
cond.Broadcast()
mu.Unlock()

//----

mu.Lock()
for !condition {
	cond.Wait()
}
// now condition is true, and we have the lock
mu.Unlock()

In addition, cond.Signal wakes only one thread blocked in cond.Wait, while cond.Broadcast wakes all threads waiting on the condition variable. Signal is cheaper, but it is only appropriate when any single waiter can handle the change.
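
To make the difference concrete, here is a small sketch in which several goroutines wait for the same flag. The releaseAll function and the open flag are hypothetical. Broadcast is the right call here because every waiter must proceed; a single Signal could leave some of them blocked forever.

```go
package main

import (
	"fmt"
	"sync"
)

// releaseAll parks n goroutines on a condition variable, opens the gate
// with one Broadcast, and returns how many goroutines got through.
func releaseAll(n int) int {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	open := false
	released := 0

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			for !open { // re-check the condition after every wake-up
				cond.Wait()
			}
			released++
			mu.Unlock()
		}()
	}

	mu.Lock()
	open = true
	cond.Broadcast() // wake every goroutine currently blocked in Wait
	mu.Unlock()

	wg.Wait()
	return released
}

func main() {
	fmt.Println(releaseAll(8)) // 8
}
```

Goroutines that reach the loop after the gate has opened never call Wait at all, which is why the condition is checked in a loop rather than assumed.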

Channel

Unbuffered channels are usually used as a control mechanism for synchronization among multiple threads.

package main

import (
	"fmt"
	"time"
)

func main() {
	c := make(chan bool)
	go func() {
		time.Sleep(1 * time.Second)
		<-c
	}()
	start := time.Now()
	c <- true // blocks until the other goroutine receives
	fmt.Printf("send took %v\n", time.Since(start))
}

Using an unbuffered channel within a single thread is pointless (a send with no other goroutine to receive it blocks forever); channels are generally used for communication among multiple threads, either for synchronization or for passing messages.

Buffered channels behave like a bounded, thread-safe queue, but they are rarely needed in the Raft labs. In the labs it is recommended to use shared variables plus locks for synchronization and mutual exclusion among threads.
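
For completeness, the queue-like behavior of a buffered channel can be sketched as follows. The function name enqueueThenDrain is hypothetical; the point is that sends succeed without a receiver as long as the buffer has room, and values come out in FIFO order.

```go
package main

import "fmt"

// enqueueThenDrain fills a buffered channel with 0..n-1 from a single
// goroutine, then drains it and returns the values in receive order.
func enqueueThenDrain(n int) []int {
	queue := make(chan int, n)
	for i := 0; i < n; i++ {
		queue <- i // does not block: the buffer has room for n values
	}
	close(queue) // no more sends; lets the range loop below terminate

	out := make([]int, 0, n)
	for v := range queue {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(enqueueThenDrain(3)) // [0 1 2]
}
```

With an unbuffered channel, the first send in this single-goroutine program would block forever.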

Using channels for synchronization can serve a similar purpose to WaitGroup:

package main

func main() {
	done := make(chan bool)
	for i := 0; i < 5; i++ {
		go func(x int) {
			sendRPC(x)
			done <- true // signal completion
		}(i)
	}
	for i := 0; i < 5; i++ {
		<-done // wait for one completion signal per goroutine
	}
}

func sendRPC(i int) {
	println(i)
}
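
The same shape also covers message passing: instead of a bare completion signal, each goroutine can send its result. In the sketch below, gatherReplies and squareRPC are hypothetical names, and squaring the argument stands in for whatever an RPC would return.

```go
package main

import "fmt"

// squareRPC is a hypothetical stand-in for an RPC that returns a value.
func squareRPC(x int) int {
	return x * x
}

// gatherReplies issues n calls in parallel and sums the replies that
// arrive on an unbuffered channel.
func gatherReplies(n int) int {
	replies := make(chan int)
	for i := 0; i < n; i++ {
		go func(x int) {
			replies <- squareRPC(x)
		}(i)
	}
	sum := 0
	for i := 0; i < n; i++ {
		sum += <-replies // one receive per goroutine, in arrival order
	}
	return sum
}

func main() {
	fmt.Println(gatherReplies(5)) // 30
}
```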

Raft Deadlock

A very important condition for deadlock, loosely speaking, is hold and wait. That is, everyone is holding onto what they have while also wanting what the other has, which naturally forms a deadlock.

A simple principle is to release locks promptly when performing time-consuming tasks (such as RPC, IO).
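
One way to follow this principle is to snapshot the needed state under the lock, release the lock for the slow call, and reacquire it to apply the result. The sketch below assumes a hypothetical node type and a slowRPC function that merely sleeps; neither comes from the lab code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// node holds only the fields this sketch needs.
type node struct {
	mu   sync.Mutex
	term int
	acks int
}

// slowRPC is a hypothetical stand-in for a network call.
func slowRPC(term int) bool {
	time.Sleep(10 * time.Millisecond)
	return true
}

// sendHeartbeat never holds the lock while the RPC is in progress.
func (n *node) sendHeartbeat() {
	n.mu.Lock()
	term := n.term // snapshot the state the RPC needs
	n.mu.Unlock()  // release before blocking

	ok := slowRPC(term) // no lock held here

	n.mu.Lock()
	defer n.mu.Unlock()
	if ok && n.term == term { // re-validate: state may have changed meanwhile
		n.acks++
	}
}

// runHeartbeats sends k heartbeats in parallel and returns the ack count.
func runHeartbeats(k int) int {
	n := &node{term: 1}
	var wg sync.WaitGroup
	for i := 0; i < k; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n.sendHeartbeat()
		}()
	}
	wg.Wait()
	return n.acks
}

func main() {
	fmt.Println(runHeartbeats(10)) // 10
}
```

Because the lock is dropped during the call, the ten simulated RPCs overlap instead of running one after another.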

RPC Reply Staleness

When a Candidate collects vote results, it needs to check whether it is still in the original term and whether it is still a Candidate. Otherwise two leaders may be elected.

if rf.state != Candidate || rf.currentTerm != term {
	return
}

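In fact, checking currentTerm alone is sufficient, whereas checking state alone is not: the node may have become a Candidate again in a later term, in which case a reply from the earlier election is stale even though the state still matches.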
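
A sketch of where such a check sits in a reply handler is given below. The Raft struct here contains only the fields the sketch needs, and handleVoteReply and countVotes are hypothetical names; the term argument is the term in which the request was originally sent.

```go
package main

import (
	"fmt"
	"sync"
)

type State int

const (
	Follower State = iota
	Candidate
	Leader
)

// Raft holds only the fields this sketch needs; it is not the lab's struct.
type Raft struct {
	mu          sync.Mutex
	state       State
	currentTerm int
	votes       int
}

// handleVoteReply counts a granted vote only if the reply belongs to the
// election that is still in progress.
func (rf *Raft) handleVoteReply(term int, granted bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if rf.state != Candidate || rf.currentTerm != term {
		return // stale reply: drop it
	}
	if granted {
		rf.votes++
	}
}

// countVotes feeds granted replies, tagged with the terms in replyTerms,
// to a candidate in currentTerm and returns its final vote count.
func countVotes(currentTerm int, replyTerms []int) int {
	rf := &Raft{state: Candidate, currentTerm: currentTerm, votes: 1} // votes for itself
	for _, t := range replyTerms {
		rf.handleVoteReply(t, true)
	}
	rf.mu.Lock()
	defer rf.mu.Unlock()
	return rf.votes
}

func main() {
	// Replies from terms 2 and 1 are stale for a candidate in term 3.
	fmt.Println(countVotes(3, []int{3, 2, 3, 1})) // 3
}
```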

Debugging

DPrintf: the server number is a very important field, so print it at the beginning of each line; then add print statements before and after the places of interest.
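
A possible shape for such a helper is sketched below. The names formatLog and dprintf, the debug constant, and the "[S1 T3]" prefix format are all assumptions made for illustration, not the lab's DPrintf.

```go
package main

import (
	"fmt"
	"log"
)

// debug toggles all debug output in one place.
const debug = true

// formatLog prefixes a message with the server index and current term so
// that interleaved logs from several servers can be told apart.
func formatLog(me, term int, format string, args ...interface{}) string {
	return fmt.Sprintf("[S%d T%d] ", me, term) + fmt.Sprintf(format, args...)
}

// dprintf prints the formatted line only when debugging is enabled.
func dprintf(me, term int, format string, args ...interface{}) {
	if debug {
		log.Print(formatLog(me, term, format, args...))
	}
}

func main() {
	dprintf(1, 3, "received vote from S%d", 2)
}
```

A fixed prefix also makes it easy to filter one server's lines out of a combined log.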

Deadlock: press Ctrl+\ to send SIGQUIT to a running Go program; the program exits and prints the current stack traces of all threads.

Race conditions: run go test -race -run 2A. With -race, Go prints stack traces when it detects a data race, which generally occurs when a shared variable is accessed by multiple threads and at least one of the accesses is not protected by a lock.

Glossary

Critical section: a code segment that threads must execute under mutual exclusion.

Busy wait: continuously looping until the condition is satisfied.
