Pika的CacheStatus方案

背景

了解 Cache 流程

流程

在 Pika-Server 启动的时候创建 Cache,并对 CacheConfig 进行初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
    // Create cache
// Fill a CacheConfig from the global Pika configuration (see CacheConfigInit).
cache::CacheConfig cache_cfg;
CacheConfigInit(cache_cfg);

// Construct the cache with the configured start position and per-key item limit,
// then initialise it with the configured cache count and the config built above.
cache_ = new PikaCache(g_pika_conf->cache_start_pos(), g_pika_conf->cache_items_per_key());
// NOTE(review): `ret` is not inspected anywhere in this excerpt — confirm the
// surrounding code handles an Init failure.
Status ret = cache_->Init(g_pika_conf->cache_num(), &cache_cfg);
-----------------------------------------------------------------------------------------------
/**
 * Copies the cache-related settings from the global Pika configuration into
 * the caller-supplied CacheConfig.
 *
 * @param cache_cfg Output structure; its maxmemory, maxmemory_policy,
 *                  maxmemory_samples and lfu_decay_time fields are overwritten.
 */
void PikaServer::CacheConfigInit(cache::CacheConfig &cache_cfg) {
  auto &conf = *g_pika_conf;

  cache_cfg.maxmemory = conf.cache_maxmemory();
  cache_cfg.maxmemory_policy = conf.cache_maxmemory_policy();
  cache_cfg.maxmemory_samples = conf.cache_maxmemory_samples();
  cache_cfg.lfu_decay_time = conf.cache_lfu_decay_time();
}

在 PikaCache 构造函数中开启 cache_load_thread 线程

1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Constructs an uninitialised cache and starts its background load thread.
 *
 * @param cache_start_pos     Stored as-is in cache_start_pos_.
 * @param cache_items_per_key Stored after being passed through EXTEND_CACHE_SIZE.
 */
PikaCache::PikaCache(int cache_start_pos, int cache_items_per_key)
    : cache_status_(PIKA_CACHE_STATUS_NONE),  // the status starts out as NONE
      cache_num_(0),
      cache_start_pos_(cache_start_pos),
      cache_items_per_key_(EXTEND_CACHE_SIZE(cache_items_per_key)),
      cache_load_thread_(nullptr) {
  pthread_rwlock_init(&rwlock_, nullptr);

  // The load thread is created with the (already extended) per-key limit
  // and started right away.
  cache_load_thread_ = new PikaCacheLoadThread(cache_start_pos_, cache_items_per_key_);
  cache_load_thread_->StartThread();
}

在 cache_load_thread 线程中,异步地把 key 加载到缓存中。我们以 zset 类型的 zrange 命令举例

1
2
3
4
5
> ZRANGE scores 0 1 WITHSCORES
1) "player1"
2) "100"
3) "player2"
4) "200"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Worker loop of the cache load thread.
 *
 * Each iteration blocks until the load queue is non-empty (or an exit has been
 * requested), moves up to CACHE_LOAD_NUM_ONE_TIME entries out of the queue
 * while holding loadkeys_mutex_, and then loads them through LoadKey() outside
 * of that lock. Every processed key is erased from loadkeys_map_, whether or
 * not the load succeeded.
 *
 * @return Always nullptr.
 */
