分布式系统,程序语言,算法设计

Kafka —— 弥合日志系统和消息队列的鸿沟

概览

Kafka (该论文发表于2011年6月[1])是日志处理和消息队列系统的集大成者。较低的延迟、极高的容量和吞吐,使其可以应用于在线服务和离线业务。为了兼顾性能和可扩展性,Kafka 做了一些看起来反直觉但是却很实用的设计。例行总结一下其设计特点:

  1. 面向存储的消息队列:意味在近实时的情况下能够将传统消息队列的存储增加几个数量级。实现原理是充分利用了磁盘的顺序写和操作系统自身的缓存;此外为了提高访盘、传输效率,使用了文件分段、段首索引、零拷贝和批量拉取等技术。

  2. 灵活的生产消费方式:总体而言是基于主题粒度的发布订阅式架构,并且既支持组内多消费者互斥消费,也支持不同消费者组间的重复消费。这里面涉及到消息队列的两个核心设计选择:pull 式消费以及客户端侧存储消费进度。拉式消费可能会导致空轮询以及稍微的延迟,好处在于灵活;客户端存储消费进度可以使的 broker 无状态,以进行灵活伸缩和容错。为了简化实现,消费时,每个分区最多为一个消费者所消费。

  3. Zookeeper 存储元信息:利用分布式一致性组件 Zookeeper 以注册表的形式存储系统元信息,包括 broker 和消费者的存活信息、消费者和分区间的对应关系、每个分区的消费进度等等。Zookeeper 作为一个前缀树形式组织 KV、支持发布订阅的高可用组件,可以满足 Kafka 进行消费协调和进度保存的协作需求。

  4. 分区级别的多副本设计:这一点在论文中还没实现,应该是后来系统开源演进时加上的。利用该条可以实现对 broker 的容错。

  5. 简洁强大的消费接口:Kafka 的客户端一般提供两层接口抽象。包括无需关注分区偏移量信息的高层(high-level)简单读写接口,以及可以灵活控制分区组织和消费进度的低层(low-level)接口。论文中只提到了前者,以表现其简洁。

作者:青藤木鸟 https://www.qtmuniao.com, 转载请注明出处

引言

任何互联网公司都会产生大量的”日志”数据,这些数据主要包括:

  1. 用户行为事件。如社交网站中登录、浏览、点击、喜欢、分享、评论等等。
  2. 系统运维数据。如某个服务的调用栈、调用延迟、错误报告以及一些机器运行指标:CPU、网络或者硬盘的使用率。

长期以来,这些数据只是用来做后台分析或者系统瓶颈定位。但现在的一个趋势是,这些数据被越来越多的用到线上业务,包括:

  1. 搜索相关性分析
  2. 数据驱动的推荐
  3. 广告精准投放
  4. 服务黑名单过滤
  5. 用户首页 Feed 流

等等,不胜枚举。这些行为数据通常比用户本身元数据的多几个数量级,因此其实时分析需求给数据系统带来了新的挑战。比如搜索、推荐和广告都需要细粒度的点击率的数据,这就要求不仅要统计所有的用户点击事件,还要去统计那些没有被点击的页面的数据。

早期的一些系统为了实现类似的需求都是直接去线上环境扒系统日志来进行分析,不过最近一些公司建了很多专有系统来干这件事。比如 Facebook 的 Scribe,Yahoo 的 Data Highway 和 Cloudera 的 Flume。这些系统的目标都在于将日志数据收集起来,然后导入像 Hadoop 等数据仓库来进行离线处理。然而在 LinkedIn,我们除了离线分析的需求,还有不少上述提到的要求延迟不高于数秒的实时处理需求。

我们构建了一个崭新的针对日志处理的消息系统,名为 Kafka。Kafka 兼顾了日志聚合需求和消息队列需求。一方面来说,Kafka 是一个支持平滑扩展,支持高吞吐的分布式系统;另一方面,Kafka 提供了类似于消息队列的 API,并且允许应用对日志消息进行实时消费。论文发表时,Kafka 已经在 LinkedIn 上线了六个多月,只用一个系统就满足了我们两大方面的需求,从而极大简化了我们的基础设施。

