Pika的Cache方案

背景

以 Pika 的单机模式走通了 String 类型下的 set/get 命令

流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void Cmd::DoCommand(const std::shared_ptr<Slot>& slot, const HintKeys& hint_keys) {
if (!is_suspend()) {
slot->DbRWLockReader();
}
if (need_cache_do() // 判断该命令需要操作缓存
&& PIKA_CACHE_NONE != g_pika_conf->cache_model()) { // 判断缓存模式不为NONE
&& PIKA_CACHE_STATUS_OK == g_pika_server->Cache()->CacheStatus()) { // 判断缓存状态为OK

if (is_need_read_cache()) { // 如果是读命令,执行读缓存
PreDo(slot); // 读缓存
}
if (is_read() && res().CacheMiss()) { // 如果是读命令,并且缓存未命中,执行读DB
DoFromCache(slot); // 读RocksDB操作
if (CmdStatus().ok() && is_need_update_cache()) { // 如果读RocksDB成功并且是需要更新缓存
DoUpdateCache(slot); // 更新缓存
}
} else if (is_write()) { // 如果是写命令
DoFromCache(slot); // 写RocksDB
if (CmdStatus().ok() && is_need_update_cache()) { // 如果写RocksDB成功并且需要更新缓存
DoUpdateCache(slot); // 更新缓存
}
}
} else {
Do(slot); // 普通的Do流程
}
if (!is_suspend()) {
slot->DbRWUnLock();
}
}
以 Set 命令为例子,我们看下它的几种命令操作

DoFromCache: 写 RocksDB,调用 Do 接口

1
2
3
void SetCmd::DoFromCache(std::shared_ptr<Slot> slot) {
Do(slot);
}

UpdateCache:更新缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void SetCmd::DoUpdateCache(std::shared_ptr<Slot> slot) {
switch (condition_) {
case SetCmd::kXX:
slot->cache()->Setxx(key_, value_, sec_);
break;
case SetCmd::kNX:
slot->cache()->Setnx(key_, value_, sec_);
break;
case SetCmd::kVX:
slot->cache()->Setvx(key_, target_, value_, sec_);
break;
case SetCmd::kEXORPX:
slot->cache()->Setex(key_, value_, static_cast<int32_t>(sec_));
break;
default:
slot->cache()->SetWithoutTTL(key_, value_);
break;
}
}

PreDo:由于 Set 是写操作,所以没有 PreDo读缓存命令

以 Get 命令为例子,我们看下它的几种命令操作

DoFromCache: 写 RocksDB,调用 Do 接口

1
2
3
4
5
6
7
8
9
10
11
12
void GetCmd::DoFromCache(std::shared_ptr<Slot> slot) {
res_.clear();
s_ = slot->db()->GetWithTTL(key_, &value_,&sec_);
if (s_.ok()) {
res_.AppendStringLenUint64(value_.size());
res_.AppendContent(value_);
} else if (s_.IsNotFound()) {
res_.AppendStringLen(-1);
} else {
res_.SetRes(CmdRes::kErrOther, s_.ToString());
}
}

UpdateCache:更新缓存

1
2
3
4
5
void GetCmd::DoUpdateCache(std::shared_ptr<Slot> slot) {
if (s_.ok()) { // 如果写RocksDB成功执行更新缓存
slot->cache()->Set(key_, value_, sec_);
}
}

PreDo: 读缓存操作

1
2
3
4
5
6
7
8
9
void GetCmd::PreDo(std::shared_ptr<Slot> slot) {
auto s = slot->cache()->Get(key_, &value_);
if (s.ok()) {
res_.AppendStringLen(value_.size());
res_.AppendContent(value_);
} else {
res_.SetRes(CmdRes::kCacheMiss);
}
}

总结

一共有 4 种执行命令:

Do: Pika 原生的执行命令方法

PreDo:读缓存操作

UpdateCache: 更新缓存操作

DoFromCache : 写命令下默认调用的就是 Do 接口,读命令下会先调用 res_.clear()清空之前 PreDores_结果,然后调用 Do