void *PikaCacheLoadThread::ThreadMain() {
  LOG(INFO) << "PikaCacheLoadThread::ThreadMain Start";

  while (!should_exit_) {
    std::deque<std::pair<const char, std::string>> load_keys;
    {
      // A unique_lock is required (rather than a lock_guard) so that the
      // condition variable can release the mutex while this thread sleeps.
      std::unique_lock lq(loadkeys_mutex_);
      waitting_load_keys_num_ = loadkeys_queue_.size();
      while (!should_exit_ && loadkeys_queue_.empty()) {
        // NOTE(review): presumably wakes a thread waiting for the queue to
        // drain — confirm against the other users of loadkeys_cond_.
        loadkeys_cond_.notify_one();
        // Sleep until signalled. Without this wait the loop would spin while
        // holding loadkeys_mutex_, so no producer could ever enqueue a key.
        loadkeys_cond_.wait(lq);
      }

      if (should_exit_) {
        return nullptr;
      }

      // Take a bounded batch so the queue lock is held only briefly.
      for (int i = 0; i < CACHE_LOAD_NUM_ONE_TIME && !loadkeys_queue_.empty(); ++i) {
        load_keys.push_back(loadkeys_queue_.front());
        loadkeys_queue_.pop_front();
      }
    }

    auto slot = cache_->GetSlot();
    for (const auto &item : load_keys) {
      if (LoadKey(item.first, item.second, slot)) {
        ++async_load_keys_num_;
      } else {
        LOG(WARNING) << "PikaCacheLoadThread::ThreadMain LoadKey: " << item.second << " failed !!!";
      }

      // The key is no longer pending, regardless of the load result.
      std::lock_guard lq(loadkeys_map_mutex_);
      loadkeys_map_.erase(item.second);
    }
  }

  return nullptr;
}

然后调用 Init 对 cache 进行初始化操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Locked entry point for cache initialisation.
 *
 * @param cache_num Number of cache instances to create.
 * @param cache_cfg Cache configuration; must not be null.
 * @return Status::Corruption when cache_cfg is null, otherwise the result of
 *         InitWithoutLock().
 */
Status PikaCache::Init(uint32_t cache_num, cache::CacheConfig *cache_cfg) {
  // NOTE(review): the `true` argument presumably selects write (exclusive)
  // mode — confirm against slash::RWLock.
  slash::RWLock guard(&rwlock_, true);

  if (cache_cfg != nullptr) {
    return InitWithoutLock(cache_num, cache_cfg);
  }
  return Status::Corruption("invalid arguments !!!");
}
---------------------------------------------------------------------------------------------
PikaCache::InitWithoutLock(uint32_t cache_num, cache::CacheConfig *cache_cfg) {
cache_status_ = PIKA_CACHE_STATUS_INIT; // cache_status_是初始化状态

cache_num_ = cache_num;
if (NULL != cache_cfg) {
dory::RedisCache::SetConfig(cache_cfg);
}

for (uint32_t i = 0; i < cache_num; ++i) {
dory::RedisCache *cache = new cache::RedisCache();
Status s = cache->Open();
if (!s.ok()) {
LOG(ERROR) << "PikaCache::InitWithoutLock Open cache failed";
DestroyWithoutLock();
cache_status_ = PIKA_CACHE_STATUS_NONE; // 如果cache的Open失败就重新变为NONE状态
return Status::Corruption("create redis cache failed");
}
caches_.push_back(cache);
cache_mutexs_.push_back(new pstd::Mutex());
}
cache_status_ = PIKA_CACHE_STATUS_OK; // 成功就是OK状态

return Status::OK();
}

在定时器任务中,新增了 ProcessCronTask 和 UpdateCacheInfo,用来定期检测 cache 命中率和更新 cache 信息

1
2
3
4
5
6
// Periodic timer task: runs the cache manager's cron work and then refreshes
// the cache info.
void PikaServer::DoTimingTask() {
g_pika_cache_manager->ProcessCronTask();
// Print the queue status periodically
// NOTE(review): no queue-status printing is visible in this excerpt; the
// remark above may belong to code that was elided — confirm.
// cache info cron task, 1s
UpdateCacheInfo();
}

在调用 slaveof 命令或者 flushdb 的时候,需要清空 cache。这种情况下我们会把 cache 的状态设置为 PIKA_CACHE_STATUS_CLEAR,并用后台线程(代码中为 bgsave_thread_)专门去做清空缓存的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
 * Schedules an asynchronous clear of the cache on bgsave_thread_.
 *
 * Nothing is scheduled unless the cache status is currently OK; in that case
 * a warning containing the current status is logged instead.
 */