接下来,论文在第二部分会再次回顾消息队列系统(messaging system)和日志聚合系统(logging aggregators)的传统形态。在第三部分,我们首先介绍 Kafka 的基本架构,继而讨论它设计的基本原则。然后在第四部分,我们将看一下 Kafka 在 LinkedIn 的部署情况和性能指标。

相关系统

传统的企业级消息系统(如activemq, IBM Websphere MQ, Oracle Enterprise Messaging Service,TIBCO Enterprise Message Service)已经存在很长时间了,主要作用是消息总线异步解耦。但它们并不能无缝适配日志处理需求,主要有以下几点原因:

  • 语义侧重点不同

传统消息队列侧重于提供灵活的消息送达保证,比如多个队列的事务问题、消息送达的 ACK 确认、消息的严格保序等等。这些功能在日志处理系统中需求并不是那么高,但是他们大大增加了 API 复杂性和系统实现的难度。

  • 高吞吐支持差

大部分传统的消息队列都不将高吞吐作为第一设计目标。比如 JMS 连 batch 接口都没有,因此每发一个消息都会使用一个新的 TCP 连接,显然不能满足我们日志系统高吞吐的需求。

  • 不支持分布式存储

这些传统消息系统通常不容易进行切片(partition)以存储到多台机器上。因此在数据量大时,不能支持平滑扩容。

  • 面向实时而非累积

这些消息系统的另一个特点是假设消费类型是近实时消费(near immediate),因而未被消费的消息的量总是和。一旦消息产生累积,这小消息系统的性能将大大下降。因此他们难以支持离线消费和大批量消费的任务类型。说白了,传统的消息系统的设计思路并不面向存储

近些年也涌现了一些专用的日志聚合系统。

如 Facebook 的 Scribe。系统产生的日志通过 socket 写入远程的 Scribe 机器,每个 Scribe 机器将收集到的日志定期刷(dump)到 HDFS 或 NFS 机器集群中。

又如 Yahoo,其数据高速公路(data highway)也是类似的数据流模式。一群机器将从客户端收集来的日志按分钟聚集成一个个文件,然后将其写入 HDFS 中。

再如 Cloudera,他们构建了一个比较新颖的日志聚合系统:Flume。Flume 提供了扩展语义的管道(“pipes” ),和汇聚槽(sinks),可以让用户灵活的对日志流进行消费。此外,该系统还引入更多的分布式特性。

但是,大部分的这些系统都是面向离线消费的,并且暴露太多不必要的实现细节(也就是没有抽象好,不够灵活,比如 Yahoo 还暴露了分钟文件 minutes file 这种东西)。此外,这些系统基本采用”推”(push)的模式,即 broker 将消息推送给消费者(consumers)。

在 LinkedIn,经研究发现,”拉” (pull) 的模式更适合我们的业务场景。在该模式下,每个消费者可以按照自己的喜好来决定消费速度,而不用担心被快的消费者淹没,或者被慢的消费者拖累[2]。”拉”模式也很容易实现消费重试(rewind),稍后我们会详细讨论这个问题。

最近,Yahoo 研究院开发了一个叫做 HedWig 的支持发布/订阅的分布式系统,它易于扩展,高可用,并且支持消息的持久化。然而,该系统更多的作为一个日志存储系统而存在。

Kafka 架构和设计原则

概念体系

由于上述系统的诸多限制,我们开发了一个基于消息的日志聚合系统——Kafka。首先介绍一些 Kafka 的概念体系。主题topic) 定义了某种消息(message)流的类型,生产者producer)会将消息发布到某个主题下,这些被发布的消息会暂时屯在一组叫做代理商broker)的服务器中。一个消费者consumer)可以从代理商那同时订阅一到多个主题,然后以拉取的方式进行消费。

接口设计

消息系统应该是很简单的,为了表达这种简单,我们将 Kafka 的接口(API)设计的很简约。为了避免枯燥的描述这些 API,我们用两个很简单的小例子来说明 Kafka 的 API 长啥样。

1
2
3
4
5
// Sample producer code:
producer = new Producer(…);
message = new Message(“test message str”.getBytes());
set = new MessageSet(message);
producer.send(“topic1”, set);

