MapReduce —— 历久而弥新

MR执行流

引子

MapReduce 是谷歌 2004 年(Google 内部是从03年写出第一个版本)发表的论文里提出的一个概念。虽然已经过去15 年了,但现在回顾这个大数据时代始祖级别概念的背景、原理和实现,仍能获得对分布式系统的很多直觉性的启发,所谓温故而知新。

在Google 的语境里,MapReduce 既是一种编程模型,也是支持该模型的一种分布式系统实现。它的提出,让没有分布式系统背景的开发者,也能较轻松的利用大规模集群以高吞吐量的方式来处理海量数据。其解决问题思路很值得借鉴:找到需求的痛点(如海量索引如何维护,更新和排名),对处理关键流程进行高阶抽象(分片Map,按需Reduce),以进行高效的系统实现(所谓量体裁衣)。这其中,如何找到一个合适的计算抽象,是最难的部分,既要对需求有直觉般的了解,又要具有极高的计算机科学素养。当然,并且可能更为接近现实的是,该抽象是在根据需求不断试错后进化出的海水之上的冰山一角。

作者:木鸟杂记 https://www.qtmuniao.com, 转载请注明出处

需求

谷歌当时作为互联网的最大入口,维护着世界全网索引,最早触到了数据量的天花板。即,哪怕针对很简单的业务逻辑:如从爬虫数据中生成倒排索引、将图状网页集合用不同方式组织、计算每个主机爬取的网页数量、给定日期的高频查询词汇等等,在全球互联网数据的尺度的加成下,也变的异常复杂。

这些复杂性包括:输入数据分散在非常多的主机上、计算耗资源太多单机难以完成、输出数据需要跨主机进行重新组织。为此,不得不针对每个需求重复构造专用系统,并耗费大量代码在分发数据和代码、调度和并行任务、应对机器故障和处理通信失败等问题上。

抽象

map 和 reduce 的抽象灵感来自于函数式编程语言 Lisp,为什么选定这两个概念呢?这来源于谷歌人对其业务的高度提炼:首先输入可以切分成一个个逻辑的记录 (record);然后对其每个 record 执行某种映射 (map) 操作,生成一些键值对组成的中间结果(为什么要分键和值呢?为最后一步做铺垫,允许用户将中间结果以任意指定的方式——,来进行组织规约);最后在具有相同键的中间结果子集上执行规约reduce ,包括排序,统计,提取最值等等)操作。

函数式模型的另一个特点在于对 map 操作实现的约束,即规定用户应提供一个无副作用的 map 操作(相关概念有纯函数确定性幂等性等等,当然他们的概念并不一样,后面小结会详细讨论)。如此限制,好处有二,可以进行大规模并行执行,可以通过换地儿重试来屏蔽主机故障。

具体到落地上,mapreduce 都是用户自定义函数。map 函数接受一个 Record,不过为了灵活,一般也组织为键值对;然后产生 List[key, value],reduce 函数接受一个 key 和该 key 对应的所有中间结果 List[value]。即:

1
2
map (k1,v1)             -→ list(k2,v2)
reduce (k2,list(v2)) -→ list(v2)

拿由谷歌这篇论文提出,后来成为大数据处理界的 hello world 级别示例程序 Word Count (对一堆文档中的单词计数)来说,mapreduce 的实现长这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");

reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));

这里有两个有意思的点:

  1. 中间变量需要网络传输,必然会涉及到序列化。MapReduce 的最初版本选择是一切解释权归运行时的用户代码所有,我只是傻傻的传 string。即,规定用户在 map 中将任何输出的中间结果对象都转换为 string,然后在 reduce 函数中接收该 Iterator[string] 后,自行解析为自己想要的格式。当然,在后来的模仿框架如 Hadoop 中,序列化和解序列化部分被拿了出来,可以由用户来自定义实现来满足功能或性能上的需求。没去查证,谷歌后来相比对此也做了优化。这是一种很自然的系统演化思路,初期设计尽量简单粗暴以求快速实现可用原型;在积累使用经验后再对某些不便模块进行扩展设计。
  2. reduce 接受 value 集合被组织为了迭代器(Iterator)。相信用过 Python 的同学应该对迭代器不陌生,它是一个很简单的接口,包括 nextstop 两个语义。配合 for loop ,构成一个很强大的抽象。不管你底层是一个内存中的 List、还是文件内容、还是网络 IO 流,只要能在运行时知道如何得到下一条记录,什么时候时候停止,都能被 for 循环来利用,进行逐一处理。迭代器抽象的一个好处在于,不必将待迭代的内容一次加载到内存,可以对数据增量式的惰性加载。MapReduce 框架的此处实现也正是利用了该特性。