写命令:

1
2
3
void SetCmd::CacheDo() {
Do();
}

读命令:

1
2
3
4
void ExistsCmd::CacheDo() {
res_.clear();
Do();
}

对于 DoFromCache命令,我们也发现了有一些读命令不单单是调用的 Do接口,而是选择了直接重写

1
2
3
4
5
6
7
8
9
10
void StrlenCmd::Do() {
int32_t len = 0;
s_ = g_pika_server->db()->Strlen(key_, &len);
if (s_.ok() || s_.IsNotFound()) {
res_.AppendInteger(len);
} else {
res_.SetRes(CmdRes::kErrOther, s_.ToString());
}
return;
}
1
2
3
4
5
6
7
8
9
void StrlenCmd::CacheDo() {
res_.clear();
s_ = g_pika_server->db()->GetWithTTL(key_, &value_, &sec_);
if (s_.ok() || s_.IsNotFound()) {
res_.AppendInteger(value_.size());
} else {
res_.SetRes(CmdRes::kErrOther, s_.ToString());
}
}

比如这里的 strlen命令,原生的 Do 命令是直接调 strlen接口,其实底层还是调 Get 接口先获取到 Key 然后计算 Key 的 size, 而 CacheDo 这边的话直接就是调用的 Get接口了,然后计算出 value 的 size

问题

  1. 下面注释的代码中: 这里是否需要对key进行加锁,保证操作 rocksdb 和 cache 是原子的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void Cmd::DoCommand(const std::shared_ptr<Slot>& slot, const HintKeys& hint_keys) {
if (!is_suspend()) {
slot->DbRWLockReader();
}
if (need_cache_do()
&& PIKA_CACHE_NONE != g_pika_conf->cache_model()) {
&& PIKA_CACHE_STATUS_OK == g_pika_server->Cache()->CacheStatus()) {

if (is_need_read_cache()) {
PreDo(slot);
}
if (is_read() && res().CacheMiss()) {
// slash::ScopeRecordLock l(g_pika_server->LockMgr(), argv[1]);
DoFromCache(slot);
if (CmdStatus().ok() && is_need_update_cache()) {
DoUpdateCache(slot);
}
} else if (is_write()) {
DoFromCache(slot);
if (CmdStatus().ok() && is_need_update_cache()) {
DoUpdateCache(slot);
}
}
} else {
Do(slot);
}
if (!is_suspend()) {
slot->DbRWUnLock();
}
}
  1. 既然 CacheDo调用的接口的就是 Do ,只是部分场景下会需要把 res_清空,那么我们可不可以直接在每个 Do 接口执行之前调用 res_.clear 这样就不需要再去给每个命令写 CacheDo方法了

  2. 代码中有逻辑冗余的地方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    void Cmd::DoCommand(const std::shared_ptr<Slot>& slot, const HintKeys& hint_keys) {
    if (!is_suspend()) {
    slot->DbRWLockReader();
    }
    if (need_cache_do()
    && PIKA_CACHE_NONE != g_pika_conf->cache_model()) {
    && PIKA_CACHE_STATUS_OK == g_pika_server->Cache()->CacheStatus()) {

    if (is_need_read_cache()) {
    PreDo(slot);
    }
    if (is_read() && res().CacheMiss()) {
    DoFromCache(slot);
    if (CmdStatus().ok() && is_need_update_cache()) { // 这里的CmdStatus().ok() 返回是s_的值,代表写DB是否成功
    DoUpdateCache(slot);
    }
    } else if (is_write()) {
    DoFromCache(slot);
    if (CmdStatus().ok() && is_need_update_cache()) {
    DoUpdateCache(slot);
    }
    }
    } else {
    Do(slot);
    }
    if (!is_suspend()) {
    slot->DbRWUnLock();
    }
    }
    1
    2
    3
    4
    5
    void GetCmd::DoUpdateCache(std::shared_ptr<Slot> slot) {
    if (s_.ok()) { // 这里又重新判断了一次s_的返回值
    slot->cache()->Set(key_, value_, sec_);
    }
    }