如代码所示,一条消息的格式很简单,就是一组字节,用户可以根据喜好来对数据进行序列化(即将对象实例编码成一组字节)。为了提高效率,可以一次发送一组消息。

1
2
3
4
5
6
// Sample consumer code:
streams[] = Consumer.createMessageStreams(“topic1”, 1)
for (message : streams[0]) {
bytes = message.payload();
// do something with the bytes
}

消费者通过创建一个或多个流来订阅 topic,被发布到相应 topic 的消息会渐次进入这些消费者创建的订阅流。接口就这么简单,至于 Kafka 如何来实现,卖个关子,稍后讨论。在语言层面,我们将每个消息流抽象为一个迭代器(Iterator), 消费者利用该迭代器取出一条条消息的消息体以进行处理。和一般的迭代器不同,我们的迭代器永不停止,当无新消息到来时,迭代器就会一直阻塞。我们同时支持两种消费模式,既可以一组消费者对某个 topic 进行互斥消费,也可以每个消费者对同一个主题进行独立消费。

架构图

Kafka 的架构图如下:

kafka-architecture.png

Kafka 是分布式系统,因此一个 Kafka 集群中会包含多个 broker 机器。为了均摊负载,每个 topic 被切分成多个分片(Partition),每个 broker 机器持有其中的一个或多个分片。多个生产者和消费者可以同时进行消息的生产和消费。在 3.1 节,我们会介绍 broker 上的单个分片的布局,讨论为了使单个分片高效的被消费的一些设计上的推敲和选择。3.2 节,会描述生产者和消费者如何在分布式环境中与多个 broker 进行交互。最后在 3.3 节,会讨论 Kafka 的数据交付保证。

单个分区的效率

我们做了一系列的设计上的决策来保证系统的高效性。

极简的存储设计。一个 topic 的每个分区就是逻辑上的一段日志。具体到物理上,为了防止分区文件过大,我们会将其进一步分成数据段(segment)。每次数据往最新的数据段中写,写到设定容量(比如说 1G)后,就会新建一个段文件继续写。此外,为了提高写入性能,我们会将日志记录在内存中进行缓存,只有日志数量达到设定值或者缓存数据的大小达到设定值时,才会将数据刷到外存中。为了保证可靠性,只有数据刷到了外存后,才会将其暴露给消费者。

和传统的消息系统不同,Kafka 存储的每条消息没有显式的消息 ID,而仅通过该条消息在分片中的偏移量(offset)来定位。这样我们省却了为了随机查找而建立的索引的额外开销。值得一提的是,我们的偏移量并不是连续的,而是类似于 TCP 中的 SEQ 的字节 offset——为了计算下一条消息的偏移量,我们需要将当前消息偏移量加上当前消息长度。下面可能会 ID 与偏移量混用,但是说的都是一个东西。

每个消费者总是顺序的去消费每个分区的数据,如果消费者每确认(ack)一个偏移量,就意味着该偏移量前面的所有消息都被消费过了。实现上来说,消费者端的库代码会向 broker 发一系列请求来拉取数据到消费者的缓冲区中供应用代码来消费[3]。每个拉取请求包含了起始便宜地址和可以接受的字节尺寸。每个 Broker 在内存中维护了数据段首偏移量到数据段物理地址的映射(应该是用查找树组织的,因为需要范围定位)。当一个读取请求到来时,broker 根据请求偏移量来定位到相应的段,然后根据请求尺寸来读出指定的数据量,然后返回给消费者。消费者收到消息后,计算出下一条消息的偏移量,以进行下一次拉取请求。Kafka 中硬盘中日志和内存中索引的布局如下图(每个框框中数据即表示某条消息的偏移量):

kafka-log.jpg

高效的传输优化。由于网络传输开销会比较高,因此我们小心的设计了 Kafka 和外界数据交互的流程。如前所述,对于生产者,我们在 API 层面允许一次发送一批消息。对于消费者,虽然在 API 层面看起来是逐条消息进行消费,但在底层也是会批量拉取,比如每次都一次拉取数百 KB。

