Lazy loaded image
MIT 6.5840 (6.824) Lab 4B 的实现
Words 6735Read Time 17 min
2025-2-3
2025-2-19
type
status
date
slug
summary
tags
category
icon
password
在 Lab 4A 中,我们实现了分片控制器,由它负责分片的分配。在 Lab 4B 中,我们需要实现副本组,由它来提供对键值对的存储、读写请求以及分片迁移的功能,同样的,副本组中也要保证线性一致性。
和 Lab 4A 的实现一样,为了保证线性一致性,我们需要明确两点:
  1. 副本组中的配置更新、分片迁移、数据读写等操作,都需要先通过 Raft 达成共识后才会执行。
  1. 每个副本组都必须通过 leader 节点和外界交互,以保证简洁性和正确性。

01 | 分片迁移:ShardKV 的核心


想象一下,你正在经营一个快递公司。一开始,你在北京有一个大仓库,负责处理所有快递。随着业务增长,你发现一个仓库忙不过来了,于是在上海开了第二个仓库。现在问题来了:如何把原来北京仓库的一部分包裹转移到上海仓库?分布式系统中「分片迁移」面临的就是类似的问题。在 ShardKV 中,当我们的配置发生变化时(比如增加新的副本组,或者重新平衡负载),就需要在不同的副本组之间迁移数据分片。
这个看似简单的问题,实际上面临着三大挑战:
  • 一致性要求:迁移过程中如何保证数据信息不丢失也不错乱?
  • 可用性要求:迁移过程中还会有新的客户端请求不断到达,应该如何处理?
  • 容错性要求:网络中断、消息丢失或重复等异常情况应该如何处理?

迁移方案讨论:一致性和可用性的平衡

在分片迁移的过程中,我们很可能会遇到这么一种情况:开始时,键值对 (k1, v1) 属于分片 shard1,分配给了副本组 replica1 负责,随后,配置发生了变更,shard1 重新分配给了副本组 replica2,因此,shard1 开始从 replica1 迁移至 replica2。在迁移的过程中,客户端想要将 k1 的值修改为 v2,于是向 shardkv 发送写请求 put(k1, v2) ,如果我们不做任何措施,那么以下几种情形都有可能发生:
  1. 客户端还没有更新配置,仍然认为 k1 由 replica1 负责,将写请求发送给了 replica1。此时 replica1 还没有开始迁移,replica1 将 k1 的值改为了 v2 后才开始迁移。迁移完成后,replica2 中的 k1 的值为 v2。
  1. 客户端还没有更新配置,仍然认为 k1 由 replica1 负责,将写请求发送给了 replica1。但此时 shard1 已经发送给了 replica2,因此 replica2 中 k1 的值仍为 v1。
  1. 客户端更新了配置,知道 k1 已经由 replica2 负责,将写请求发送给了 replica2。但此时 shard1 还没有迁移至 replica2,于是 replica2 执行请求将 k1 的值保存为 v2。随后,shard1 开始迁移,replica2 中的 k1 被覆盖为了旧值 v1。
  1. 客户端更新了配置,知道 k1 已经由 replica2 负责,将写请求发送给了 replica2。此时 shard1 也已经迁移到了 replica2,于是 replica2 将 k1 的值修改为 v2。
可以看到,如果迁移和写请求可以同时处理,虽然提高了系统的可用性,但是 k1 的最终值可能仍为 v1,这是与一致性要求相违背的。对于存储服务来说,保证数据一致性应该是更为重要的,因此,在迁移分片时,我们需要施加一些限制,在优先保证一致性的前提下,尽可能的提高可用性。

方案一:迁移时,副本组拒绝所有分片读写请求

既然边迁移边处理客户端请求会导致一致性问题,那么很容易想到,可以让副本组在迁移时拒绝所有的客户端请求,直到迁移完成。如此一来,分片迁移的指令和客户端写请求的指令永远不会冲突,也就解决了一致性问题。
在实现上,我们只需要定义两种副本组状态即可:
  • serving:表示副本组正常提供服务,可以接受并执行客户端请求。
  • syncing:表示副本组正在迁移分片,若此时收到客户端请求,会拒绝该请求并返回服务器繁忙的响应。