实现概览

抽象定了,那么实现自然可以有不同,这也是接口和实现分离的意义所在。前者的抽象是一种思想,谷歌已经给你做了;后者的实现,完全可以根据自己的生产环境进行量体裁衣的来定制实现。谷歌在 paper 中给了一种内部经典版,Hadoop 也提供了一套通用版,当然我们也可以根据自己的业务需求和场景约束来实现一个合身版。

谷歌发布论文时 实现 MapReduce 所面对的系统环境长这样:

  1. 单机双核 x86 架构,2~4G内存,Linux 系统
  2. 通用网络硬件,百兆或者千兆以太网
  3. 集群含数百或者数千台机器,因此机器故障是常态
  4. 用的廉价 IDE 接口硬盘,但是人家有 GFS 来提供容错
  5. 多租户支持,因此需要做 Job 级别抽象和资源约束

流程概览

输入由用户指定切分大小后,切分成 M 份,然后分散到不同机器上(由于 GFS 的存在,也可能该输入 Block 本来就在那台机器上)。每个机器上会并行的跑用户定义的 mapmap 输出的中间结果,亦由用户指定,按 key 值范围切分为 R 份,对于每个中间结果,通过 node label = hash(key) mod R 决定其去处。下面是流程概览图:

MR执行流

  1. 首先把输入切分成 M 份,通常每份 16 ~ 64M,这个粒度用户按需进行把握;然后把这些分片分散到不同机器上(如果有GFS这种分布式文件系统配合的话,可能本来就分散着),然后在每个机器上 fork 一份用户的代码。对于用户代码分发,是一个有意思的话题,对于不同语言可能有不同实现,比如谷歌的C++,可能传输的是动态链接库;比如 Hadoop 的 Java 传的可能是 jar 包 (当然, 所有依赖也得传);如果是 PySpark 的 Python 呢,可能用的就是神奇的 cloudpickle;总之,不同语言需要考虑的传输机制是不一样的,比如说动态语言和静态语言;此外,全局变量和外部依赖也是需要考虑的点,谷歌虽然在此一笔带过,但是不同语言需要面对的情况可能差别很大。
  2. Master 的这份程序拷贝是不一样的,是负责安排活的。会选空闲的 worker 来安排这 M 个 map 任务和 R 个 reduce 任务。这需要考虑的是,worker 执行每个用户代码是单独启动一个进程,还是插入到系统 loop 中去执行。
  3. 执行 map 任务的 Worker,会读取被分配到的输入切片,解析出键值对流,送给用户定义的 map 函数。map 后产生的临时结果首先会缓存在内存中。虽然论文中没有展开,但可以预见的是,如何切片,如何解析出键值对流,不同用户对于同样的输入可能有不同的关注点,因此必然存在定制化解析的需求。这一部分(FileSplit 和 RecordReader)在稍后随着承载业务的增多,估计也会开放出来给用户自定义,事实上 Hadoop 就是这么干的。
  4. 缓存的中间结果会被定期在执行 Map Task 的机器本地进行刷盘(这也算一个本地性的优化,但是也有后果,就是一旦该机器故障,容错会稍微麻烦点,后面会说),并且按用户指定的 Partition 函数拆分成 R 个块,然后将这些位置信息发给 Master 节点。Master 负责通知相应的 Reduce Worker 以拉取对应数据。
  5. Reduce Worker 收到这些中间结果的位置信息后,会通过 RPC 来拉取其对应的 Partition 的数据。对于某个 Reduce Worker 来说,待所有数据拉取完成后,会将其按照 key 来进行排序。这样一来,所有具有同样 key 的数据便挨到了一块。第 4 步和第 5 步的过程合起来就是 shuffle,涉及到外部排序、多机数据传输等极其耗时操作;当数据量比较小时,如何实现都不成问题。但是当数据量大到一定程度,这里很容易成为性能瓶颈,因此也有一些优化方法,稍后会针对 shuffle 做详细展开,此处按下不表。
  6. 待中间数据排好序之后,Reduce Woker 会对其进行扫描,然后将一个个 key 和其对应的值集合,即 <k2, list(v2)> 传给用户定制的 reduce 函数,然后将生成的结果追加到最终输出文件。对于谷歌来说,一般来说就是支持并行 append 的文件系统 GFS,好处在于可以多进程同时写结果。
  7. 当所有 reduce 任务完成后,master 会唤醒用户进程,即在用户代码的视角,MapReduce 任务是阻塞的。