另一个与众不同的设计决策在于,我们不在 Kafka 系统层面进行显式的消息缓存。也就是说,我们仅仅利用文件系统层面的页缓存(page cache)来实现加速硬盘读写的目的。这样做好处有二:

  1. 避免消息的多次缓存
  2. broker 进程重启后缓存不丢失

由于不在 Kafka 层面做缓存,内存层面的垃圾回收策略就可以做的很简单。因此,可以简化使用自带 VM 的编程语言进行系统实现的难度。

在 Kafka 的应对的场景中,生产者和消费者都是顺序的访问段文件,并且消费者通常只是稍落后生产者。操作系统默认的写穿透(write-through)和预读取(read-ahead)等启发式的缓存策略天然适配该场景。我们发现不论消费者还是生产者的读写速率都是随着数据尺寸线性增长,直到数 TB 级别(继续往后论文就没说了)。

此外,我们还优化了消费者远程数据访问过程。因为 Kafka 是一个支持多次订阅的系统,一条消息可能被不同的消费者消费多次,因此远程数据访问的优化能够极大提升系统性能。传统上,一条数据从本地文件送到 socket 上通常包含以下几个过程:

  1. 从外存中读入数据到操作系统的页缓存(page cache)。
  2. 从页缓存拷贝数据到应用缓冲区(application buffer)中。
  3. 从应用缓冲区拷贝到内核缓冲区(kernel buffer)。
  4. 从内核缓冲区拷贝到 socket。

这些过程涉及四次数据拷贝和两次系统调用,可以说非常冗余浪费。在 Linux 和其他一些操作系统中,存在一个 sendfile (zero copy,零拷贝技术)的 API,能够直接将数据从文件传送到 socket 中。利用此 API,可以省去步骤(2)(3)中引入的两次数据拷贝和一次系统调用,由此使得 Kafka 可以将数据从 broker 的段文件中高效的传输给消费者。

无状态的 Broker。与其他消息队列不同,在 Kafka 中,broker 不负责保存每个消费者的消费进度。也就说是,每个消费者需要自己保存自己的消费偏移量等信息,从而使 broker 的设计可以相对简化,不用维护过多状态。但如此一来,由于 broker 不知道所有订阅者的消费进度,就难以决定何时对某条消息进行删除。Kafka 使用了一个看似 tricky 的策略——按时间窗口对消息进行保存。比如说,只保存最近七天的数据。当然,每个 topic 可以设置不同的策略。这个简单的策略大部分情况下都很够用,即使是离线消费者也通常会每天,每小时甚至近实时进行消费,七天足以。Kafka 并不会随着数据量增大而显著降低性能,这个保证是允许 Kafka 使用如此简单的策略的关键所在。

这种大量存储+拉取的设计带来的另外一个重要的好处是——消费者可以主动选择进行回退(rewind)消费。这个需求看起来违背了通常消息队列的定义,然而在很多情况下却非常有必要。随便举两个例子:

  1. 当消费者进程由于错误而挂掉后,可以在恢复后有选择的对挂掉前后的数据重新消费。这对将 ETL 数据导入 Hadoop 等数据仓库之类的场景非常重要。
  2. 消费者会定期的将拉取的数据刷到持久化的存储中(比如倒排索引系统中)。如果消费者宕机,那部分已经从 消息系统拉取但是未持久化的数据就会被丢失。但是对于 Kafka 来说,消费者只需要记住 flush 到的 offset 即可,下次重启后再从该 offset 后开始拉取。但是对于传统没有大量缓存的消息队列来说,可能这部分数据就永远的丢了,或者得在消费端做某种错误备份和恢复的复杂策略。

多机协调

下面我们来讨论多个生产者和消费者在分布式环境中的行为。对于生产者,其发送数据时,可以将其随机发送到一个分区所在 broker;也可以根据 Key 以及作用于 Key 上的路由函数,将其发送到某特定分区机器(broker)上。对于消费者,行为稍复杂,接下来将会详细说明。

