背景
以 Pika 的单机模式走通了 String 类型下的 set/get 命令
流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| void Cmd::DoCommand(const std::shared_ptr<Slot>& slot, const HintKeys& hint_keys) { if (!is_suspend()) { slot->DbRWLockReader(); } if (need_cache_do() && PIKA_CACHE_NONE != g_pika_conf->cache_model()) { && PIKA_CACHE_STATUS_OK == g_pika_server->Cache()->CacheStatus()) {
if (is_need_read_cache()) { PreDo(slot); } if (is_read() && res().CacheMiss()) { DoFromCache(slot); if (CmdStatus().ok() && is_need_update_cache()) { DoUpdateCache(slot); } } else if (is_write()) { DoFromCache(slot); if (CmdStatus().ok() && is_need_update_cache()) { DoUpdateCache(slot); } } } else { Do(slot); } if (!is_suspend()) { slot->DbRWUnLock(); } }
|
以 Set 命令为例子,我们看下它的几种命令操作
DoFromCache
: 写 RocksDB,调用 Do 接口
1 2 3
| void SetCmd::DoFromCache(std::shared_ptr<Slot> slot) { Do(slot); }
|
UpdateCache
:更新缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| void SetCmd::DoUpdateCache(std::shared_ptr<Slot> slot) { switch (condition_) { case SetCmd::kXX: slot->cache()->Setxx(key_, value_, sec_); break; case SetCmd::kNX: slot->cache()->Setnx(key_, value_, sec_); break; case SetCmd::kVX: slot->cache()->Setvx(key_, target_, value_, sec_); break; case SetCmd::kEXORPX: slot->cache()->Setex(key_, value_, static_cast<int32_t>(sec_)); break; default: slot->cache()->SetWithoutTTL(key_, value_); break; } }
|
PreDo
:由于 Set 是写操作,所以没有 PreDo
读缓存命令
以 Get 命令为例子,我们看下它的几种命令操作
DoFromCache
: 写 RocksDB,调用 Do 接口
1 2 3 4 5 6 7 8 9 10 11 12
| void GetCmd::DoFromCache(std::shared_ptr<Slot> slot) { res_.clear(); s_ = slot->db()->GetWithTTL(key_, &value_,&sec_); if (s_.ok()) { res_.AppendStringLenUint64(value_.size()); res_.AppendContent(value_); } else if (s_.IsNotFound()) { res_.AppendStringLen(-1); } else { res_.SetRes(CmdRes::kErrOther, s_.ToString()); } }
|
UpdateCache
:更新缓存
1 2 3 4 5
| void GetCmd::DoUpdateCache(std::shared_ptr<Slot> slot) { if (s_.ok()) { slot->cache()->Set(key_, value_, sec_); } }
|
PreDo
: 读缓存操作
1 2 3 4 5 6 7 8 9
| void GetCmd::PreDo(std::shared_ptr<Slot> slot) { auto s = slot->cache()->Get(key_, &value_); if (s.ok()) { res_.AppendStringLen(value_.size()); res_.AppendContent(value_); } else { res_.SetRes(CmdRes::kCacheMiss); } }
|
总结
一共有 4 种执行命令:
Do
: Pika 原生的执行命令方法
PreDo
:读缓存操作
UpdateCache
: 更新缓存操作
DoFromCache
: 写命令下默认调用的就是 Do
接口,读命令下会先调用 res_.clear()
清空之前 PreDo
的 res_
结果,然后调用 Do
写命令:
1 2 3
| void SetCmd::CacheDo() { Do(); }
|
读命令:
1 2 3 4
| void ExistsCmd::CacheDo() { res_.clear(); Do(); }
|
对于 DoFromCache
命令,我们也发现了有一些读命令不单单是调用的 Do
接口,而是选择了直接重写
1 2 3 4 5 6 7 8 9 10
| void StrlenCmd::Do() { int32_t len = 0; s_ = g_pika_server->db()->Strlen(key_, &len); if (s_.ok() || s_.IsNotFound()) { res_.AppendInteger(len); } else { res_.SetRes(CmdRes::kErrOther, s_.ToString()); } return; }
|
1 2 3 4 5 6 7 8 9
| void StrlenCmd::CacheDo() { res_.clear(); s_ = g_pika_server->db()->GetWithTTL(key_, &value_, &sec_); if (s_.ok() || s_.IsNotFound()) { res_.AppendInteger(value_.size()); } else { res_.SetRes(CmdRes::kErrOther, s_.ToString()); } }
|
比如这里的 strlen
命令,原生的 Do
命令是直接调 strlen
接口,其实底层还是调 Get
接口先获取到 Key 然后计算 Key 的 size, 而 CacheDo
这边的话直接就是调用的 Get
接口了,然后计算出 value 的 size
问题
- 下面注释的代码中: 这里是否需要对key进行加锁,保证操作 rocksdb 和 cache 是原子的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| void Cmd::DoCommand(const std::shared_ptr<Slot>& slot, const HintKeys& hint_keys) { if (!is_suspend()) { slot->DbRWLockReader(); } if (need_cache_do() && PIKA_CACHE_NONE != g_pika_conf->cache_model()) { && PIKA_CACHE_STATUS_OK == g_pika_server->Cache()->CacheStatus()) {
if (is_need_read_cache()) { PreDo(slot); } if (is_read() && res().CacheMiss()) { DoFromCache(slot); if (CmdStatus().ok() && is_need_update_cache()) { DoUpdateCache(slot); } } else if (is_write()) { DoFromCache(slot); if (CmdStatus().ok() && is_need_update_cache()) { DoUpdateCache(slot); } } } else { Do(slot); } if (!is_suspend()) { slot->DbRWUnLock(); } }
|
既然 CacheDo
调用的接口的就是 Do
,只是部分场景下会需要把 res_
清空,那么我们可不可以直接在每个 Do
接口执行之前调用 res_.clear
这样就不需要再去给每个命令写 CacheDo
方法了
代码中有逻辑冗余的地方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| void Cmd::DoCommand(const std::shared_ptr<Slot>& slot, const HintKeys& hint_keys) { if (!is_suspend()) { slot->DbRWLockReader(); } if (need_cache_do() && PIKA_CACHE_NONE != g_pika_conf->cache_model()) { && PIKA_CACHE_STATUS_OK == g_pika_server->Cache()->CacheStatus()) {
if (is_need_read_cache()) { PreDo(slot); } if (is_read() && res().CacheMiss()) { DoFromCache(slot); if (CmdStatus().ok() && is_need_update_cache()) { DoUpdateCache(slot); } } else if (is_write()) { DoFromCache(slot); if (CmdStatus().ok() && is_need_update_cache()) { DoUpdateCache(slot); } } } else { Do(slot); } if (!is_suspend()) { slot->DbRWUnLock(); } }
|
1 2 3 4 5
| void GetCmd::DoUpdateCache(std::shared_ptr<Slot> slot) { if (s_.ok()) { slot->cache()->Set(key_, value_, sec_); } }
|