一般而言,用户无需将最终结果的 R 个 Partition 进行合并,而是将其直接作为下一个 MapReduce 任务的输入。Spark RDD 的partition 就是将这一特点概念化了,并且将每一步 MapReduce 输出也放内存中,不进行落盘,以降低连续 MapReduce 任务的延迟。

局部性

计算机科学中常用的一个原理,叫做局部性原理 (locality reference,这里特指空间局部性),说的是程序在顺序执行时,访问了一块数据,接下来大概率会访问该数据(物理位置上)旁边的一块数据。很朴素的断言,却是一切 cache 发挥作用的基础,计算机存储因此也形成了由慢到快,由贱到贵,由大到小的存储层次体系(硬盘-> 内存 -> 缓存 -> 寄存器)。在分布式环境中,这个层次体系至少还要再罩上一层——网络IO。

在 MapReduce 系统中,我们也会充分利用输入数据的 locality。只不过这次,不是将数据加载过来,而是将程序调度过去Moving Computation is Cheaper than Moving Data)。如果输入存在 GFS 上,表现形式将为一系列的逻辑 Block,每个 Block 可能会有几个(一般是三个)物理副本。对于输入每个逻辑 Block,我们可以在其某个物理副本所在机器上运行 Map Task(如果失败,就再换一个副本),由此来尽量减小网络数据传输。从而降低了延迟,节约了带宽。

Master 的数据结构

谷歌的 MapReduce 实现是有作业(Job)级别的封装的,每个 Job 包含一系列任务(Task),即 Map Task 和 Reduce Task。那么,我们要维护一个正在运行的 Job 的元信息,就势必要保存所有正在执行的 Task 的状态,其所在的机器 ID 等等。而且,Master 事实上充当了 Map Task 输出到 Reduce Task 输入的一个”管道“。每一个 Map Task 结束时,会将其输出的中间结果的位置信息通知 Master,Master 再将其转给对应的 Reduce Task,Reduce Task 再去对应位置拉取对应 size 的数据。注意,由于 Map Task 的结束时间不统一,这个通知->转发-> 拉取 的过程是增量的。那么不难推测出,reduce 侧对中间数据排序的应该是一个不断 merge 的过程,不大可能是等所有数据就位了再全局排序。

在分布式系统中,一个比较忌讳的问题就是单点。因为是牵一发而动全身,而 Master 就是这样一个单点。当然单个机器的统计平均故障率并不高,但是一旦故障,那么整个集群都将不可用。但同时,有一个 Leader 节点会大大简化分布式系统的的设计;因此采用单点 Master 的系统反而是主流,那势必需要开发一些其他手段来强化 master 的容错能力,比如说记 log + snapshot、比如说主从备份、比如说每次从 worker 心跳进行状态重建、比如说用其他实现了分布式一致性协议的系统来保存元信息等等。

容错

集群中有 Master 和 Worker 两种机器角色。

Worker 由于数量大,有机器故障概率较大。在分布式系统中,Master 获取 Workers 的信息,最常见便是心跳,既可以是 master ping worker,也可以反过来,也可以兼而有之。master 通过心跳发现某些 worker 不可到达后(可能是 worker 死掉了,也可能是网络出问题了等),就会将该 Worker 打个故障(failed)的标记。

