Pika快慢命令分离方案

背景

Pika 目前的线程模型在处理命令时只有一个线程池,所有的命令装载在一个 Task 队列中,由线程池中的 worker 线程去处理请求。这样的实现方式导致在请求量比较大的情况下,一些慢命令(如 mset、mget)阻塞了快命令的执行,产生许多慢查询日志。因此需要支持快命令和慢命令队列的分离,提升 Pika 的性能

设计方案

我们从 Pika 层面和 Codis 层面都做了快慢命令的分离:在 Pika 层面,我们新加了一个 slow-cmd 线程池专门处理慢命令;在 Codis 层面,我们把后端连接划分为快连接组和慢连接组。接下来我们分 Pika 和 Codis 两层给大家介绍一下

Pika

pika.conf 中新增两个配置:slow-cmd-thread-pool-size 是慢命令线程池大小,slow-cmd-list 是慢命令列表。我们新增了一个线程池 pika_slow_cmd_thread_pool,继承 net::ThreadPool,用来处理慢命令。

1
2
slow-cmd-thread-pool-size: 4 
slow-cmd-list: mget, mset

pika/src/pika_server.cc 下,当 Pika 启动的时候,启动慢命令线程池

1
2
3
4
5
6
7
8
9
// Dedicated worker pool for slow commands, sized by slow-cmd-thread-pool-size.
// The second constructor argument (100000) is presumably the task-queue
// capacity — TODO confirm against net::ThreadPool's constructor.
std::unique_ptr<net::ThreadPool> pika_slow_cmd_thread_pool_;
pika_slow_cmd_thread_pool_ = std::make_unique<net::ThreadPool>(g_pika_conf->slow_cmd_thread_pool_size(), 100000);

ret = pika_slow_cmd_thread_pool_->start_thread_pool(); // start the slow-command thread pool
if (ret != net::kSuccess) {
dbs_.clear();
// Startup cannot proceed without the slow-command pool: log and abort.
LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}

快慢命令的主要逻辑在 pika/src/pika_client_conn.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Entry point for a parsed batch of Redis commands from one client connection.
// In async mode the whole batch is packed into one BgTaskArg and scheduled on
// either the fast or the slow thread pool; otherwise it runs inline.
void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async,
std::string* response) {
time_stat_->Reset();
if (async) {
auto arg = new BgTaskArg(); // holds the whole batch; ownership passes to the scheduled task
arg->redis_cmds = argvs;
time_stat_->enqueue_ts_ = pstd::NowMicros(); // stamp enqueue time before scheduling, for latency accounting
arg->conn_ptr = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this()); // keep the connection alive until the task runs
/**
 * If using the pipeline method to transmit batch commands to Pika, it is unable to
 * correctly distinguish between fast and slow commands.
 * However, if using the pipeline method for Codis, it can correctly distinguish between
 * fast and slow commands, but it cannot guarantee sequential execution.
 */
std::string opt = argvs[0][0]; // classify the batch by the first command's name only
pstd::StringToLower(opt); // command names are matched case-insensitively
bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt); // is it on the configured slow-cmd-list?
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd); // route to fast or slow pool
return;
}
BatchExecRedisCmd(argvs); // synchronous path: execute inline on this thread
}
1
2
3
4
5
6
7
// Dispatch a client task to the appropriate worker pool. Commands flagged as
// slow go to the dedicated slow-command pool so they cannot block fast ones;
// everything else is handled by the regular client processor.
void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd) {
  if (!is_slow_cmd) {
    // Fast path: regular client-processor pool.
    pika_client_processor_->SchedulePool(func, arg);
  } else {
    // Slow path: isolated pool sized by slow-cmd-thread-pool-size.
    pika_slow_cmd_thread_pool_->Schedule(func, arg);
  }
}

Proxy

codis/config/proxy.toml 修改了 Proxy 的配置文件

1
2
3
4
5
# Set backend parallel connections per server
backend_primary_parallel = 2
backend_primary_quick = 1
backend_replica_parallel = 2
backend_replica_quick = 1

