背景
自 Pika 3.5.0 版本开始,Pika 支持 Codis + Pika 集群的模式,实现方法是将 Codis 中的 redis-server 替换成为 pika-server,增加 slot key 的概念 (默认1024个,可以通过配置文件进行配置),以 set 类型举例,在用户对每个 key 进行修改操作时,根据 slot = crc32(key) % 1024 计算出该 key 对应的 slot,然后将该 key 加到对应的 slot 中;迁移时,指定 slot,从该 slot 中 spop 出一个 key 然后迁移到目标 pika-server
当前的 Pika 的 slot 迁移方式包括以下几种
指定范围内的 Slots 迁移到指定的 Group
指定数量的 Slots 从一个 Group 迁移到另一个 Group (从指定 Group 的头部 Slots 开始计算)
平均分配所有的 Slots,即 Rebalance All Slots
目前的问题是 slot 的迁移速度很慢,具体原因体现在 slot 迁移时是以单个 slot 为粒度迁移,这对后续 Pika 进行自动扩缩容的性能存在影响
讨论
什么场景下需要用到 slot 迁移?
- 新集群部署的时候,平均分配所有的 slot 到各个 Group 中,即 Rebalance All Slots
- 扩容一个 Group 的时候,需要将其他 Group 上面的 slot 迁移到扩容的 Group 上,如果使用 Rebalance All Slots 的话会平均把每个 Group 中末尾的 slot 平均分给新的 Group
- 新集群部署的时候,平均分配所有的 slot 到各个 Group 中,即 Rebalance All Slots
缩容一个 Group 的时候,需要将缩容的 Group 上面的 slot 上面的数据迁移到集群中其他的 Group 中,即平均迁移 slot 到各个 Group 上,最后才能下线这个 Group,目前还不能做到平均自动迁移,只能手动选择范围内的 slot 进行迁移,等把所有在缩容的 Group 上的 slot 迁移完之后,才能缩容这个 Group
迁移 slot 过程中实际用到的命令是哪些?
slotsdel: 删除 Pika 中 slot 下的全部 key-value,不会删除原生的 key
slotscleanup: 删除 slotID 对应的 key,删除 slot 里面的 key 和 原生 key
slotsmgrtslot: 随机选择 slot 下的 1 个 key-value 迁移到目标机 (迁移原生的 key 和 slot)
slotsmgrttagslot: 随机选择 slot 下的 1 个 key-value 和该 key 相同 tag 的 key-value 迁移到目标机
slotsmgrtone: 迁移指定 key 到目标机 (迁移原生的 key 和 slot)
slotsmgrttagone: 迁移指定 key 和 key 有相同的 tag 的 key 到目标机
slotsmgrtslot-async: 异步将指定 slot 里指定数量的 key 迁移到目标机器 (迁移原生的 key 和 slot)
slotsmgrttagslot-async: 异步将指定 slot 里指定数量的 key(带 tag) 迁移到目标机器
代码分析
Codis 代码
先说结论,在 Codis 中 Slot 迁移分为两种,一种是同步迁移,用的命令是 Pika 中的 SLOTSMGRTTAGSLOT
,这个命令用法如下:
1 | > slotsmgrttagslot host port timeout slot |
另一种是异步迁移,用的命令是 Pika 中的 SLOTSMGRTTAGSLOT-ASYNC
,这个命令用法如下:
1 | > slotsmgrttagslot-async hostport timeout maxbulks maxbytes slot numkeys |
我们以 Migreate Range 按钮为例 (这个命令是将一定范围内的 slot 迁移到指定的 Group),来看下它在 Codis 中的逻辑
我们会调用 SlotCreateActionRange
这个函数
1 | func (s *Topom) SlotCreateActionRange(beg, end int, gid int, must bool) error { |
这里会先检查一些状态,如果该 Slot 是否正在迁移,目标 Group 和 当前 Group 是否一致,然后将状态改为 ActionPending
,然后保存到 Etcd 中就返回给用户了,在 Dashboard 启动的时候,有个协程会随着它启动,这个入口为 Topom::ProcessSlotAction
,用来扫描这个状态进行迁移:
1 | go func() { |
其中这个 ProcessSlotAction()
代码如下:
1 | func (s *Topom) ProcessSlotAction() error { |
整体的状态变换过程如下:
ActionPending -> ActionPreparing -> ActionPrepared -> ActionMigrating -> ActionFinished
在 ActionMigrating 之前变更都只是更新 Etcd 中的状态,ActionPreparing 和 ActionPrepared 还会调用 resyncSlotMappings 通过Proxy 重连新的 Pika Server 并且设置 slot 从哪迁移等信息,我们看下实际的数据迁移是怎么发生的
1 | func (s *Topom) processSlotAction(sid int) error { |
通过 newSlotActionExecutor 得到执行器
1 | case models.ForwardSync: // 同步 |
我们先看下同步,这里调用的就是 SLOTSMGRTTAGSLOT
1 | func (c *Client) MigrateSlot(slot int, target string) (int, error) { |
再看下异步,这里调用的是SLOTSMGRTTAGSLOT-ASYNC
1 | func (c *Client) MigrateSlotAsync(slot int, target string, option *MigrateSlotAsyncOption) (int, error) { |
Pika 代码
Slot 生成
在通过对代码的阅读后,我认为这里的 slot 代表的是一个 set 类型的数据结构,在这个 set 里面存放了我们的很多数据,slot 表示的是一个抽象的概念,接下来我们以一个 set 命令来讲解一下 slot 是什么
这里我们调用 AddSlotKey
函数生成这个 slot
这里我们可以看到调用了 SAdd
命令将一个 slot 创建了,下面是我们在 slotmigrate
为 yes 的情况下,执行 set key value
命令可以看到的,图中我们执行 keys *
可以看到不仅仅有一个 key, 还有一个 _internal:slotkey:4migrate:937
, 这个 937 就是我们计算出来的这个 key 的 hash 值。我们再看下这个 slot 下存放的数据可以看到有 kkey
, 所以在 slotmigrate
打开的状态下,Pika 是存了两份数据的。
Slot 迁移
我们先看第一种
SLOTSMGRTTAGSLOT
同步迁移1
2> slotsmgrttagslot host port timeout slot
> slotsmgrttagslot 127.0.0.1 6380 100 579
我们会先调用 CleanMigrateClient()
函数把 Pika 超时之前把 migrate_clients
关闭,然后调用 SlotsMgrtTag
去迁移这个 Key,我们可以看到迁移的 Key 是通过 SSCAN 命令返回
的,下面是迁移的具体逻辑:
我们可以看到迁移分为两个部分,第一部分就是迁移到目标机器上,另一部分就是删除本地的数据,我们这里重点看迁移的那个步骤
迁移分为三步:
- GetMigrateClient 建立 Redis 连接
- MigrateSent 发送数据 (以命令的形式发送给目标机)
- MigrateRecv 收到返回值
总结一下,同步迁移整体的流程如下:
CleanMigrateClient 清除超时连接
SlotsMgrtTag
SlotsMgrtOne
MigrateKey
GetMigrateClient 获取连接
MigrateSend 发送命令
MigrateRecv 接收返回值
第二种异步迁移
SLOTSMGRTTAGSLOT-ASYNC
1
2> slotsmgrttagslot-async hostport timeout maxbulks maxbytes slot numkeys
> slotsmgrttagslot-async pika 9221 5000 200 33554432 518 500
这里的入口函数是 SlotsMigrateBatch
, 最终调用的是 ReqMigrateBatch
, 我们仔细看下这个函数:
这里的 StartThread
就是执行 PikaMigrateThread
的 ThreadMain
,这个 CreateParseSendThreads
的逻辑我们看下
可以看到 CreateParseSendThreads
函数中,又启动了新的线程 worker->StartThread()
,这里的 worker 类型是 PikaParseSendThread
类型,这里的 StartThread
的入口函数就是 PikaParseSendThread
的 ThreadMain
函数
真正的迁移操作就在这个 MigrateOneKey
中
这里面的逻辑我就不多介绍,大致的意思还是和同步的逻辑一样,建立连接然后调用 cli->Send
发送命令
总结一下在异步同步过程中,调用了 SlotsMigrateBatch
接口,然后启动了 PikaMigrateThread
去做迁移 key 的操作,在 PikaMigrateThread
中又启动了一组 PikaParseSendThread
的 worker 线程去做真正的迁移操作,通过建立连接,向目标机发送命令的形式传递 slot 中的 key