之前已经调度到该故障 Worker 上的任务(Task) 很显然有两种类型: Map Task 和 Reduce Task。对于 Map Task以下所提的 Task,肯定是从属于未结束的 Job) ,不管成功与否,我们都要进行重试,因为一旦该 Worker 变为不可达,存于其上的中间结果也随之无法被 Reduce Task 获取。当然,我们可以在 Master 中多记点状态来减少对已完成的 Map Task 进行重试的概率。比如记下某个 Map Task 的输出是否已经都被 Reduce Task 拉取,以决定要不要对正常完成的 Map Task 进行重试,但无疑会提高系统复杂度。工程往往会对环境做某些假设, 以简化某些实现;我们假设 worker 失败率不是那么高,或者重试所有 Map Task 的代价可以忍,那么就可以简化一点实现,以保持系统的简约,减少可能的 bug。对于 Reduce Task,未完成的无疑要进行重试,已经完成的,由于其输出结果我们假设会写到全局分布式系统文件中(即某些机器挂了也不影响),就不会重试。

具体重试的方法,可以标记需要重试的 Task 的状态为 idle,以告诉调度器,该 Task 可以重新被调度。当然,也可以实现为从一个(工作/完成)队列倒腾到另一个(就绪)队列,本质上是一样的,都是合理实现一个 Task 的状态机

至于 master 的故障恢复,上一节稍有提到。如果在实践中 Master 确实很少死掉,并且偶尔死掉造成所有正在运行的任务失败的后果也可以接受,那么就可以粗暴的实现为如果 Master 死掉,就简单通知所有正在运行的任务的用户代码任务失败了(比如返回非 0 值),然后有用户代码自行决定丢弃任务还是待集群重启后进行重试:

1
2
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();

如果业务对于宕机时间有要求,或者大面积任务失败不可以忍受,那么就需要增强 Master 的容错性。常用的方法上节有提到,这里展开一下:

  1. snapshot + log:将 Master 的内存数据结构定期做快照(snapshot)同步到一个持久存储,可以写到外部存储,可以同步到其他节点,可以一份也可以多份。但是只做快照的话,时间间隔不好选择:间隔太长容易造成恢复时状态丢失过多;间隔过短会加重系统负载。因此常用的辅助手段是记 log,即对每个会改变系统的状态的操作进行记录。这样就可以选择长一点的快照间隔,恢复时,先加载快照,再叠加上快照点之后的日志。
  2. 主从备份。比如 Hadoop 原先的 secondary namenode,用属于不同容错阈两台机器都作为 Master,只不过一个用来响应请求,另一个用来实时同步状态。等某台机器故障发生时,立即将流量切换到另一个机器上。至于其同步机制,则是另一个可以考量的点。
  3. 状态外存。如果元数据量不大,可以用 Zookeeper 或者 etcd 这种实现了分布式一致性协议的集群来保存。由于这些集群本身具有容错能力,因此可以认为避免了单点故障。
  4. 心跳恢复:重新启动一个 Master 后,利用所有 Worker 报上来的信息进行 Master 数据结构的重建。

还值得一提的是,容错也需要用户侧代码做配合。因为框架会对不成功的 map/reduce 用户代码进行重试。这就要求,用户提供的 map/reduce 逻辑符合确定性Deterministic):即函数的输出依赖且仅依赖于输入,而不取决任何其他隐形的输入或者状态。当然,这个蕴含了幂等性Idempotency):多次执行和一次执行效果一样;但是幂等性并不能推出确定性;假设有这么一个函数,它第一次执行造成了一些状态改变(比如某些释放资源的 dispose 函数),而后续发现状态已经改变过了就不再改变该状态,那么它符合幂等性;但是由于其含有隐式状态输入,不是确定性的。

如果 map/reudce 函数是确定性的,那么框架就可以放心大胆重试了。某些条件下,幂等性也可以接受,比如保存隐式状态的地方很牢靠。举个栗子,我们依赖于一个文件锁做判断某个函数执行了一次或多次,如果该文件锁所依赖的文件系统很稳定,并且提供分布式一致性,那么就完全可以。如果是用 nfs 的一个文件做锁,来实现的所谓幂等性就值得商榷了。