Kafka 有个概念叫做消费者组(consumer groups)。同一个消费者组中包含多个消费者,这些消费者会互斥的消费一组 topic,即,对于一条消息,仅会被同组消费者中的一个所消费。不同的消费者组会进行独立消费,即每个消费者组维护自己的消费进度,不需要进行协同。一个消费者组内的每个消费者可以分属不同进程甚至不同机器,我们目标是在不引入过多额外开销的情况下将消息均匀的分发到每个消费者。

第一个决策是将每个分片作为最小的并行粒度。即每个分区最多为一个消费者所消费,如果我们允许多个消费者消费同一个分区,势必会引入锁之类的协调机制并且记录下一些状态以跟踪每个消费者的消费状态,这会加大实现难度。而在我们的设计中,只有在消费者数量变动,需要重新平衡流量的时候才需要协调。为了能使每个消费者流量更均衡,建议是让分区个数远大于消费者个数,这点很容易实现,只需要给 topic 配置更多分区即可。

第二个决策是不引入中心的主节点,代之以让所有消费者以去中心化的形式进行协调。如果使用中心节点,我们还得去关心其容错问题,又引入了不必要的复杂度。为了让消费者更好的进行协调,我们引入了一个高可用的一致性服务——Zookeeper。Zookeeper 的 API 很像文件系统,是以前缀树的形式组织的 KV (K是路径,以 ‘/‘ 来区分层次,V 可以是任何可序列化的值)存储。该 API 支持创建一个路径、给一个路径设置值、读取路径的值、删除一个路径、列出某个路径下所有子节点的值。此外,Zookeeper 还具有以下特性:

  1. 客户端可以向某个路径注册一个回调函数,以监听该路径的值或其孩子节点的变动。
  2. 路径可以被创建为易失的(ephemeral),即当所有该路径的客户端消失后,该路径及值会被自动的移除。
  3. Zookeeper 使用一致性协议将其数据进行多机备份,使其服务具有高可靠性和高可用性。

Kafka 使用 Zookeeper 干了以下几件事情:

  1. 监控 brokers 和消费者的增删。
  2. 当出现 brokers 或者消费者的增删时,启动消费再平衡任务。
  3. 维护消费者的间关系状态,跟踪每个分区的消费偏移量。

具体来说,当一个 broker 或消费者启动时,它会将元信息存在 Zookeeper 中的注册表(registry)中。

  1. broker 的注册表包括 broker 的主机名和端口号、以及存于其上的 topics 和分区。
  2. 消费者的注册表包括其所属的消费者组以及订阅的 topic。

每个消费者组都在 Zookeeper 中有一个相关联的所有权注册表和偏移量注册表。

  1. 我们将消费者消费某个分区的行为称为占有,所有权注册表(ownership registry)即记录了消费者与其占有的分区间的对应关系。其中,路径名标识一个分区,记录值是该分区的拥有者。
  2. 偏移量记录表记录了该消费者组所有订阅的 topic 对应的每个分区的消费进度(即偏移量)。

Zookeeper 中 broker 的注册表、消费者的注册表和拥有关系的注册表是易失的,而偏移量注册表是永久的(persistent)。当一个 broker 死掉时,其上所有分区会自动从 broker 注册表中删除。当一个消费者死掉时,其在消费者注册表的条目会被删除,在拥有关系的注册表中所拥有的分区关系条目也会被删除。每个消费者都会监听 broker 注册表和消费者注册表,当有 broker 变动或者消费者组中成员变动的时候,就会接收到通知。

当某个消费者加入或者消费者组中有成员变化时,该消费者就会启动一个再平衡(re-balance)的进程以决定他需要消费哪个分区集。伪码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Algorithm 1: rebalance process for consumer Ci in group G
For each topic T that Ci subscribes to {
remove partitions owned by Ci from the ownership registry
read the broker and the consumer registries from Zookeeper
compute PT = partitions available in all brokers under topic T
compute CT = all consumers in G that subscribe to topic T
sort PT and CT
let j be the index position of Ci in CT and let N = |PT|/|CT|
assign partitions from j*N to (j+1)*N - 1 in PT to consumer Ci
for each assigned partition p {
set the owner of p to Ci in the ownership registry
let Op = the offset of partition p stored in the offset registry
invoke a thread to pull data in partition p from offset Op
}
}

