分布式系统,程序语言,算法设计

6.824 2020 视频笔记二:RPC和线程

6.824-schedule.png

MIT 今年终于主动在 Youtube 上放出了随堂视频资料,之前跟过一半这门课,今年打算刷一下视频,写写随堂笔记。该课程以分布式基础理论:容错、备份、一致性为脉络,以精选的工业级系统论文为主线,再填充上翔实的阅读材料和精到的课程实验,贯通学术理论和工业实践,实在是一门不可多得的分布式系统佳课。课程视频: YoutubeB站。课程资料:6.824主页。本篇是第二节课笔记,RPC 和线程。

为什么用 Go

  1. 语法先进。在语言层面支持线程(goroutine)和管道(channel)。对线程间的加锁、同步支持良好。
  2. 类型安全(type safe)。内存访问安全(memory safe),很难写出像 C++ 一样内存越界访问等 bug。
  3. 支持垃圾回收(GC)。不需要用户手动管理内存,这一点在多线程编程中尤为重要,因为在多线程中你很容易引用某块内存,然后忘记了在哪引用过。
  4. 简洁直观。没 C++ 那么多复杂的语言特性,并且在报错上很友好。

作者:青藤木鸟 https://www.qtmuniao.com/2020/02/29/6-824-video-notes-2/, 转载请注明出处

线程(Threads)

线程为什么这么重要?因为他是我们控制并发的主要手段,而并发是构成分布式系统的基础。在 Go 中,你可以将 goroutine 认为是线程,以下这两者混用。 每个线程可以有自己的内存栈、寄存器,但是他们可以共享一个地址空间。

使用原因

IO concurrency(IO并发):一个历史说法,以前单核时,IO 是主要瓶颈,为了充分利用 CPU,一个线程在进行IO 时,可以让出 CPU,让另一个线程进行计算、读取或发送网络消息等。在这里可以理解为:你可以通过多个线程并行的发送多个网络请求(比如 RPC、HTTP 等),然后分别等待其回复。

Parallelism(并行):充分利用多核 CPU。

关于并发(concurrency)和并行(parallelism)的区别和联系,可以看这篇文章。记住两个关键词即可:逻辑并发设计 vs 物理并行执行。

Convenience(方便):比如可以在后台启动一个线程,定时执行某件事、周期性的检测什么东西(比如心跳)。

Q&A:

  1. 不使用线程还能如何处理并发?基于事件驱动的异步编程。但是多线程模型更容易理解一些,毕竟每个线程内执行顺序和你的代码顺序是大体一致的。
  2. 进程和线程的区别?进程是操作系统提供的一种包含有独立地址空间的一种抽象,一个 Go 程序启动时作为一个进程,可以启动很多线程(不过我记得 Goroutine 是用户态的执行流)。

使用难点(challenges)

共享内存易出错。一个经典的问题是,多个线程并行执行语句:n = n + 1 时,由于该操作不是原子操作,在不加锁时,很容易出现 n 为非期望值。

我们称这种情况为竞态 (race):即两个以上的线程同时试图改变某个共享变量。

解决的方法是加锁,但如何科学的加锁以兼顾性能避免死锁又是一门学问。

Q&A:

  1. Go 是否知道锁和资源(一些共享的变量)间的映射?Go 并不知道,它仅仅就是等待锁、获取锁、释放锁。需要程序员在脑中、逻辑上来自己维护。
  2. Go 会锁上一个 Object 的所有变量还是部分?和上个问题一样,Go 不知道任何锁与变量之间的关系。Lock 本身的源语很简单,goroutine0 调用 mu.Lock 时,没有其他 goroutine 持有锁,则 goroutine0 获取锁;如果其他goroutine 持有锁,则一直等待直到其释放锁;当然,在某些语言,如 Java 里,会将对象或者实例等与锁绑定,以指明锁的作用域。
  3. Lock 应该是某个对象的私有变量?如果可以的话,最好这样做。但如果由跨对象的加锁需求,就需要拿出来了,但要注意避免死锁。

线程协调(Coordination)

  1. channels:go 中比较推荐的方式,分阻塞和带缓冲。
  2. sync.Cond:信号机制。
  3. waitGroup:阻塞知道一组 goroutine 执行完毕,后面还会提到。

死锁(DeadLock)

产生条件:多个锁,循环依赖,占有并等待。

如果你的程序不干活了,但是又没死,那你就需要看看是否死锁了。

爬虫(Web crawler)

  1. 从一个种子网页 URL 开始

  2. 通过 HTTP 请求,获取其内容文本

  3. 解析其内容包含的所有 URL,针对所有 URL 重复过程 2,3

    为了避免重复抓取,需要记下所有抓取过的 URL。

由于:

  1. 网页数量巨大
  2. 网络请求较慢
    一个接一个的抓取用时太长,因此需要并行抓取。这里面有个难点,就是如何判断已经抓取完所有网页,并需要结束抓取。