如果 map/reduce 函数是确定性的,框架会通过其输出提交的原子性来进行幂等性保证。即,即使重试了多次,也和只执行了一次一样。具体来说,对于 Map Task,会产生 R 个临时文件,并在结束时将其位置发送给 Master;Master 在收到多次同一分片(split) 的位置信息时,如果该分片前面某次结果来源仍可用或者已经被消费,那么就忽略掉该请求后面的所有请求。对于 Reduce Task,其生成的结果也会先写成临时文件,然后依赖于底层文件系统的原子性的改名操作(原子性改名也是一个多进程竞争的经典的操作,因为生成文件过程比较长,不容易做成原子的,但是判断具有某名字的文件是否存在并改名却很容易做成原子的),在处理完成时改变为目的文件名。如果发现已经有一个具有该目的文件名的文件了,就放弃改名操作,从而保证了该 Reduce Task只有一个成功输出的最终文件。

任务粒度

一个 MapReduce Job 中会产生 M+R 个 Task,具体 M 和 R 的值在运行之前可以由人进行配置。不同的系统实现可能会有发挥出最佳系统性能的不同配比;但是同时要兼顾业务需求,比如输入大小,输出文件个数等等。

备份任务

在实际业务中,由于某些主机原因常会出现长尾效应,即少数几个 Map/Reduce Task 总是会巨慢的拖到最后,甚至拖得时间甚至是其他任务的几倍。造成这些主机拖后腿的原因可以举出很多,如:某个机器硬盘老化,读写速度几十倍的变慢;又比如调度器调度的有问题,导致某些机器负载太高,具有大量的 CPU、内存、硬盘和网络带宽的争抢;又比如软件 bug 导致某些主机变慢等等。

只要确定这些问题只发生在少数某些主机上,那么解决方法也很简单。在任务接近尾声的时候(比如统计剩余task的占比小于一个阈值时),对于每个仍然在跑的任务,分别额外调度一份到其他主机上,那么大概率会让这些任务提前完成,同一任务跑多次的处理逻辑,和容错重试造成跑多次是一致的,可以复用。

此外,我们可以通过实际业务检验来微调该阈值(包括怎么判定任务结尾,启用几个备份任务),在耗费额外资源代价和减少任务总用时之前取得一个平衡。

其他细节

除了 Mapper 和 Reducer 这两个最基本的源语,该系统还提供了一些其他的后来事实上也成为标配的扩展:Partitioner,Combiner 和 Reader/Writer

Partitioner

默认来说,对 Map 输出的中间结果进行划分会使用类似于 hash(key) mod R 这种应用无关的划分算法。但是有时候用户有需求将特定的一些 keys 路由到同一个 Reduce Task,比如说中间结果的 key 是 URL, 用户想按网站 host 进行汇总处理。这时候就需要将系统的这部分路由功能开放给用户,以满足用户的定制需求。

Combiner

如果该 Job 针对所有中间结果的 reduce 的操作满足结合律,那么指定 Combiner 会很能提高效率。拿的 Word Count 来说,数值的加法无疑满足结合律,也就是说,同一个单词的频次,在 Map Task 输出后进行加和(在 Map Work 上),还是在 Reduce Task 中进行加和(在 Reduce Worker上),结果保持一致;而这样一来,由于一些中间结果对进行了 combine,Map Task 到 Reduce Task 间的传输数据量会小很多,从而提高整个 Job 的效率。

也可以看出,combine 函数一般和 reduce 函数是一样的,因为他们本质上是对 value set 执行了同一种操作,只不过执行时,执行的地点不一样,结合的顺序不一样。目的是为了减少中间结果传输量,加速任务执行过程。

Reader/Writer

如果不将定制输入输出的能力开放给用户,那么系统显然只能处理有限几种默认约定的格式。因此,readerwriter 接口本质上是系统和现实繁杂的业务之间的适配器(Adaptor)。它们让用户可以自行指定数据的来源和去处按需要理解输入内容自由定制输出格式

有了这两个 Adaptor,系统才能适配更多的业务。一般来说,系统会内置提供一些常见的 Reader 和 Writer 的实现;包括按行读文本文件,读文件中键值,读数据库等等。然后用户可以实现这两个接口,进行更具体的定制。系统常通过类似这种常用脚手架+进一步定制能力来提供API,下面的 Counter 也是如此。

副作用

