Pika的slot数据迁移学习

背景

自 Pika 3.5.0 版本开始,Pika 支持 Codis + Pika 集群的模式,实现方法是将 Codis 中的 redis-server 替换成为 pika-server,增加 slot key 的概念 (默认1024个,可以通过配置文件进行配置),以 set 类型举例,在用户对每个 key 进行修改操作时,根据 slot = crc32(key) % 1024 计算出该 key 对应的 slot,然后将该 key 加到对应的 slot 中;迁移时,指定 slot,从该 slot 中 spop 出一个 key 然后迁移到目标 pika-server

当前的 Pika 的 slot 迁移方式包括以下几种

  • 指定范围内的 Slots 迁移到指定的 Group

    截屏2023-12-04 19 49 18
  • 指定数量的 Slots 从一个 Group 迁移到另一个 Group (从指定 Group 的头部 Slots 开始计算)

    截屏2023-12-04 19 49 41
  • 平均分配所有的 Slots,即 Rebalance All Slots

    截屏2023-12-04 19 52 54

目前的问题是 slot 的迁移速度很慢,具体原因体现在 slot 迁移时是以单个 slot 为粒度迁移,这对后续 Pika 进行自动扩缩容的性能存在影响

讨论

  • 什么场景下需要用到 slot 迁移?

    1. 新集群部署的时候,平均分配所有的 slot 到各个 Group 中,即 Rebalance All Slots截屏2023-12-04 19 53 20
    2. 扩容一个 Group 的时候,需要将其他 Group 上面的 slot 迁移到扩容的 Group 上,如果使用 Rebalance All Slots 的话会平均把每个 Group 中末尾的 slot 平均分给新的 Group
截屏2023-12-04 19 53 32
  1. 缩容一个 Group 的时候,需要将缩容的 Group 上面的 slot 上面的数据迁移到集群中其他的 Group 中,即平均迁移 slot 到各个 Group 上,最后才能下线这个 Group,目前还不能做到平均自动迁移,只能手动选择范围内的 slot 进行迁移,等把所有在缩容的 Group 上的 slot 迁移完之后,才能缩容这个 Group

  • 迁移 slot 过程中实际用到的命令是哪些?

    1. slotsdel: 删除 Pika 中 slot 下的全部 key-value,不会删除原生的 key

    2. slotscleanup: 删除 slotID 对应的 key,删除 slot 里面的 key 和 原生 key

    3. slotsmgrtslot: 随机选择 slot 下的 1 个 key-value 迁移到目标机 (迁移原生的 key 和 slot)

    4. slotsmgrttagslot: 随机选择 slot 下的 1 个 key-value 和该 key 相同 tag 的 key-value 迁移到目标机

    5. slotsmgrtone: 迁移指定 key 到目标机 (迁移原生的 key 和 slot)

    6. slotsmgrttagone: 迁移指定 key 和 key 有相同的 tag 的 key 到目标机

    7. slotsmgrtslot-async: 异步将指定 slot 里指定数量的 key 迁移到目标机器 (迁移原生的 key 和 slot)

    8. slotsmgrttagslot-async: 异步将指定 slot 里指定数量的 key(带 tag) 迁移到目标机器

代码分析

Codis 代码

先说结论,在 Codis 中 Slot 迁移分为两种,一种是同步迁移,用的命令是 Pika 中的 SLOTSMGRTTAGSLOT ,这个命令用法如下:

1
2
3
> slotsmgrttagslot host port timeout slot
> slotsmgrttagslot 127.0.0.1 6380 100 579 # 随机选择slot下的1个key-value以及和该key相同tag的key-value迁移到目标机
> (integer) 1 # 迁移成功

另一种是异步迁移,用的命令是 Pika 中的 SLOTSMGRTTAGSLOT-ASYNC,这个命令用法如下:

1
2
3
> slotsmgrttagslot-async hostport timeout maxbulks maxbytes slot numkeys
> slotsmgrttagslot-async pika 9221 5000 200 33554432 518 500 # 异步将指定slot里指定数量的key迁移到目标机器
> (integer) 1 # 迁移成功
截屏2023-12-04 19 53 48

我们以 Migreate Range 按钮为例 (这个命令是将一定范围内的 slot 迁移到指定的 Group),来看下它在 Codis 中的逻辑

截屏2023-12-04 19 54 07

我们会调用 SlotCreateActionRange这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (s *Topom) SlotCreateActionRange(beg, end int, gid int, must bool) error {
....
var pending []int
for sid := beg; sid <= end; sid++ {
m, err := ctx.getSlotMapping(sid)
if err != nil {
return err
}
if m.Action.State != models.ActionNothing {
if !must {
continue
}
return errors.Errorf("slot-[%d] action already exists", sid)
}
if m.GroupId == g.Id {
if !must {
continue
}
return errors.Errorf("slot-[%d] already in group-[%d]", sid, g.Id)
}
pending = append(pending, m.Id)
}

for _, sid := range pending {
m, err := ctx.getSlotMapping(sid)
if err != nil {
return err
}
defer s.dirtySlotsCache(m.Id)
// 更改状态
m.Action.State = models.ActionPending
m.Action.Index = ctx.maxSlotActionIndex() + 1
m.Action.TargetId = g.Id
// 更改 etcd 状态
if err := s.storeUpdateSlotMapping(m); err != nil {
return err
}
}
return nil
}