算法如上,简单来说就是该消费者去 Zookeeper 拿到所有 topic。对于每个 topic ,拿到分区集合 set(Pt) 以及所属消费者组的消费者集合 Ct。然后将分区集尽量等分为 |Ct| 块,该消费者按照一个确定性算法挑去其中一块,比如说按某种特定方式对 Ct 和 Pt 进行排序。之后,每个消费者对于每个属于自己分区启动一个线程进行拉取,并且从偏移量注册表中保存的偏移量开始消费。随着分区中的数据被不断的消费,消费者会不断的在注册表中更新偏移量。

当消费者或者 broker 出现变动时,同一个消费者组中的所有消费都会收到通知,由于网络等原因,每个消费者收到通知的时间会有先后关系。当先收到通知的消费者在运行上述算法去拿新的分区的数据时,很可能发现该分区还被其他消费者占有。对于这种情况,我们采用一个很简单的策略:该消费者将自己占有的分区释放掉,并且等一小会,然后进行重试。在实际运行中,一般再平衡程序在几次重试后就能达到稳定。

当一个新的消费者组创建时,注册表中没有任何的偏移量记录。这时,使用 broker 提供的 API,该消费者组可以针对每个分区选择从最小的偏移量或者最大的偏移量进行消费(这取决于消费者组的配置)。

数据交付保证

原则上,Kafka 仅提供”至少一次”(at-least-once)[4] 的交付语义。恰好一次(exactly-once)交付语义可以通过两阶段提交来保证,但是在我们应用场景中,这并不是必须的。其实大部分时候,在网络状况良好的一般机房中,大部分消息都都会被消费者组恰好消费一次,仅在消费者进程异常退出没有做正常的清理工作时(比如没有将最后消费到的 offset 更新到 Zookeeper),那么新的消费者在启动时,就会重复的消费那部分 offset 没有提交的数据。如果应用不能够容忍这种情况,就必须在应用逻辑中增加消息去重的逻辑,可以用一个字典来存储最近消费过的数据的 id 进行去重,该 id 可以是 Kafka 中给 message 的 offset,也可以是用户自定义的和消息一一对应的某个 key。这种方法的性能要好于在 Kafka 层面使用两阶段提交的方法来保证恰好一次的语义。

Kafka 保证来自于同一个分区的消息是保序的,即 offset 大小顺序,但是不同分区之间的顺序是不保证的。为了避免数据出错,Kafka 在每个消息中保存了一个 CRC 校验和。当 broker 遇到 IO 问题时,在恢复时,可以把 CRC 校验不一致的消息给删掉。由于 CRC 保存在消息中,生产和消费的环节都可以检查一下 CRC 来规避网络传输带来的错误。

当一个 broker 宕机时,其上面所有消息将会变为不可用。进一步,如果 broker 的存储系统完全坏掉,其上面的未消费消息将永远丢失。将来,我们计划提供内置的多机冗余备份,以容忍单个 broker 节点偶然出现问题(当然现在2019也早已经实现了)。

LinkedIn 中 Kafka 的使用

在本节,简要说明一下 LinkedIn 中是如何使用 Kafka 的。下图是一个我们的简化部署图:

kafka-deployment.jpg

我们在每个数据中心部署了一套服务于用户业务的 Kafka 集群,前端业务将产生的各种日志数据批量发送到 Kafka 集群中。我们使用硬件(load balancer)将流量尽量均匀的分发到各个 broker 上去。为了减少网络开销,我们将在线消费者业务和 Kafka 部署在一个物理集群中。

我们还在靠近 Hadoop 集群等其他数据仓库基件的另一个数据中心部署了一套负责离线数据分析的 Kafka 集群。一方面,该Kafka 集群中内置了一组消费者进程,会定期的去从在线 Kafka 集群拉取数据,写入本集群中。另一方面,该集群运行着数据加载作业,定期地从 Kafka 集群中拉取数据,处理后载入 Hadoop 集群和数据仓库中以进行汇总和分析工作。我们还将此集群用来进行原型建模以及一些即时查询分析工作。在不用特别调优的情况下,端到端大概能有 10s 左右的平均延迟,这对我们的需求来说够用了。