当副本组获取到新的配置,并且本组中有分片需要被转进或转出时,就将副本组的状态修改为 syncing。在此期间只会响应分片迁移相关的指令,拒绝所有客户端请求。等到本组的分片迁移完成时,再将状态改回 serving,随后正常执行客户端请求。
虽然这个方案非常容易理解和实现,但是完全牺牲了迁移时期的可用性。每次配置被更改时,所有客户端都不得不长时间暂停手头上的任务,直到迁移完成,这在实际生产中显然是不可行的。因此,我们需要更细粒度的解决方案。

方案二:迁移时,允许读写不需要迁移的分片数据

仔细思考一下,在迁移时,副本组真的有必要拒绝所有的客户端请求吗?假设副本组 replica1 原本负责两个分片,分别是 shard1 和 shard2,处理配置更新时,需要将 shard2 迁移至副本组 replica2 中。对于 shard2,我们知道同时处理迁移指令和写指令会造成一致性问题,但是 shard1 并不需要进行迁移,对 shard1 中的键值对进行读写操作也就不会和迁移指令造成冲突。因此,我们完全可以缩小迁移时拒绝的请求范围,副本组不需要拒绝所有的客户端请求,它只需要拒绝指向被迁移分片的请求即可,而对于不需要迁移的分片数据,是完全可以一直提供读写服务的。
我们可以沿用方案一中定义的状态:serving 和 syncing。只是当副本组处于 syncing 状态时,不再拒绝所有的客户端请求,而是先判断在本次配置变更中,请求中的 key 所属的分片需不需要被迁移,如果不需要迁移,那么就执行请求,反之,拒绝请求。相比于方案一,方案二将不需要迁移的分片区分开来,使得这些分片可以不受迁移操作的影响,持续的提供读写服务,提高了可用性。
是不是可以继续优化呢?方案一和方案二都是将状态定义在副本组上,导致我们只能将一些分片当做一个整体看待,一个整体内的所有分片要么都可以读写,要么都不可以读写。比如方案一将所有分片当做一个整体,迁移时所有分片都不可以读写,方案二虽然做了区分,但所有需要迁移的分片仍被当做一个整体,只要副本组仍在 syncing 状态,这些分片就都不可以读写。而分片之间是互相独立的,我们或许可以将状态定义的对象落在一个个分片上,以实现更精细的控制。

方案三:一个分片只要完成迁移就可以立即读写,即使其他分片仍在迁移

考虑这么一种场景:配置变更时,副本组 replica1 需要从 replica2 获取分片 shard2,从 replica3 获取分片 shard3,经过一段时间的迁移后,replica1 已经成功获取了 shard2,但是由于 replica3 发生了故障,导致迟迟无法获取 shard3。在这种情况下,如果应用方案二,那么只要 shard3 没有被成功获取,replica1 就会一直保持在 syncing 状态,导致 shard2 也无法读写。但是,从一致性的角度看,对 shard2 中的数据进行读写并不会影响到 shard3 的数据,因为 shard2 和 shard3 拥有各自独立的数据空间。因此,我们可以让每个分片都拥有并维护各自的状态,以进一步提高可用性。
方案三在保证一致性的基础上,最大努力的提高了可用性,因此,我们选择方案三作为分片迁移的实现方案。在实现上,状态的定义对象不再是副本组,而是每个分片,因此,我们不再需要前两个方案中所使用的状态定义,而是需要重新设计分片状态。

分片状态定义

在分片迁移的过程中,每个分片可能面临下面三种情况:
  1. 不需要迁移,继续提供服务。
  1. 分片数据需要从其他副本组迁移到本副本组中。
  1. 分片数据需要从本副本组迁移到其他副本组中。
而在定义分片的状态时,除了需要定义对应的三种状态之外,还需要定义第四种状态:
  1. serving:表示当前副本组负责管理此分片,可以正常提供读写服务。
  1. syncPulling:表示当前副本组在最新的配置中负责管理此分片,但还需要从上一个配置中负责管理此分片的副本组中拉取数据,暂时还不可以提供读写服务。
  1. syncRetained:表示当前副本组在最新的配置中不再负责管理此分片,但需要暂时保留分片数据,等待新的负责管理此分片的副本组拉取,不可以提供读写服务。
  1. syncCleaning:表示当前副本组已经成功拉取此分片,需要通知对端副本组清理此分片的相关数据。因为当前副本组已经成功拉取分片数据了,不会再有分片迁移指令到来,因此可以正常提供读写服务。
