Lazy loaded image
论文笔记《Kafka: a Distributed Messaging System for Log Processing》
Words 4620Read Time 12 min
2025-2-16
2025-2-19
type
status
date
slug
summary
tags
category
icon
password
在学习 MIT 6.824 (6.5840) 的时候提到了消息队列,加上 Kafka 是后端开发领域的重要组件,就想着把 Kafka 的论文顺道看一看。需要注意的是,这篇论文是 2011 年发表的,和最新的 Kafka 实现可能会有一些不一样的地方。

01 | 总览


We introduce Kafka, a distributed messaging system that we developed for collecting and delivering high volumes of log data with low latency.
—— from Chapter “ABSTRACT”
On the one hand, Kafka is distributed and scalable, and offers high throughput. On the other hand, Kafka provides an API similar to a messaging system and allows applications to consume log events in real time.
—— from Chapter “1. Introduction”
论文开宗明义,一开始就提出了 Kafka 的设计目标:为了同时满足在线和离线服务的需要,设计了具有高吞吐和低延迟分布式和可伸缩简洁高效的接口等优点的消息系统——Kafka。
为了实现目标,Kafka 选择了一些在当时并不主流的设计思路:
  1. 简化的消息存储方式。为了能够给实时系统提供消息读写的支持,Kafka 通过消息分段存储、磁盘顺序读写而避免随机读写、取消消息 id 设计等方式,提高消息存储的效率。
  1. 高效的网络传输效率。利用批量读写、零拷贝、页缓存等技术,提高网络传输速率。提出了分区和消费者组的概念,进一步提高了消息读写的并行度,
  1. 轻量化的分布式协作。Kafka 中的 broker 是追求轻量化设计的,一些必要的状态都交给了消费者自身和 Zookeeper 去维护。同时,为了避免过重的分布式协调开销,放弃了中心节点的设计,而是利用 Zookeeper 和确定性算法,以去中心化的方式让消费者自己去互相协调。
  1. 简洁的 API 接口。Kafka 的接口设计尽可能简洁,调用时只需要提供必要的信息,而且不会暴露过多的系统内部信息。

02 | 整体架构


论文首先介绍了 Kafka 中的几个基本概念:
  • Topic(主题):每个 topic 代表一类消息流。通俗来讲就是不同的消息类型或是消费场景,可以根据实际业务进行自定义,比如用户行为日志、订单交易记录等。每个 topic 之间是互不干扰的。
  • Producer(生产者):一个 producer 负责将消息发布到一个 topic 名下。
  • Broker(代理):一个 broker 其实就是 Kafka 中的一个服务器,多个 broker 就组成了 Kafka 集群。发布的消息会被实际保存在 broker 中。
  • Consumer(消费者):一个 consumer 会订阅一个或多个 topic,然后从保存这些 topic 消息的 broker 中拉取消息进行消费。
论文认为消息系统在概念上是很简单的,「生产、暂存、消费」三个步骤就组成了消息系统的主要功能,为了和这种简单美形成呼应,Kafka 中的 API 接口也被设计的尽可能简洁。文中没有花费篇幅去描述 API 的精确定义,而是给出了两个使用 API 的伪代码示例。
生产者伪代码:用户可以自己选择序列化方式将消息编码成字节的形式,Kafka 不在意消息的现实意义,在它眼中只是一段字节而已。为了提高传输效率,生产者可以将多条消息放在一个请求中发送。
消费者伪代码:订阅一个 topic 后,消费者会创建一个或多个消息流来获取 topic 中的消息。每个消息流会提供一个迭代器,通过这个迭代器,消费者可以源源不断的获取最新发布的消息。当没有新消息产生时,这个迭代器不会终止,而是会阻塞等待新消息的到来。
Kafka 的整体架构如下图所示。整个系统通常是由多个 broker 组成的集群。为了负载均衡,topic 中的消息在实际存储时会被分到多个 partition(分区)中,每个 broker 会保存一个或多个分区,这么一来,就可以增加生产者和消费者处理消息的并发度,也可以将流量分摊到不同的 broker 上。生产者在发布同一个 topic 下的多个消息时,可以通过某种规则,把这些消息分别发送给不同的分区,这样就可以并行的生产消息了。消费者也是同理,订阅一个 topic 后,可以从这个 topic 下的多个 partition 中并行的获取消息。
图 1:Kafka 结构图
图 1:Kafka 结构图

03 | 设计理念


论文之前一直在强调 Kafka 的低延迟和高吞吐、分布式和可伸缩等优点,那么这些优点具体是怎么实现的呢?这就是论文接下来要介绍的。

单个分区的效率优化

消息在拉取和存储时,最终都是和分区做交互,所以提高单个分区的读写效率是非常重要的。Kafka主要在消息存储、网络传输以及 broker 状态三个方面做了优化。

1)简化消息存储方式