有些用户实现的 map/reduce 函数会有一些副作用,比如说在执行任务中间输出一些文件、写一些数据库条目等等。一般来说这些副作用的原子性和幂等性需要用户自己来处理。因为如果输出介质不纳入 MapReduce 系统,系统是没有办法保证这些输出的幂等性和原子性的。不过有的系统就这么干的,提供一些某种类型/介质的状态或者数据存储,纳入系统中,并且提供一些容错和幂等的性质。好像 MillWheel 有类似的做法。但这样会大大加重系统的复杂性。

跳过坏记录

如果用户代码有 bug 或者某些输入有问题,会导致 Map 或者 Reduce 任务在运行时崩溃。当然这些 bug 或者输入能修则修,但是有些情况由于第三方库或者输入的原因,不能够进行修复。而在某些类型的任务,比如说训练数据集清洗、大型统计任务,丢几个是可以容忍的。针对这种情况,系统会提供一种模式,在这种模式中会跳过这些 Record 记录的执行。

具体实现上来说,也比较简单。可以给每个输入 Record 给个唯一编号(单次任务内唯一就行);如果某个 Record 处理时挂掉了,就将其编号汇报给 Master。如果 Master 收到了某个 Record 超过一次的处理失败信息,就将其跳过。做的再细一点,还可以记下错误类型和信息进行比对,来确定这是否是一个确定性(deterministic)的错误,进而决定是否将其跳过。

单机执行

众所周知,分布式系统很难跟踪、调试;因为一个 Job 可能同时分散在数千台机器上进行执行。因此系统提供了本地运行 Job 的能力。可以针对小数据集输入对代码的正确性进行测试。由于在单机运行,就可以比较方便通过调试工具进行断点追踪

实现一个本地 mock 系统,一般来说比较简单。因为不需要考虑网络间状态通信,代码多节点分发,多机调度等一系列分布式系统的问题。但却能极大方便代码调试,性能测试和小数据集测试。

状态信息

对于分布式执行的 Job,一个任务进度等信息可视化界面(给系统集成一个 HTTP 服务,实时拉取系统信息进行展示)有时候是至关重要的,它是系统易用性的关键。如果系统用户不能够很方便的实时监控任务的运行进度、执行速度、资源用量、输出位置,出错信息以及其他一些任务的元信息,就不能对任务的执行状况有个感性的把握。尤其是如果写 MapReduce 程序的人和跑这些程序的不是一个人时,会更为依赖这些状态的实时呈现。

因此,对于分布式系统来说,其易用性有一大半落在一个良好的系统信息呈现上。使用者需要据此来预测任务的完成时间、资源的吃紧程度等等,从而做出相应决策。

此外,对与集群机器状态信息,也需要进行跟踪,因为机器的负载信息、故障信息、网络状况等等对用户任务的执行也有不同程度的影响。给出这些机器状态信息,有助于对用户代码甚至系统代码进行出错诊断。

全局计数器

系统提供了一种计数服务,以统计某种事件的发生频次。比如用户想统计 Word Count 示例中所处理的全大写单词的总数:

1
2
3
4
5
6
7
8
Counter* uppercase;
uppercase = GetCounter("uppercase");

map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");

从代码可以大致猜测其实现:定义的时候,需要给 Counter 指定一个 Id。然后在 Map/Reudce 代码中可以通过该 Id 获取该 Counter 然后进行计数。每个 worker 机器上的计数信息会汇总到 Master 上,然后按 Counter 的 ID 进行加和,并且最终返回给用户。同时,前述展示状态信息页面也会将这些计数器进行可视化(比如说打折线图)。其中有个点需要注意,就是多次对重试的任务(由于机器死掉或者避免长尾进行的重试)的计次进行去重;可以按照 Map/Reduce ID 来进行去重,即我们假定同一输入的重试任务共享一个 Task ID(事实上为了满足重试需求和任务管理需求,分布式系统肯定会对所有任务进行唯一编号的),针对具有相同 Task ID 内部的 Counter 的计次,Master 只保留第一次成功的那一份;但是如果计数需要在页面上实时显示,可能就需要做适当信息保留,并且在该 Task 重试时进行计数回退之类的操作。