当前,我们的 Kafka 集群中每天会产生数以亿计的日志消息,总量大约在数百G字节。一旦我们将现有系统全部转向 Kafka,可以预见到,Kafka 中的数据量级将会迎来一个更显著的增长,并且需要适配更多的数据类型。当运维人员由于软硬件原因将 broker 停机时,再平衡(re-balance)进程能够自动的将消费在多个 broker 中进行重新平衡。

我们还有一套审计系统来检查整个流水线中是否有数据丢失。具体来说,对于每条消息,在生产时会被打上时间戳和生产者主机名的标记;对于数据生产的元信息,即特定的时间窗口内产生的消息个数事件,会定期的被提交到另外的用于监控的 topic 上。于是消费者就可以利用每条消息中的额外信息统计特定时间窗口内该 topic 下收到的消息数量,与监控 topic 中读取的监控消息作比对,以确定是否进行了正确的消费。

我们给 Hadoop 定制了一种 Kafka 的输入格式[5],使得 MapReduce 任务能够以 Kafka 作为数据来源。MapReduce 任务将从 Kafka 中读出的原始数据进行分类聚集、适当压缩等操作,以备将来对这些数据进行高效的处理。MapReduce 任务要求对 Kafka 的消费任务是幂等的,而 Kafka broker 的无状态以及让消费侧[6]存储偏移量等特点,让我们可以在 Map 任务失败重启时,从上一次的消费结束处继续消费,从而做到消息消费的不重不漏。 当任务完成时,数据和偏移量都被存储在了 HDFS 上。

我们使用 Avro 作为序列化框架[7],它效率较高且支持类型推导。对于每条信息,我们将消息数据类型对应的模式标识(schema id)以及序列化过后的字节作为 Kafka 的消息净核一起发送。这种模式可以让我们很灵活的对同一个消息主题使用多种消息类。消费者在收到消息时,根据模式标识来获取对应的 Avro 实际编码类型,以将实际数据解码成具体的对象实例。这个转换过程很简单,因为对于每个对象类型,只需要查找一次。

译注

[1] Kafka 的论文发表于 2011 年,因此文中的好多现状都是针对当时来说的,到现在(2019年末)消息队列的情况肯定又不一样。

[2] 在推的模式就有这个问题,如果两个消费者(比如 A 快,B 慢)消费速度差太多,Broker 必然要维护 A 消费完但是 B 还没有消费的那些消息。由于传统消息队列缓存都不太大(因为一般要存内存里),必然很快要达到上限,要么系统爆掉,要么限制快的消费者。但拉的问题在于要保证实时性就得不断地轮询,推拉问题也是一个经典的 tradeoff 问题了。

[3] 这里的设计明显借鉴了 TCP 的思想,可以说是在应用层实现了保序确认滑动窗口缓冲区的设计。

[4] 一般来说,数据在交付时,由于系统意外宕机、网络抖动等问题,会出现数据条目丢失的情况。在这个情况下,如果我们不进行介入,那么所有就是提供至多一次的语义(at-most-once)。如果我们对丢失的数据条目进行重试,就有可能造成多次交付的情况,因为发送端无法确定接收端是在接收到数据后网络出了问题,还是接受前出了问题,无脑重试的话,就有可能造成同一条数据的多次处理,这种情况下我们提供的是至少一次(at-least-once)的交付语义。如果想强行实现恰好一次(exactly-once)的交付语义也不是不可以,比如使用两阶段提交等一致性方法保证数据消费和偏移量更新的原子性,以提供恰好消费一次的语义。但是这样仍有可能出问题,并且使系统复杂度变高,时间耗费也较多。所以一般如果对丢少量数据不敏感,用 at-most-once 就够了,如果敏感,可以用 at-least-once 并在应用层去重。

[5] 输入格式,input format,是 Haddop MapReduce 适配不同数据源的一个接口,相当于一个转换层。

[6] 可以依赖 Zookeeper,也可以依赖 Hadoop 对 offset 进行持久化,结合上下文感觉这里说的是后者。

[7] 这种序列化方式在当时应该是个用户侧的选择而非 Kafka 框架所提供的功能。