每个分区在逻辑上被当做一个日志,分区中的消息就存在这个日志中。在具体实现时,消息是存储在文件中的,新的消息会被顺序写入文件的末尾,当这个文件的大小超过设定阈值时(如 1GB),就会创建一个新的文件保存后续的消息,所以每个分区其实是由很多个大小近似的段文件组成的。同时,为了提高写入性能,Kafka 不会立即把改动的文件刷到磁盘里,而是在写入的消息数量达到设定的数量,或是距离上次刷盘超过一定的时间后,才会刷盘。另外,消息只有刷盘后才会被暴露给消费者。
Kafka 没有给消息设定唯一的 id,而是通过消息在整个分区日志中的 offset(偏移量)来进行寻址。具体实现如图 2 所示,每个小方框表示某条消息的 offset,Kafka 在内存中只需要维护每个段文件中第一个消息的 offset 即可。这种设计简化了寻址方式,Kafka 根据要拉取消息的 offset 就可以直接计算出它在文件中的偏移地址。如果引入消息 id 的话,就需要额外维护每个 id 到文件位置的映射关系,徒增复杂度。
因为消费者是顺序消费分区中的消息的,所以 Kafka 采用了类似于 TCP 的消息确认机制。只要消费者确认了一个 offset,就说明这个 offset 及其之前的所有消息都已经被消费了。当消费者拉取消息的时候,只需要提供起始 offset 和最大可以拉取的字节大小,Kafka 就可以根据这个 offset 定位到段文件的起始位置,然后批量读出指定大小以内的消息返回给消费者。
图 2:Kafka 日志
图 2:Kafka 日志

2)提升网络传输效率

网络传输通常也是系统执行效率的瓶颈之一,所以 Kafka 对网络传输也做了优化。
第一个优化点,是允许批量传输消息。生产者可以在一个请求中提交多个消息,消费者则可以在一个请求中拉取多个消息。
第二个优化点,是不在应用层面做消息缓存,而是依赖于底层文件系统的页缓存(page cache)。这么做有三个好处:
  1. 避免多次缓存同一个消息。
  1. 当 Kafka 的进程重启的时候,页缓存是还在的,不会丢失。
  1. 可以减轻编程语言的垃圾回收压力。
那么这么做会导致效率降低吗?论文的答案是通常不会。生产者和消费者都是顺序访问段文件的,而且两者的进度通常不会相差太大,所以非常适配文件系统的页缓存策略(write-through 缓存策略和 read-ahead 预加载机制)。当生产者写入新的消息后,在页缓存失效前,消费者很可能就已经把它读出去了。
第三个优化点,是在给消费者传输消息时,利用零拷贝技术加快将数据从磁盘读取到 socket 的速率。将数据从磁盘读取到 socket 通常包含 4 个步骤:
  1. 把数据从磁盘读取到操作系统的页缓存。
  1. 把数据从页缓存拷贝到应用缓冲区。
  1. 把数据从应用缓冲区拷贝到另一个内核缓冲区。
  1. 把内核缓冲区中的数据拷贝到 socket。
整个过程包括了 4 次数据拷贝和 2 次系统调用,非常冗余。使用零拷贝后就可以省去步骤 2 和 3,减少 2 次数据拷贝和 1 次系统调用,可以说效率提高了一倍。

3)设计无状态的 broker

为了减轻 broker 的负担和复杂度,broker 不会保存每个消费者的消费进度,而是由消费者自己维护。
这个设计会带来一个问题:既然 broker 不知道每个消费者的消费进度,那么就无法判断某条消息是不是可以被删除。对此,Kafka 的解决办法非常简单:每条消息只保存一定的时间(通常是 7 天),超过这个时间后就直接删除。所以,我们要注意保证消费进度不能落后生产进度太多,遇到消息积压的时候要及时处理。
这样的设计也会带来一个好处,就是消费者可以回退到之前的 offset,然后重新进行消费。

分布式协作优化

介绍完单个分区的效率优化后,论文分别从生产者和消费者的角度,介绍了如何在分布式的环境下进行多机协作。

1)生产者角度

生产者的部分比较简单。生产者在发送消息的时候,可以把消息随机的发送到一个分区中,也可以通过作用在 key 上的分区函数,发送到指定的分区上。

2)消费者角度

在消费者的部分,Kafka 引入了消费者组(consumer groups)的概念。每个消费者组包含一个或多个消费者,一个 topic 中的消息只能被同个消费者组中的一个消费者所消费,但是不同的消费者组之间是互相独立的。同个消费者组中的消费者可以来自不同的线程、进程或是机器。引入消费者组后,结合后续的设计决策,可以尽可能的将消息均匀的交给组中的所有消费者处理,同时减少消费者之间的协调开销。
第一个决策点,是将 topic 的分区作为并行的最小单元。之前提到一个 topic 中的消息只能被同个消费者组中的一个消费者所消费,那么怎么实现这个保证呢?就是让分区只能被同个消费者组中的一个消费者所消费。如果多个消费者可以消费同一个分区的话,那么在消费者取消息的时候,就需要引入锁之类的同步机制来保证消息只被一个消费者取走,而且每个消费者的消费速度不同,可能先取走的消息反而后消费,没法保证消息被消费的顺序。所以这个决策还隐含以下两个注意点:
  1. 同个分区中的消息是按顺序消费的。
  1. 不同分区中的消息的消费顺序是无法保证的。