这里会先检查一些状态,如果该 Slot 是否正在迁移,目标 Group 和 当前 Group 是否一致,然后将状态改为 ActionPending,然后保存到 Etcd 中就返回给用户了,在 Dashboard 启动的时候,有个协程会随着它启动,这个入口为 Topom::ProcessSlotAction,用来扫描这个状态进行迁移:

1
2
3
4
5
6
7
8
9
10
11
go func() {
for !s.IsClosed() {
if s.IsOnline() {
if err := s.ProcessSlotAction(); err != nil {
log.WarnErrorf(err, "process slot action failed")
time.Sleep(time.Second * 5)
}
}
time.Sleep(time.Second)
}
}()

其中这个 ProcessSlotAction()代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (s *Topom) ProcessSlotAction() error {
for s.IsOnline() {
var (
marks = make(map[int]bool)
plans = make(map[int]bool)
)
var accept = func(m *models.SlotMapping) bool {
if marks[m.GroupId] || marks[m.Action.TargetId] {
return false
}
if plans[m.Id] {
return false
}
return true
}
var update = func(m *models.SlotMapping) bool {
if m.GroupId != 0 {
marks[m.GroupId] = true
}
marks[m.Action.TargetId] = true
plans[m.Id] = true
return true
}
var parallel = math2.MaxInt(1, s.config.MigrationParallelSlots)
for parallel > len(plans) { //状态转移在这里完成
_, ok, err := s.SlotActionPrepareFilter(accept, update)
if err != nil {
return err
} else if !ok {
break
}
}
if len(plans) == 0 {
return nil
}
var fut sync2.Future
for sid, _ := range plans {
fut.Add()
go func(sid int) {
log.Warnf("slot-[%d] process action", sid)
//重点,真正的数据迁移
var err = s.processSlotAction(sid)
if err != nil {
status := fmt.Sprintf("[ERROR] Slot[%04d]: %s", sid, err)
s.action.progress.status.Store(status)
} else {
s.action.progress.status.Store("")
}
fut.Done(strconv.Itoa(sid), err)
}(sid)
}
for _, v := range fut.Wait() {
if v != nil {
return v.(error)
}
}
time.Sleep(time.Millisecond * 10)
}
return nil
}

整体的状态变换过程如下:

ActionPending -> ActionPreparing -> ActionPrepared -> ActionMigrating -> ActionFinished

在 ActionMigrating 之前变更都只是更新 Etcd 中的状态,ActionPreparing 和 ActionPrepared 还会调用 resyncSlotMappings 通过Proxy 重连新的 Pika Server 并且设置 slot 从哪迁移等信息,我们看下实际的数据迁移是怎么发生的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (s *Topom) processSlotAction(sid int) error {
var db int = 0
for s.IsOnline() {
if exec, err := s.newSlotActionExecutor(sid); err != nil {
return err
} else if exec == nil {
time.Sleep(time.Second)
} else {
n, nextdb, err := exec(db)
if err != nil {
return err
}
log.Debugf("slot-[%d] action executor %d", sid, n)

if n == 0 && nextdb == -1 {
return s.SlotActionComplete(sid)
}
status := fmt.Sprintf("[OK] Slot[%04d]@DB[%d]=%d", sid, db, n)
s.action.progress.status.Store(status)

if us := s.GetSlotActionInterval(); us != 0 {
time.Sleep(time.Microsecond * time.Duration(us))
}
db = nextdb
}
}
return nil
}

通过 newSlotActionExecutor 得到执行器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
case models.ForwardSync: // 同步
do = func() (int, error) {
return c.MigrateSlot(sid, dest)
}
case models.ForwardSemiAsync: // 异步
var option = &redis.MigrateSlotAsyncOption{
MaxBulks: s.config.MigrationAsyncMaxBulks,
MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(),
NumKeys: s.config.MigrationAsyncNumKeys,
Timeout: math2.MinDuration(time.Second*5,
s.config.MigrationTimeout.Duration()),
}
do = func() (int, error) {
return c.MigrateSlotAsync(sid, dest, option)
}
default:
log.Panicf("unknown forward method %d", int(method))
}

我们先看下同步,这里调用的就是 SLOTSMGRTTAGSLOT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (c *Client) MigrateSlot(slot int, target string) (int, error) {
host, port, err := net.SplitHostPort(target)
if err != nil {
return 0, errors.Trace(err)
}
mseconds := int(c.Timeout / time.Millisecond)
if reply, err := c.Do("SLOTSMGRTTAGSLOT", host, port, mseconds, slot); err != nil {
return 0, errors.Trace(err)
} else {
p, err := redigo.Ints(redigo.Values(reply, nil))
if err != nil || len(p) != 2 {
return 0, errors.Errorf("invalid response = %v", reply)
}
return p[1], nil
}
}

