背景 了解 Cache 流程 流程 在 Pika-Server 启动的时候创建 Cache,并对 CacheConfig 进行初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 cache::CacheConfig cache_cfg; CacheConfigInit (cache_cfg); cache_ = new PikaCache (g_pika_conf->cache_start_pos (), g_pika_conf->cache_items_per_key ()); Status ret = cache_->Init (g_pika_conf->cache_num (), &cache_cfg); ----------------------------------------------------------------------------------------------- void PikaServer::CacheConfigInit (cache::CacheConfig &cache_cfg) { cache_cfg.maxmemory = g_pika_conf->cache_maxmemory (); cache_cfg.maxmemory_policy = g_pika_conf->cache_maxmemory_policy (); cache_cfg.maxmemory_samples = g_pika_conf->cache_maxmemory_samples (); cache_cfg.lfu_decay_time = g_pika_conf->cache_lfu_decay_time (); }
在 PikaCache 构造函数中开启 cache_load_thread 线程
1 2 3 4 5 6 7 8 9 10 11 12 PikaCache::PikaCache (int cache_start_pos, int cache_items_per_key) : cache_status_ (PIKA_CACHE_STATUS_NONE) , cache_num_ (0 ) , cache_start_pos_ (cache_start_pos) , cache_items_per_key_ (EXTEND_CACHE_SIZE (cache_items_per_key)) , cache_load_thread_ (NULL ) { pthread_rwlock_init (&rwlock_, NULL ); cache_load_thread_ = new PikaCacheLoadThread (cache_start_pos_, cache_items_per_key_); cache_load_thread_->StartThread (); }
cache_load_thread 线程负责把 key 异步加载(写入)到缓存中,这里以 zset 类型的 zrange 命令为例
1 2 3 4 5 > ZRANGE scores 0 1 WITHSCORES 1) "player1" 2) "100" 3) "player2" 4) "200"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 void *PikaCacheLoadThread::ThreadMain () { LOG (INFO) << "PikaCacheLoadThread::ThreadMain Start" ; while (!should_exit_) { std::deque<std::pair<const char , std::string>> load_keys; { std::lock_guard lq (loadkeys_mutex_) ; waitting_load_keys_num_ = loadkeys_queue_.size (); while (!should_exit_ && 0 >= loadkeys_queue_.size ()) { loadkeys_cond_.notify_one (); } if (should_exit_) { return nullptr ; } for (int i = 0 ; i < CACHE_LOAD_NUM_ONE_TIME; ++i) { if (!loadkeys_queue_.empty ()) { load_keys.push_back (loadkeys_queue_.front ()); loadkeys_queue_.pop_front (); } } } auto slot = cache_->GetSlot (); for (auto iter = load_keys.begin (); iter != load_keys.end (); ++iter) { if (LoadKey (iter->first, iter->second, slot)) { ++async_load_keys_num_; } else { LOG (WARNING) << "PikaCacheLoadThread::ThreadMain LoadKey: " << iter->second << " failed !!!" ; } std::lock_guard lq (loadkeys_map_mutex_) ; loadkeys_map_.erase (iter->second); } } return nullptr ; }
然后调用 Init 对 cache 进行初始化操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 Status PikaCache::Init (uint32_t cache_num, cache::CacheConfig *cache_cfg) { slash::RWLock l (&rwlock_, true ) ; if (NULL == cache_cfg) { return Status::Corruption ("invalid arguments !!!" ); } return InitWithoutLock (cache_num, cache_cfg); } --------------------------------------------------------------------------------------------- PikaCache::InitWithoutLock (uint32_t cache_num, cache::CacheConfig *cache_cfg) { cache_status_ = PIKA_CACHE_STATUS_INIT; cache_num_ = cache_num; if (NULL != cache_cfg) { dory::RedisCache::SetConfig (cache_cfg); } for (uint32_t i = 0 ; i < cache_num; ++i) { dory::RedisCache *cache = new cache::RedisCache (); Status s = cache->Open (); if (!s.ok ()) { LOG (ERROR) << "PikaCache::InitWithoutLock Open cache failed" ; DestroyWithoutLock (); cache_status_ = PIKA_CACHE_STATUS_NONE; return Status::Corruption ("create redis cache failed" ); } caches_.push_back (cache); cache_mutexs_.push_back (new pstd::Mutex ()); } cache_status_ = PIKA_CACHE_STATUS_OK; return Status::OK (); }
在定时器任务中,新增了 ProcessCronTask
和 UpdateCacheInfo
用来定期检测 cache 命中率和更新 cache 信息
1 2 3 4 5 6 void PikaServer::DoTimingTask () { g_pika_cache_manager->ProcessCronTask (); UpdateCacheInfo (); }
在调用 slaveof
命令或者 flushdb
的时候,需要清空 cache,这种情况下我们会把 cache
的状态设置为 PIKA_CACHE_STATUS_CLEAR
, 我们会用 bg_thread 专门去做清空缓存的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 void PikaServer::ClearCacheDbAsync (void ) { if (PIKA_CACHE_STATUS_OK != cache_->CacheStatus ()) { LOG (WARNING) << "can not clear cache in status: " << cache_->CacheStatus (); return ; } bgsave_thread_.StartThread (); BGCacheTaskArg *arg = new BGCacheTaskArg (); arg->p = this ; arg->task_type = CACHE_BGTASK_CLEAR; bgsave_thread_.Schedule (&DoCacheBGTask, static_cast <void *>(arg)); } ------------------------------------------------------------------- void PikaServer::DoCacheBGTask (void * arg) { BGCacheTaskArg *pCacheTaskArg = static_cast <BGCacheTaskArg*>(arg); PikaServer* p = pCacheTaskArg->p; switch (pCacheTaskArg->task_type) { case CACHE_BGTASK_CLEAR: LOG (INFO) << "clear cache start..." ; p->Cache ()->SetCacheStatus (PIKA_CACHE_STATUS_CLEAR); p->ResetDisplayCacheInfo (PIKA_CACHE_STATUS_CLEAR); p->Cache ()->FlushSlot (); LOG (INFO) << "clear cache finish" ; break ; case CACHE_BGTASK_RESET_NUM: LOG (INFO) << "reset cache num start..." ; p->Cache ()->SetCacheStatus (PIKA_CACHE_STATUS_RESET); p->ResetDisplayCacheInfo (PIKA_CACHE_STATUS_RESET); p->Cache ()->Reset (pCacheTaskArg->cache_num); LOG (INFO) << "reset cache num finish" ; break ; case CACHE_BGTASK_RESET_CFG: LOG (INFO) << "reset cache config start..." ; p->Cache ()->SetCacheStatus (PIKA_CACHE_STATUS_RESET); p->ResetDisplayCacheInfo (PIKA_CACHE_STATUS_RESET); p->Cache ()->Reset (pCacheTaskArg->cache_num); LOG (INFO) << "reset cache config finish" ; break ; default : LOG (WARNING) << "invalid cache task type: " << pCacheTaskArg->task_type; break ; } p->Cache ()->SetCacheStatus (PIKA_CACHE_STATUS_OK); if (pCacheTaskArg->reenable_cache && pCacheTaskArg->c) { pCacheTaskArg->c->UnsetCacheDisableFlag (); } delete (BGCacheTaskArg*)arg; }
当配置文件中的 cache 数量和当前的 cache 数量不一致时,会触发重置(Reset)cache,同样由 bg_thread 专门执行重置操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void PikaServer::ResetCacheAsync (uint32_t cache_num, cache::CacheConfig *cache_cfg) { if (PIKA_CACHE_STATUS_OK == cache_->CacheStatus () || PIKA_CACHE_STATUS_NONE == cache_->CacheStatus ()) { bgsave_thread_.StartThread (); BGCacheTaskArg *arg = new BGCacheTaskArg (); arg->p = this ; arg->cache_num = cache_num; if (NULL == cache_cfg) { arg->task_type = CACHE_BGTASK_RESET_NUM; } else { arg->task_type = CACHE_BGTASK_RESET_CFG; arg->cache_cfg = *cache_cfg; } bgsave_thread_.Schedule (&DoCacheBGTask, static_cast <void *>(arg)); } else { LOG (WARNING) << "can not reset cache in status: " << cache_->CacheStatus (); } }
在调用 ~PikaCache
Reset
这几个接口时,会调用 DestroyWithoutLock
函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 else if (set_item == "cache-num" ) { if (!pstd::string2int (value.data (), value.size (), &ival) || ival < 0 ) { ret = "-ERR Invalid argument " + value + " for CONFIG SET 'cache-num'\r\n" ; return ; } int cache_num = (0 >= ival || 48 < ival) ? 16 : ival; if (cache_num != g_pika_conf->cache_num ()) { g_pika_conf->SetCacheNum (cache_num); g_pika_server->ResetCacheAsync (cache_num); } ret = "+OK\r\n" ; } -------------------------------------------------------------------------------- rocksdb::Status PikaCache::Reset (uint32_t cache_num, cache::CacheConfig *cache_cfg) { std::unique_lock l (rwlock_) ; DestroyWithoutLock (); return InitWithoutLock (cache_num, cache_cfg); } ----------------------------------------------------------------------------------- void PikaCache::DestroyWithoutLock (void ) { cache_status_ = PIKA_CACHE_STATUS_DESTROY; for (auto iter = caches_.begin (); iter != caches_.end (); ++iter) { delete *iter; } caches_.clear (); for (auto iter = cache_mutexs_.begin (); iter != cache_mutexs_.end (); ++iter) { delete *iter; } cache_mutexs_.clear (); }
操作缓存条件:
该命令需要操作缓存
缓存模式不为NONE
缓存状态为OK
当前不是 slave 状态
1 2 3 4 if (need_cache_do () && g_pika_conf->cache_model () != PIKA_CACHE_NONE && slot->cache ()->CacheStatus () == PIKA_CACHE_STATUS_OK && !g_pika_server->is_slave ()) {
master端命令流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void Cmd::DoCommand (const std::shared_ptr<Slot>& slot, const HintKeys& hint_keys) { if (!is_suspend ()) { slot->DbRWLockReader (); } if (need_cache_do () && g_pika_conf->cache_model () != PIKA_CACHE_NONE && slot->cache ()->CacheStatus () == PIKA_CACHE_STATUS_OK) { if (is_need_read_cache ()) { PreDo (slot); } if (is_read () && res ().CacheMiss ()) { DoFromCache (slot); if (is_need_update_cache ()) { DoUpdateCache (slot); } } else if (is_write ()) { DoFromCache (slot); if (is_need_update_cache ()) { DoUpdateCache (slot); } } } else { Do (slot); } if (!is_suspend ()) { slot->DbRWUnLock (); } }
slave端命令流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById (db_name, slot_id); if (!c_ptr->is_suspend ()) { slot->DbRWLockReader (); } if (c_ptr->need_cache_do () && g_pika_conf->cache_model () != PIKA_CACHE_NONE && slot->cache ()->CacheStatus () == PIKA_CACHE_STATUS_OK) { if (c_ptr->is_write ()) { c_ptr->DoFromCache (slot); if (c_ptr->is_need_update_cache ()) { c_ptr->DoUpdateCache (slot); } } } else { c_ptr->Do (slot); } if (!c_ptr->is_suspend ()) { slot->DbRWUnLock (); }
当前 Pika 和 Xcache 存在的差异
Pika
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 ## pika_cache.h (pika) private : std::atomic<int > cache_status_; std::unique_ptr<cache::RedisCache> cache_; int cache_start_pos_; int cache_items_per_key_; std::shared_mutex rwlock_; std::unique_ptr<PikaCacheLoadThread> cache_load_thread_; std::shared_ptr<Slot> slot_; --------------------------------------------------------------------------------------------------- ## pika_cache.h (xcache) private : std::vector<dory::RedisCache*> caches_; std::vector<slash::Mutex*> cache_mutexs_; std::atomic<int > cache_status_; uint32_t cache_num_; pthread_rwlock_t rwlock_; int cache_start_pos_; int cache_items_per_key_; PikaCacheLoadThread *cache_load_thread_; };
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ## Pika_server.h (xcache) slash::RWMutex cache_info_rwlock_; DisplayCacheInfo cache_info_; PikaCache *cache_; ---------------------------------------------------------------------------------------- ## Pika_cache_manager.h (pika) private : std::shared_mutex mu_; std::unordered_map<std::string, std::shared_ptr<PikaCache>> caches_; std::atomic<int > cache_status_; PikaCacheLoadThread *cache_load_thread_; ## pika_slot.h (pika) private : std::shared_ptr<PikaCache> cache_;
问题
Xcache 只支持单 DB,因此直接通过 g_pika_server 操作 DB;而 Pika 支持多 DB,需要以 slot 为维度操作 DB。因此任何时候都必须先拿到当前 DB 的名字,才能获取对应的 slot,即调用 slot = g_pika_server->GetSlotByDBName(db_name_);
执行 Flushall
和 Flushdb
的时候目前还不能清空缓存, 这部分和 CacheLoadThread
线程那边有关系
在定时器任务中,每次需要更新 cache
的值
Cache 有以下六种状态
PIKA_CACHE_STATUS_NONE
: 0
PIKA_CACHE_STATUS_INIT
:1
PIKA_CACHE_STATUS_OK
: 2
PIKA_CACHE_STATUS_RESET
: 3
PIKA_CACHE_STATUS_DESTROY
: 4
PIKA_CACHE_STATUS_CLEAR
: 5
Cache 的模式有两种
PIKA_CACHE_NONE
: 0
PIKA_CACHE_READ
: 1
Cache 的大小配置有以下两个参数
PIKA_CACHE_SIZE_MIN
: 512M
PIKA_CACHE_SIZE_DEFAULT
: 10G
Cache
1 2 3 cache-start-direction : 0 cache-items-per-key : 512
Cache 淘汰配置:所有键中最近最少使用
1 2 3 4 5 6 7 8 9 10 cache-maxmemory-policy : 1
1 2 3 4 5 cache-maxmemory-samples: 5 cache-lfu-decay-time: 1