void PikaServer::ClearCacheDbAsync(void) {
  if (cache_->CacheStatus() != PIKA_CACHE_STATUS_OK) {
    LOG(WARNING) << "can not clear cache in status: " << cache_->CacheStatus();
    return;
  }

  bgsave_thread_.StartThread();

  // The task argument is heap-allocated here and released by DoCacheBGTask.
  auto *task = new BGCacheTaskArg();
  task->p = this;
  task->task_type = CACHE_BGTASK_CLEAR;
  bgsave_thread_.Schedule(&DoCacheBGTask, static_cast<void *>(task));
}
-------------------------------------------------------------------
void PikaServer::DoCacheBGTask(void* arg) {
BGCacheTaskArg *pCacheTaskArg = static_cast<BGCacheTaskArg*>(arg);
PikaServer* p = pCacheTaskArg->p;

switch (pCacheTaskArg->task_type) {
case CACHE_BGTASK_CLEAR:
LOG(INFO) << "clear cache start..."; // 清空缓存
p->Cache()->SetCacheStatus(PIKA_CACHE_STATUS_CLEAR);
p->ResetDisplayCacheInfo(PIKA_CACHE_STATUS_CLEAR);
p->Cache()->FlushSlot();
LOG(INFO) << "clear cache finish";
break;
case CACHE_BGTASK_RESET_NUM:
LOG(INFO) << "reset cache num start...";
p->Cache()->SetCacheStatus(PIKA_CACHE_STATUS_RESET);
p->ResetDisplayCacheInfo(PIKA_CACHE_STATUS_RESET);
p->Cache()->Reset(pCacheTaskArg->cache_num);
LOG(INFO) << "reset cache num finish";
break;
case CACHE_BGTASK_RESET_CFG:
LOG(INFO) << "reset cache config start...";
p->Cache()->SetCacheStatus(PIKA_CACHE_STATUS_RESET);
p->ResetDisplayCacheInfo(PIKA_CACHE_STATUS_RESET);
p->Cache()->Reset(pCacheTaskArg->cache_num);
LOG(INFO) << "reset cache config finish";
break;
default:
LOG(WARNING) << "invalid cache task type: " << pCacheTaskArg->task_type;
break;
}
p->Cache()->SetCacheStatus(PIKA_CACHE_STATUS_OK);
if (pCacheTaskArg->reenable_cache && pCacheTaskArg->c) {
pCacheTaskArg->c->UnsetCacheDisableFlag();
}

delete (BGCacheTaskArg*)arg;
}

当配置文件中的 cache 数量和当前的 cache 数量不一致时,会触发重启 cache,同样是用后台线程(代码中为 bgsave_thread_)专门去做重启操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Schedules an asynchronous cache reset on bgsave_thread_.
 *
 * The reset is only scheduled while the cache status is OK or NONE; for any
 * other status a warning is logged and nothing happens.
 *
 * @param cache_num Number of cache instances the reset should create.
 * @param cache_cfg When null, a CACHE_BGTASK_RESET_NUM task is queued;
 *                  otherwise the config is copied into the task and a
 *                  CACHE_BGTASK_RESET_CFG task is queued.
 */
void PikaServer::ResetCacheAsync(uint32_t cache_num, cache::CacheConfig *cache_cfg) {
  const bool resettable = cache_->CacheStatus() == PIKA_CACHE_STATUS_OK
                          || cache_->CacheStatus() == PIKA_CACHE_STATUS_NONE;
  if (!resettable) {
    LOG(WARNING) << "can not reset cache in status: " << cache_->CacheStatus();
    return;
  }

  bgsave_thread_.StartThread();

  // The task argument is heap-allocated here and released by DoCacheBGTask.
  auto *task = new BGCacheTaskArg();
  task->p = this;
  task->cache_num = cache_num;
  if (cache_cfg != nullptr) {
    task->task_type = CACHE_BGTASK_RESET_CFG;
    task->cache_cfg = *cache_cfg;  // copied, so the caller's object need not outlive the task
  } else {
    task->task_type = CACHE_BGTASK_RESET_NUM;
  }
  bgsave_thread_.Schedule(&DoCacheBGTask, static_cast<void *>(task));
}