再看下异步,这里调用的是SLOTSMGRTTAGSLOT-ASYNC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (c *Client) MigrateSlotAsync(slot int, target string, option *MigrateSlotAsyncOption) (int, error) {
host, port, err := net.SplitHostPort(target)
if err != nil {
return 0, errors.Trace(err)
}
if reply, err := c.Do("SLOTSMGRTTAGSLOT-ASYNC", host, port, int(option.Timeout/time.Millisecond),
option.MaxBulks, option.MaxBytes, slot, option.NumKeys); err != nil {
return 0, errors.Trace(err)
} else {
p, err := redigo.Ints(redigo.Values(reply, nil))
if err != nil || len(p) != 2 {
return 0, errors.Errorf("invalid response = %v", reply)
}
return p[1], nil
}
}

Pika 代码

Slot 生成

在通过对代码的阅读后,我认为这里的 slot 代表的是一个 set 类型的数据结构,在这个 set 里面存放了我们的很多数据,slot 表示的是一个抽象的概念,接下来我们以一个 set 命令来讲解一下 slot 是什么
截屏2023-12-04 19 54 45

这里我们调用 AddSlotKey 函数生成这个 slot
截屏2023-12-04 19 55 00

这里我们可以看到调用了 SAdd 命令将一个 slot 创建了,下面是我们在 slotmigrate 为 yes 的情况下,执行 set key value 命令可以看到的,图中我们执行 keys * 可以看到不仅仅有一个 key, 还有一个 _internal:slotkey:4migrate:937 , 这个 937 就是我们计算出来的这个 key 的 hash 值。我们再看下这个 slot 下存放的数据可以看到有 kkey, 所以在 slotmigrate 打开的状态下,Pika 是存了两份数据的。

截屏2023-12-04 19 55 10

Slot 迁移

  1. 我们先看第一种 SLOTSMGRTTAGSLOT 同步迁移

    1
    2
    > slotsmgrttagslot host port timeout slot
    > slotsmgrttagslot 127.0.0.1 6380 100 579
    截屏2023-12-04 19 55 22
截屏2023-12-04 19 55 33 截屏2023-12-04 19 55 43 截屏2023-12-04 19 56 21

我们会先调用 CleanMigrateClient()函数把 Pika 超时之前把 migrate_clients 关闭,然后调用 SlotsMgrtTag 去迁移这个 Key,我们可以看到迁移的 Key 是通过 SSCAN 命令返回
截屏2023-12-04 19 56 36
的,下面是迁移的具体逻辑:

我们可以看到迁移分为两个部分,第一部分就是迁移到目标机器上,另一部分就是删除本地的数据,我们这里重点看迁移的那个步骤
截屏2023-12-04 19 57 12

迁移分为三步:

  1. GetMigrateClient 建立 Redis 连接
  2. MigrateSent 发送数据 (以命令的形式发送给目标机)
  3. MigrateRecv 收到返回值

总结一下,同步迁移整体的流程如下:

  1. CleanMigrateClient 清除超时连接

  2. SlotsMgrtTag

  3. SlotsMgrtOne

  4. MigrateKey

  5. GetMigrateClient 获取连接

  6. MigrateSend 发送命令

  7. MigrateRecv 接收返回值

  8. 第二种异步迁移 SLOTSMGRTTAGSLOT-ASYNC

    1
    2
    > slotsmgrttagslot-async hostport timeout maxbulks maxbytes slot numkeys
    > slotsmgrttagslot-async pika 9221 5000 200 33554432 518 500

    截屏2023-12-04 19 57 28
截屏2023-12-04 19 57 54 截屏2023-12-04 19 58 06

这里的入口函数是 SlotsMigrateBatch , 最终调用的是 ReqMigrateBatch, 我们仔细看下这个函数:

截屏2023-12-04 19 58 21

这里的 StartThread就是执行 PikaMigrateThreadThreadMain,这个 CreateParseSendThreads 的逻辑我们看下
截屏2023-12-04 19 58 59

可以看到 CreateParseSendThreads函数中,又启动了新的线程 worker->StartThread(),这里的 worker 类型是 PikaParseSendThread 类型,这里的 StartThread的入口函数就是 PikaParseSendThreadThreadMain函数
截屏2023-12-04 19 59 14

真正的迁移操作就在这个 MigrateOneKey

截屏2023-12-04 19 59 24

这里面的逻辑我就不多介绍,大致的意思还是和同步的逻辑一样,建立连接然后调用 cli->Send发送命令

截屏2023-12-04 19 59 41 截屏2023-12-04 20 00 03

总结一下在异步同步过程中,调用了 SlotsMigrateBatch接口,然后启动了 PikaMigrateThread去做迁移 key 的操作,在 PikaMigrateThread中又启动了一组 PikaParseSendThread的 worker 线程去做真正的迁移操作,通过建立连接,向目标机发送命令的形式传递 slot 中的 key