第二个决策点,是不引入中心节点,而是以去中心化的方式,让消费者自己去互相协调。如果引入中心节点的话,需要考虑中心节点的容错问题,增加系统的复杂度。但是为了协调,Kafka 引入了高可用的一致性服务 Zookeeper,Kafka 用它来实现以下 3 个任务:
  1. 监测 broker 和消费者的增删事件。
  1. 当上面的事件发生时,通知消费者执行负载均衡算法。
  1. 维护每个分区的消费关系和消费偏移量。
具体来说,Kafka 在 Zookeeper 中维护了 4 类信息,分别保存在 4 个注册表中:
注册表
功能
保存的信息
是否持久保存
broker registry
当一个 broker 启动时,它会把自身的元信息保存在 broker registry 中。
1. broker 的主机名和端口号。 2. broker 负责存储的 topic 分区。
易失的
consumer registry
当一个消费者启动时,它会把自身的元信息保存在 consumer registry 中。
1. 消费者所属的消费者组。 2. 消费者订阅的所有 topic。
易失的
ownership registry
每个消费者组都会有一个属于自己的 ownership registry。
1. 组中消费者订阅的所有分区。 2. 每个分区对应的消费者。
易失的
offset registry
每个消费者组都会有一个属于自己的 offset registry。
1. 组中消费者订阅的所有分区。 2. 每个分区的消费进度。
持久的
当一个 broker 失效时,会在 broker registry 中删除它所负责的分区的信息。当一个消费者失效时,会在 consumer registry 中删除这个消费者的条目,同时在 ownership registry 中删除这个消费者负责消费的分区的信息。注意,offset registry 中的分区的信息是不会被删除的,分区对应的消费者发生变动时,新的消费者可以根据这个分区中保存的消费进度继续消费。
每个消费者还会监听 broker registry 和 consumer registry,一旦其中有成员变动,就会收到 Zookeeper 的通知,开始执行负载均衡算法,这个算法会重新决定每个消费者负责消费的分区。算法过程如下所示:
简单来说,当一个消费者执行这个算法时,对该消费者订阅的一个 topic,从 Zookeeper 中找到这个 topic 的所有分区并排序,找到消费者组中所有订阅这个 topic 的消费者并排序,然后把这些分区根据这些消费者的数量等分,消费者按照排序后的顺序认领自己所负责消费的分区集合,最后更新 Zookeeper 中的注册表信息。
当然,所有消费者不可能同时执行这个算法,先执行的消费者可能会发现新负责的分区还被其他消费者占有,这种情况下,该消费者只需要释放掉自己占有的分区,然后过段时间重试即可。

消息投递保证

Kafka 只提供了 at-least-once 的投递保证。如果消费者进程崩溃前没有及时向 Kafka 确认已消费的消息,那么新的消费者接手这个分区时是会获取到已经消费过的消息的。如果需要 exactly-once 的保证,就需要应用自己去实现去重逻辑或保证消息消费的幂等性。
Kafka 保证同一分区的消息是按 offset 的顺序投递给消费者的,但是不同分区之间的消息无法保证。
Kafka 在每个消息中存储了 CRC 校验和,防止消息投递时发生数据错误。
在论文发表的时候,消息是没有容错机制的,也就是说如果一个 broker 不可用了,那么它上面储存的消息就丢失了。论文也提到未来会增加冗余备份机制来避免这种情况。

04 | 写在最后

在文章开头有提到,这篇论文距今已经有十多年了,现在最新的 Kafka 版本肯定和论文描述的有不一样的地方。比如,从 Kafka 2.8 开始,Kafka 开始逐步去除对 Zookeeper 的依赖,转而使用内部集成的 Kraft 来实现之前 Zookeeper 负责的功能。再比如,最新版本提供了生产者的 exactly-once 保证,可以通过设置项开启,但是这个保证也是有局限性的,只能实现单个会话的单个分区中的消息生产保证。所以,在使用 Kafka 的时候,还是要以对应版本的文档为准。但是,我们仍可以通过这篇论文理解和学习 Kafka 设计时的一些技术选择和理念,这些理念也可以在未来我们优化系统时提供一些思路。
 
上一篇
ConcurrentHashMap(Java 8)源码分析
下一篇
MIT 6.5840 (6.824) Lab 4B 的实现

Comments
Loading...