抓取代码

代码在阅读材料中有。

串行爬取。深度优先遍历(DFS )全部网页构成的图结构,利用一个名为 fetched 的 set 来保存所有已经抓取过的 URL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
return
}

并行爬取

  1. 将抓取部分使用 go 关键字变为并行。但如果仅这么改造,不利用某些手段(sync.WaitGroup)等待子 goroutine,而直接返回,那么可能只会抓取到种子 URL,同时造成子 goroutine 的泄露。
  2. 如果访问已经抓取的 URL 集合 fetched 不加锁,很可能造成多次拉取同一个网页。

WaitGroup

1
2
3
4
5
6
7
8
9
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, f)
}(u) // u 被拷贝
}
done.Wait()

WaitGroup 内部维护了一个计数器:调用 wg.Add(n) 时候会增加 n;调用 wait.Done() 时候会减少1。这时候调用 wg.Wait() 会一直阻塞直到当计数器变为 0 。所以 WaitGroup 很适合等待一组 goroutine 都结束的场景。

Q&A

  1. 如果 goroutine 异常退出没有调用 wg.Done() 怎么办?可以使用 defer 将其写在 goroutine 开始:defer wg.Done()

  2. 两个 goroutine 同时调用 wg.Done() 会有竞争(race),以至于内部计数器不能正确减少两次吗?WaitGroup 应该有相应机制(锁什么的)来保证 Done() 的原子性。

  3. 定义匿名函数时,匿名函数中变量和外层函数同名变量间的关系?这是个闭包(closure)问题。如果匿名函数中变量没有被参数覆盖(如上述代码中 fetcher),就会和外层同名变量引用同一个地址。如果通过传参传递(如上述代码中 u),哪怕参数和外层变量看起来一样,但匿名函数使用的也是传进来的参数,而非外层变量;尤其针对 for 循环变量,我们通常通过参数来将其在调用时拷贝一次,否则 for 循环启动的所有 goroutine 都会指向这个不断被 for 循环赋值改变的变量。

    对于闭包,go 中有个”变量逃逸“(Variable Escape)的说法,如果某个变量在函数声明周期结束时仍被引用,则将其分被到堆而非函数栈上。对闭包来说,某个变量同时被内层和外层函数引用,则其会被分配到堆上

  4. 既然字符串 u 是不可变(immutable)的,为什么所有 goroutine 还会引用到不断变化的值?string 的确是不可变的,但是 u 的值一直在变,而 goroutine 和外层 goroutine 共享 u 的引用。

去掉锁

如果在更新 map 的时候去掉锁,运行几次发现并没有什么异常,因为 race 其实很难检测。好在 go 提供了竞态分析工具帮你来找到潜在含有竞态的地方:go run -race crawler.go

注意该工具没有做静态分析,而是在动态执行过程中观察、记录各个 goroutine 的执行轨迹,进行分析。

线程数量

Q&A

1.该代码在整个运行中会同时多少线程在运行(goroutine)?
该代码并没有做明显的限制,但是其明显和 URL 数量、抓取时间正相关。例子中输入只有五个 URL,因此没有什么问题。但在现实中,这么做可能会同时启动上百万个 goroutine。因此一个改进是,实现启动一个固定数量的 worker 池子,每个 worker 干完后就去要/被分配下一个任务。

使用 channel 通信

我们可以实现一个新的爬虫版本,不用锁+共享变量,而用 go 中内置的语法:channel 来通信。具体做法类似实现一个生产者消费者模型,使用 channel 做消息队列。

  1. 初始将种子 url 塞进 channel。
  2. 消费者:master 不断从 channel 中取出 urls,判断是否抓取过,然后启动新的 worker goroutine 去抓取。
  3. 生产者:worker goroutine 抓取到给定的任务 url,并将解析出的结果 urls 塞回 channel。
  4. master 使用一个变量 n 来追踪发出的任务数;往发出一份任务增加一;从 channel 中获取并处理完一份结果(即将其再安排给worker)减掉一;当所有任务都处理完时,退出程序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}

func master(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
master(ch, fetcher)
}

Q&A:

  1. master 读 channel,多 worker 写 channel,不会有竞争问题吗?channel 是线程安全的。
  2. channel 不需要最后 close 吗?我们用 n 追踪了所有执行中的任务数,因此当 n 为0退出时,channel 中不存在任何任务/结果,因此 master/worker 都不会对 channel 存在引用,稍后 gc collector 会将其回收。
  3. 为什么在 ConcurrentChannel 需要用 goroutine 往 channel 中写一个 url?否则 master 在读取的时候会一直阻塞。并且 channel 是一个非缓冲 channel,如果不用 goroutine,将会永远阻塞在写的时候。

欢迎关注公众号分布式点滴,获取更多分布式系统文章。

wx-distributed-system-new-s.jpg