在调用 ~PikaCache、Reset 这几个接口时,会调用 DestroyWithoutLock 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
 else if (set_item == "cache-num") {
// CONFIG SET cache-num: reject values that are not integers or are negative.
if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) {
ret = "-ERR Invalid argument " + value + " for CONFIG SET 'cache-num'\r\n";
return;
}

// 0 and anything above 48 fall back to 16; values in 1..48 are used as given.
int cache_num = (0 >= ival || 48 < ival) ? 16 : ival;
// Only touch the cache when the effective value differs from the current one.
if (cache_num != g_pika_conf->cache_num()) {
g_pika_conf->SetCacheNum(cache_num);
g_pika_server->ResetCacheAsync(cache_num); // schedules an asynchronous cache reset
}
ret = "+OK\r\n";
}

--------------------------------------------------------------------------------
/**
 * Tears down all cache instances and re-creates them while holding rwlock_.
 *
 * @param cache_num Number of cache instances to create.
 * @param cache_cfg Configuration forwarded to InitWithoutLock().
 * @return The status returned by InitWithoutLock().
 */
rocksdb::Status PikaCache::Reset(uint32_t cache_num, cache::CacheConfig *cache_cfg) {
  std::unique_lock exclusive_guard(rwlock_);

  DestroyWithoutLock();
  return InitWithoutLock(cache_num, cache_cfg);
}
-----------------------------------------------------------------------------------
void PikaCache::DestroyWithoutLock(void) {
cache_status_ = PIKA_CACHE_STATUS_DESTROY; // cache变为Destroy毁灭状态

for (auto iter = caches_.begin(); iter != caches_.end(); ++iter) {
delete *iter;
}
caches_.clear();

for (auto iter = cache_mutexs_.begin(); iter != cache_mutexs_.end(); ++iter) {
delete *iter;
}
cache_mutexs_.clear();
}

操作缓存条件:

  • 该命令需要操作缓存
  • 缓存模式不为NONE
  • 缓存状态为OK
  • 当前不是 slave 状态