codis/pkg/proxy/backend.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// BackendConn selects one backend connection for the given database. When the
// owner configures quick > 0, the per-database connection slice is split into
// a leading "quick" group of size quick and a trailing "slow" group; the seed
// spreads load within the chosen group. With must set, a (possibly
// disconnected) connection is always returned as a last resort.
func (s *sharedBackendConn) BackendConn(database int32, seed uint, isQuick bool, must bool) *BackendConn {
if s == nil {
return nil
}

if s.single != nil { // only one connection per database: no quick/slow split
bc := s.single[database]
if must || bc.IsConnected() {
return bc
}
return nil
}

var parallel = s.conns[database]
var i = seed

if quick := s.owner.quick; quick > 0 {
// NOTE(review): assumes 0 < quick < len(parallel); presumably enforced by
// the backend_*_quick config validation — confirm against the caller.
if isQuick { // quick request: pick among the first `quick` connections
i = seed % uint(quick)
if bc := parallel[i]; bc.IsConnected() {
//log.Debugf("BackendConn: find quick bc[%d]", i)
return bc
}
} else { // slow request: pick among the remaining len(parallel)-quick connections
i = seed % uint(len(parallel) - quick) + uint(quick)
if bc := parallel[i]; bc.IsConnected() {
return bc
}
}
} else {
// no quick/slow split configured: scan round-robin for any live connection
for range parallel {
i = (i + 1) % uint(len(parallel))
if bc := parallel[i]; bc.IsConnected() {
return bc
}
}
}

if !must { // no connected candidate and the caller does not insist
return nil
}
return parallel[0] // must: fall back to the first connection even if disconnected
}

如何区分快慢命令?

每一个命令都有一个 Flag 成员,代表了一个命令的特性,可以增加一位作为快慢命令的标识,并提供接口返回一个 bool 值表示该命令是快命令还是慢命令。另外,proxy 读取配置文件时也需要相应地修改命令的 Flag

1
2
3
4
5
6
7
8
9
// OpFlag is a bitmask of per-command traits. One bit each of FlagQuick /
// FlagSlow marks a command for routing to the quick or slow backend
// connection group.
type OpFlag uint32

// NOTE(review): the original snippet fused the struct fields and the flag
// constants into one invalid declaration (closed with ")"); reconstructed
// here as valid Go. Bit positions are illustrative — align with the full
// flag list in codis/pkg/proxy/mapper.go before relying on exact values.
const (
	FlagMasterOnly OpFlag = 1 << iota
	FlagMayWrite
	FlagNotAllow
	FlagQuick
	FlagSlow
)

// OpInfo describes one Redis command known to the proxy: its name plus the
// OpFlag bitmask capturing its traits.
type OpInfo struct {
	Name string
	Flag OpFlag
}

codis/pkg/proxy/router.go

1
2
3
4
5
6
7
8
9
// SetPrimaryQuickConn sets how many of each primary server's parallel
// connections are reserved for quick (fast) commands; delegates to the
// primary connection pool.
func (s *Router) SetPrimaryQuickConn(quick int) {
s.pool.primary.SetQuickConn(quick)
}

// SetReplicaQuickConn sets how many of each replica server's parallel
// connections are reserved for quick (fast) commands; delegates to the
// replica connection pool.
func (s *Router) SetReplicaQuickConn(quick int) {
s.pool.replica.SetQuickConn(quick)
}

codis/pkg/proxy/proxy.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  case "quick_cmd_list":
err := setQuickCmdList(value)
if err != nil {
log.Warnf("setQuickCmdList config[%s] failed, recover old config[%s].", value, p.config.QuickCmdList)
setQuickCmdList(p.config.QuickCmdList)
return redis.NewErrorf("err:%s.", err)
}
p.config.QuickCmdList = value
return redis.NewString([]byte("OK"))
case "slow_cmd_list":
err := setSlowCmdList(value)
if err != nil {
log.Warnf("setSlowCmdList config[%s] failed, recover old config[%s].", value, p.config.SlowCmdList)
setSlowCmdList(p.config.SlowCmdList)
return redis.NewErrorf("err:%s.", err)
}
p.config.SlowCmdList = value
return redis.NewString([]byte("OK"))
case "backend_replica_quick":
n, err := strconv.Atoi(value)
if err != nil {
return redis.NewErrorf("err:%s.", err)
}

if n < 0 || n >= p.config.BackendReplicaParallel {
return redis.NewErrorf("invalid backend_replica_quick")
} else {
p.config.BackendReplicaQuick = n
p.router.SetReplicaQuickConn(p.config.BackendReplicaQuick)
return redis.NewString([]byte("OK"))
}
case "backend_primary_quick":
n, err := strconv.Atoi(value)
if err != nil {
return redis.NewErrorf("err:%s.", err)
}

if n < 0 || n >= p.config.BackendPrimaryParallel {
return redis.NewErrorf("invalid backend_primary_quick")
} else {
p.config.BackendPrimaryQuick = n
p.router.SetPrimaryQuickConn(p.config.BackendPrimaryQuick)
return redis.NewString([]byte("OK"))
}