为什么要额外定义第四种状态 syncCleaning 呢?可以看到,syncPulling 和 syncRetained 是对等的两个状态,当前副本组在拉取分片的同时,意味着对端的副本组正在保留这些分片数据,如果对端没有收到拉取成功的通知,那么会一直保存这些不再由它负责的分片,造成空间的浪费。因此我们需要 syncCleaning 状态,让当前副本组看到有分片处于这个状态时,就知道这个分片已经成功拉取了,接下来只需要通知对端副本组删除数据。
在 ShardKV 中,我们不仅需要保存分片的数据,也要维护分片的状态。

分片迁移流程

在启动服务时,我们便会开启一个分片迁移协程定时检测分片状态。这个协程每隔一段时间就会遍历所有的分片,根据分片的状态采取不同的行动。
分片的状态不是一成不变的,而是随着迁移流程的推进而不断的转换。假设在最新的配置中,分片 shard1 需要从副本组 replica2 迁移到 replica1 中,下图展示了迁移过程中的分片状态转换过程:
notion image
整个过程大体上可以分为三个阶段,分别是迁移准备阶段、分片拉取阶段和分片清理阶段。
  1. 迁移准备阶段:
    1. 开始时,replica1 中没有 shard1 的数据,replica2 中的 shard1 的数据处于 serving 状态。
    2. 两个副本组监听到最新的配置变更后,replica1 会新建空的 shard1,并将 shard1 的状态置于 syncPulling,replica2 会将 shard1 的状态转为 syncRetained。
  1. 分片拉取阶段:
    1. Replica1 的分片迁移协程在每次扫描时,如果发现 shard1 的状态为 syncPulling,就会向 replica2 发送拉取分片的请求。
    2. Replica2 收到拉取分片的请求后,确认自身 shard1 的分片状态为 syncRetained 后,将分片数据返回给 replica1。如果自身分片是其它状态,说明两个副本组当前进度并不同步,那么会拒绝这个请求。
    3. Replica1 收到回复消息后,保存分片数据,并将分片状态修改为 syncCleaning。如果分片状态已经是 syncCleaning 或 serving 了,说明这个回复消息是过期消息,那么忽略它即可。
  1. 分片清理阶段:
    1. Replica1 接着等待分片迁移协程开始下一次扫描。在下一次扫描中,分片迁移协程发现 shard1 的状态为 syncCleaning 后,向 replica2 发送清理分片的请求。
    2. Replica2 收到清理分片的请求后,如果分片状态为 syncRetained,就删除分片相关的数据,并返回清理成功消息。如果查询不到分片状态,说明分片相关的数据已经被删除,直接返回清理成功的消息即可。
    3. Replica1 收到清理成功的消息后,将分片状态修改为 serving,表示这个分片迁移完成。如果分片状态已经是 serving 了,那么忽略这个消息即可。
下面会详细介绍分片拉取阶段和分片清理阶段的实现,而迁移准备阶段属于更新配置时的动作,我们会在配置更新的章节再做介绍。

分片拉取阶段

首先看拉取分片数据的这一侧副本组(即图中的 replica1)。如果分片迁移协程在一轮遍历中,发现某个分片状态为 syncPulling,就会根据上一个配置得知这个分片保存在哪个副本组中,然后会额外开启一个子协程去并行的拉取数据。
这里有三个注意点:
  • 只有 leader 才可以发送拉取分片的请求。
  • 这里使用了 WaitGroup,目的是等待这些子协程执行完毕后再开始下一轮遍历。
  • 成功收到分片数据的响应后,不能直接更新分片数据,需要把分片数据包装到分片更新日志,交由 Raft 在所有服务器中同步,然后 apply 这个日志时执行分片更新操作。
在 apply 分片更新日志时,需要对响应的数据进行两步检查,以保证一致性:
  • 如果响应中的配置编号和自身的配置编号不同,说明这个响应是过期的数据,拒绝执行后续步骤。
  • 如果当前的分片状态不是 syncPulling,说明分片数据已经更新过了,这个响应是一个重复的响应,直接返回即可。