1
2
3
4
// Gate for going through the cache: the command needs the cache, the cache
// model is not NONE, the slot's cache status is OK and the server is not a slave.
if (need_cache_do()
&& g_pika_conf->cache_model() != PIKA_CACHE_NONE
&& slot->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK
&& !g_pika_server->is_slave()) { // this layer of logic (the is_slave check) has been removed

master端命令流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Executes the command on the master side, going through the cache when it is
 * usable.
 *
 * The cache path is taken when the command needs the cache, the cache model is
 * not NONE and the slot's cache status is OK; otherwise the command runs via
 * Do(). Unless the command is a suspend command, the slot's DB read lock is
 * held for the whole execution.
 *
 * @param slot      Slot the command operates on.
 * @param hint_keys Not used in this function body.
 */
void Cmd::DoCommand(const std::shared_ptr<Slot>& slot, const HintKeys& hint_keys) {
  if (!is_suspend()) {
    slot->DbRWLockReader();
  }

  const bool cache_usable = need_cache_do()
                            && g_pika_conf->cache_model() != PIKA_CACHE_NONE
                            && slot->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK;
  if (!cache_usable) {
    Do(slot);
  } else {
    if (is_need_read_cache()) {
      PreDo(slot);
    }
    // A read that missed the cache and a write follow the same steps:
    // DoFromCache(), then an optional cache update.
    if ((is_read() && res().CacheMiss()) || is_write()) {
      DoFromCache(slot);
      if (is_need_update_cache()) {
        DoUpdateCache(slot);
      }
    }
  }

  if (!is_suspend()) {
    slot->DbRWUnLock();
  }
}

slave端命令流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(db_name, slot_id);
// Add read lock for no suspend command
if (!c_ptr->is_suspend()) {
slot->DbRWLockReader();
}
if (c_ptr->need_cache_do()
&& g_pika_conf->cache_model() != PIKA_CACHE_NONE
&& slot->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (c_ptr->is_write()) {
c_ptr->DoFromCache(slot);
if (c_ptr->is_need_update_cache()) {
c_ptr->DoUpdateCache(slot);
}
}
} else {
c_ptr->Do(slot);
}
if (!c_ptr->is_suspend()) {
slot->DbRWUnLock();
}

当前 Pika 和 Xcache 存在的差异

Pika

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
## pika_cache.h (pika)
private:
std::atomic<int> cache_status_;
std::unique_ptr<cache::RedisCache> cache_;

// currently only take effects to zset
int cache_start_pos_;
int cache_items_per_key_;
std::shared_mutex rwlock_;
std::unique_ptr<PikaCacheLoadThread> cache_load_thread_;
std::shared_ptr<Slot> slot_;
---------------------------------------------------------------------------------------------------
## pika_cache.h (xcache)
private:
std::vector<dory::RedisCache*> caches_;
std::vector<slash::Mutex*> cache_mutexs_;
std::atomic<int> cache_status_;
uint32_t cache_num_;
pthread_rwlock_t rwlock_;

// currently only take effects to zset
int cache_start_pos_;
int cache_items_per_key_;
PikaCacheLoadThread *cache_load_thread_;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
## Pika_server.h (xcache)
slash::RWMutex cache_info_rwlock_;
DisplayCacheInfo cache_info_;
PikaCache *cache_;

----------------------------------------------------------------------------------------
## Pika_cache_manager.h (pika)
private:
std::shared_mutex mu_;
std::unordered_map<std::string, std::shared_ptr<PikaCache>> caches_;
std::atomic<int> cache_status_;
PikaCacheLoadThread *cache_load_thread_;

## pika_slot.h (pika)
private:
std::shared_ptr<PikaCache> cache_;

问题

  1. Xcache 只支持单 DB,所以用 g_pika_server 去操作 DB;而 Pika 支持多 DB,得以 slot 维度去操作 DB。因此在任何时候,我们都需要先获取当前 DB 的名字,才能获取到对应的 slot,即调用 slot = g_pika_server->GetSlotByDBName(db_name_);

  2. 目前 Flushall、Flushdb 的时候还不能清空缓存,这部分和 CacheLoadThread 线程有关系

  3. 在定时器任务中,每次需要更新 cache的值

Cache 有以下六种状态

  1. PIKA_CACHE_STATUS_NONE: 0

  2. PIKA_CACHE_STATUS_INIT :1

  3. PIKA_CACHE_STATUS_OK: 2

  4. PIKA_CACHE_STATUS_RESET: 3

  5. PIKA_CACHE_STATUS_DESTROY : 4

  6. PIKA_CACHE_STATUS_CLEAR: 5

Cache 的模式有两种

  1. PIKA_CACHE_NONE : 0

  2. PIKA_CACHE_READ : 1

Cache 的大小配置有以下两个参数

  1. PIKA_CACHE_SIZE_MIN : 512M

  2. PIKA_CACHE_SIZE_DEFAULT : 10G

Cache

1
2
3
# cache-start-direction 0:cache first ${cache-items-per-key} items; -1:cache last ${cache-items-per-key} items
cache-start-direction : 0
cache-items-per-key : 512

Cache 淘汰配置:所有键中最近最少使用

1
2
3
4
5
6
7
8
9
10
# cache-maxmemory-policy
# 0: volatile-lru -> Evict using approximated LRU among the keys with an expire set.
# 1: allkeys-lru -> Evict any key using approximated LRU.
# 2: volatile-lfu -> Evict using approximated LFU among the keys with an expire set.
# 3: allkeys-lfu -> Evict any key using approximated LFU.
# 4: volatile-random -> Remove a random key among the ones with an expire set.
# 5: allkeys-random -> Remove a random key, any key.
# 6: volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# 7: noeviction -> Don't evict anything, just return an error on write operations.
cache-maxmemory-policy : 1
1
2
3
4
5
# cache-maxmemory-samples
cache-maxmemory-samples: 5

# cache-lfu-decay-time
cache-lfu-decay-time: 1