系统会自动维持一些计数器,比如说所有已经处理的键值对的数量和所有已经产生的键值对数量。全局计数操作对于某些应用用处很大,比如说有的应用要求所有输入键值对和输出键值对的数量一样,如果没有全局计数,就无从验证;或者统计一些数据的全局比例等等。

重排(shuffle) 操作

自 Spark 成名之后,shuffle 这个 MapReduce 中的语义得到了很多研究和实践。这是一个多机传输的耗时操作,其实现的高效性对系统的性能有着至关重要的作用,因此单独拿出一节来聊聊。

在 MapReduce 中就是指 Map Task 分片输出到 Reduce Task 按需拉取的这么一个过程。还拿 Word Count 为例,你想统计某个单词在所有文档中的总频次,但是这些单词分布在不同机器上的不同的 Map Task 输出里;而只有将所有同样单词的频次对聚集到同一台机器上,才能对其加和。这种将机器和子数据集对应关系按key打乱重组的操作,我们姑且称之为 shuffle。

在 Spark 中,基本上继承了该语义,并且更通用化了。一个常见的例子是 join,即将两个 Table 间具有相同 key 的记录路由到同一台机器上,从而在所有机器上按 key 分片进行并行 join,从而大幅提高效率。类似于 join 这样的高阶操作,会使得底层的 Partition 不能继续在本机运行而不与其他 Partition 发生联系,因此 shuffle 也是 Spark 中划分 Stage 的一个分水岭。

对于 MapReduce 系统来说,使用的 shuffle 策略类似于 Spark 中基于排序的 shuffle。Map 首先将中间结果写到内存中,然后定期刷盘,刷盘时进行归并排序。然后 Reducer 端按需拉取,从多个 Mapper 端拉取数据后,再次进行归并排序,然后传给 Reduce 函数。这样的好处在于可以进行大规模数据处理,因为可以做外部排序,也可以做迭代惰性加载。对于 Hadoop 的实现来说,将包含 shuffle 的整个流程分为了明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。

其他的一些点

一些缺点:通过 MapReduce 的系统设计可以看出,它是一个高吞吐,但是也高延迟的批量处理系统。并且不支持迭代。这也是后续 Spark,Flink 这样系统火热的动机。

文件系统: MapReduce 只有和 GFS 这样支持分块、多进程并发写的大文件系统配合才能发挥出更大的优势,优化输入和输出的性能。此外,这种分布式文件系统还会屏蔽底层节点故障。

组织形式: MapReduce 是一个系统,需要部署到集群上,但它同时又是一个库,让用户代码和分布式集群进行交互而不太用关心分布式环境中的问题的一个库。每个任务需要写任务描述(MapReduceSpecification),然后提交给系统——这是库常用的一种提交任务的手段。

代码分发:谷歌的 MapReduce 具体实现不得而知。猜测可以有两种方式:一是将 MapReduce 库代码 + 用户代码整体在不同机器上 fork ,然后根据角色不同来执行不同分支。二是将各个机器上的服务先启动起来,然后执行任务时只会将用户自定义函数序列化后传输到不同机器。

名词释义

Intermediate result:map 函数产生的中间结果,以键值对形式组织。

**Map Task **:这个应该都是指的 Worker 机器上,执行 map 函数的工作进程。

Map Worker:Map Task 所运行的 Worker 机器。所有 Worker 应该是没有角色标记的,既可以执行 Map Task,也可以执行 Reduce Task,以充分的利用机器性能。

参考文献

[1] Jeffrey Dean and Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters

[2] Alexey Grishchenko, Spark Architecture: Shuffle

[3] JerryLeadSpark internals


我是青藤木鸟,一个喜欢摄影、专注大规模数据系统的程序员,欢迎关注我的公众号:“木鸟杂记”,有更多的分布式系统、存储和数据库相关的文章,欢迎关注。 关注公众号后,回复“资料”可以获取我总结一份分布式数据库学习资料。 回复“优惠券”可以获取我的大规模数据系统付费专栏《系统日知录》的八折优惠券。

我们还有相关的分布式系统和数据库的群,可以添加我的微信号:qtmuniao,我拉你入群。加我时记得备注:“分布式系统群”。 另外,如果你不想加群,还有一个分布式系统和数据库的论坛(点这里),欢迎来玩耍。

wx-distributed-system-s.jpg