通过上述两步检查后,将响应中的数据保存到分片中,并将分片状态修改为 syncCleaning。
再来看被拉取分片数据的副本组(即图中的 replica2)。它在收到拉取分片的请求后,为了保证线性一致性,需要先把这个请求包装成分片获取日志交由 Raft 同步,然后 apply 这个日志时再执行具体操作。
同样的,在 apply 分片获取日志时,需要对响应的数据进行两步检查,以保证一致性:
  • 如果请求中的配置编号和自身的配置编号不同,两个副本组之间的进度并未同步,拒绝该请求。
  • 如果当前的分片状态不是 syncRetained,说明副本组还没有开始迁移流程,拒绝该请求。
通过上述两步检查后,再将分片数据保存在响应中返回。

分片清理阶段

分片清理阶段和分片拉取阶段类似,在拉取分片数据的这一侧副本组中,如果分片迁移协程在一轮遍历中,发现某个分片状态为 syncCleaning,会额外开启一个子协程去通知对端副本组删除分片相关的数据。
同样有三个类似的注意点:
  • 只有 leader 才可以发送清理分片的请求。
  • 使用 WaitGroup,等待子协程执行完毕后再开始下一轮遍历。
  • 收到清理成功的响应后,需要把响应包装成迁移完成日志,交由 Raft 去达成共识,然后 apply 这个日志时再将分片状态修改为 serving。
在 apply 迁移完成日志时,同样需要检查配置编号和分片状态,这里不再赘述。
被拉取分片数据的副本组在收到清理分片的请求后,同样需要先把这个请求包装成分片清理日志交由 Raft 同步。同样的,在 apply 分片清理日志时,需要对配置编号和分片状态进行检查。
这里删除分片数据时,是将其他分片数据转移到新的 map 中,因为 Go 中的 map 不会自动缩容,调用 delete(key) 时,它只是将对应的 key 做了删除标志,并没有真正回收键值对的空间,当然,如果 value 是一个指针的话,指针指向的内存空间是会被垃圾回收的。

02 | 配置更新:分片迁移的起点


从上一章我们知道,只有分片状态不再是 serving,分片迁移协程才会发送分片迁移的请求,那么分片状态什么时候从 serving 状态转变呢?是在配置更新的时候。
在启动服务时,我们会开启一个配置更新协程,定时从分片控制器获取新的配置。这里需要注意两个地方:
  1. 每次获取配置时,直接获取最新的配置还是按顺序获取当前副本组配置的下一个配置?
    1. 应该按顺序获取下一个配置,因为这样才能保证所有副本组的配置更新进度是同步的。举个例子,假设我们直接获取最新的配置,副本组 A 先获取到最新的配置 Config1,副本组 B 还没有获取到这个配置,此时配置又更新了,最新配置为 Config2,副本组 B 获取到最新的配置 Config2,那么 A 和 B 两者的配置版本就错位了。
  1. 获取到新配置后,不能直接更新配置,而是应该先将其封装成配置更新日志,交由 Raft 在副本组的服务器中达成共识。
在执行配置更新日志时,需要执行两个检查,只要有一个检查不通过,就直接返回,不再执行后续的更新操作:
  1. 新的配置编号应该大于当期副本组的配置编号。
  1. 当前副本组中的所有分片状态应该都是 serving,表明上一轮配置更新已经执行完毕。
在更新配置时,根据下面几种情况改变分片的状态:
  • 如果是新分配给本副本组的分片,新建分片数据存储仓库,并把它的状态改为 syncPulling。
  • 如果分片分配给了其它副本组,把它的状态改为 syncRetained。
  • 如果分片不再分配给自己,也没有分配给其它副本组,那么直接删除分片数据。

03 | 读写操作


读写操作比较简单,这里就不再赘述。记得为了保证线性一致性语义,需要注意两点:
  • 在执行读写操作之前,先通过 Raft 在副本组的所有服务器之间达成共识。
  • 客户端请求在执行前需要去重,保证请求只会执行一次。

04 | 实验结果


成功通过 500 次重复测试。
notion image
 
上一篇
论文笔记《Kafka: a Distributed Messaging System for Log Processing》
下一篇
MIT 6.5840 (6.824) Lab 4A 的实现

Comments
Loading...