Pika代码理解(二)

紧接着对 Pika 代码理解(一)的介绍,我在对服务器与客户端的建连这边没弄懂,之前以为建连的开启如下面这段代码,原因在于我启动 Pika 服务器时显示的最后一段话是 “Pika Server going to start”,所以根据我之前 KV 项目的理解认为下面的 while 循环便是 CS 建连的开始

1
2
3
4
5
6
7
8
9
10
11
12
# pika/src/pika_server.cc
LOG(INFO) << "Pika Server going to start"; // line 352
while (!exit_) {
DoTimingTask();
// wake up every 10 second
int try_num = 0;
while (!exit_ && try_num++ < 10) {
sleep(1);
}
}
LOG(INFO) << "Goodbye...";
}

但是当我去查询 DoTimingTask() 这个函数的时候发现这里的三个 Auto 对应的数据库的 Compact,Purge 这些操作,而且根据上文中的“wake up every 10 second”这句话我认为 CS 的建连一定不是在这里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# pika/src/pika_server.cc
void PikaServer::DoTimingTask() { // line 1783
// Maybe schedule compactrange
AutoCompactRange();
// Purge log
AutoPurge();
// Delete expired dump
AutoDeleteExpiredDump();

// Check rsync deamon
if (((role_ & PIKA_ROLE_SLAVE) ^ PIKA_ROLE_SLAVE) || // Not a slave
repl_state_ == PIKA_REPL_NO_CONNECT ||
repl_state_ == PIKA_REPL_CONNECTED ||
repl_state_ == PIKA_REPL_ERROR) {
slash::StopRsync(g_pika_conf->db_sync_path());
}
}

于是我整理了 Pika 的整体架构,因为是多线程模式,首先一个 PikaServer 为主线程,全局唯一一个,在PikaServer 对象中创建 ThreadPool,WorkerTheard 等等线程,这里我重点看了 pika_thread_pool_->start_thread_pool() 这个函数

1
2
3
4
5
6
7
8
9
10
# pika/src/pika_server.cc
void PikaServer::Start() { // line 281
int ret = 0;
ret = pika_thread_pool_->start_thread_pool();
if (ret != pink::kSuccess) {
delete logger_;
db_.reset();
LOG(FATAL) << "Start ThreadPool Error: " << ret << (ret == pink::kCreateThreadError ? ": create thread error " : ": other error");
}
ret = pika_dispatch_thread_->StartThread(); // 这里往下的部分类似,我理解为在启动线程池之后依次开启线程(DispatchThread, Trysync等线程)

start_thread_pool() 这个函数在 pink 下,这里的 workers_ 是个 std::vector< Worker* > workers_ 这种类型,这里我重点看了下 workers_[i]->start() 这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# pink/src/thread_pool.cc
int ThreadPool::start_thread_pool() { // line 56
if (!running_.load()) {
should_stop_.store(false);
for (size_t i = 0; i < worker_num_; ++i) {
workers_.push_back(new Worker(this));
int res = workers_[i]->start();
if (res != 0) {
return kCreateThreadError;
}
}
running_.store(true);
}
return kSuccess;
}

这个 start() 函数具体实现如下,这里我终于看到了熟悉的 pthread_create,这个是创建多线程的方法,这里的&WorkerMain 对应的就是这个线程处理的具体的事件,于是我去看了 WorkerMain 的函数实现(不得不说 grep -nr 这命令真好用,但是我用的 control + ]每次都查不到函数的具体实现…)

1
2
3
4
5
6
7
8
9
10
11
# pink/src/thread_pool.cc
int ThreadPool::Worker::start() { // line 18
if (!start_.load()) {
if (pthread_create(&thread_id_, NULL, &WorkerMain, thread_pool_)) {
return -1;
} else {
start_.store(true);
}
}
return 0;
}

还是没看到熟悉的 read 函数,继续查吧,这个 tp->runInThread() 函数

1
2
3
4
5
6
# pink/src/thread_pool.cc
void* ThreadPool::Worker::WorkerMain(void* arg) { // line 13
ThreadPool* tp = static_cast<ThreadPool*>(arg);
tp->runInThread();
return nullptr;
}

这里我又有点懵了,看不懂这些代码,不过我能知道这里面有线程的加锁解锁操作,还有一个发现是 queue_ 和 time_queue_ ,这里的 queue_ 我感觉是存储对应的 Task 的队列,代码中有从队列取出 Task 的操作,但是我没看到什么时候这个 queue 进行装载过事件,文档中对 WorkThread 的介绍是接受用户命令,封装成 Task 扔到ThreadPool 执行,线程将返回 Reply 返回给用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# pink/src/thread_pool.cc
void ThreadPool::runInThread() { // line 147
while (!should_stop()) {
mu_.Lock();
while (queue_.empty() && time_queue_.empty() && !should_stop()) {
rsignal_.Wait();
}
if (should_stop()) {
mu_.Unlock();
break;
}
if (!time_queue_.empty()) {
struct timeval now;
gettimeofday(&now, NULL);

TimeTask time_task = time_queue_.top();
uint64_t unow = now.tv_sec * 1000000 + now.tv_usec;
if (unow / 1000 >= time_task.exec_time / 1000) {
TaskFunc func = time_task.func;
void* arg = time_task.arg;
time_queue_.pop();
mu_.Unlock();
(*func)(arg);
continue;
} else if (queue_.empty() && !should_stop()) {
rsignal_.TimedWait(
static_cast<uint32_t>((time_task.exec_time - unow) / 1000));
mu_.Unlock();
continue;
}
}
if (!queue_.empty()) {
TaskFunc func = queue_.front().func;
void* arg = queue_.front().arg;
queue_.pop();
wsignal_.SignalAll();
mu_.Unlock();
(*func)(arg);
}
}
}

既然没找到 queue 的装载,我就用 grep -nr “queue_.push”对这个进行的查询来看什么时候装载的事件

1
2
3
4
5
6
7
8
9
10
11
12
# pink/src/thread_pool.cc
void ThreadPool::Schedule(TaskFunc func, void* arg) { // line 99
mu_.Lock();
while (queue_.size() >= max_queue_size_ && !should_stop()) {
wsignal_.Wait();
}
if (!should_stop()) {
queue_.push(Task(func, arg));
rsignal_.SignalAll();
}
mu_.Unlock();
}

于是我开始反向的查询,查询了这个 Schedule 在哪里调用的,但是我没有找到调用这个的对象与前面我查的对象有哪些相似性,于是我又重新去看了 DispathchThread 这个线程, 我去查看了 pika_dispatch_thread_->StartThread() 这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# pika/src/pika_server.cc
void PikaServer::Start() { // line 281
int ret = 0;
ret = pika_thread_pool_->start_thread_pool();
if (ret != pink::kSuccess) {
delete logger_;
db_.reset();
LOG(FATAL) << "Start ThreadPool Error: " << ret << (ret == pink::kCreateThreadError ? ": create thread error " : ": other error");
}
ret = pika_dispatch_thread_->StartThread();
if (ret != pink::kSuccess) {
delete logger_;
db_.reset();
LOG(FATAL) << "Start Dispatch Error: " << ret << (ret == pink::kBindError ? ": bind port " + std::to_string(port_) + " conflict"
: ": other error") << ", Listen on this port to handle the connected redis client";
}

我重点看了 CreateWorkerSpecificData 这个函数的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# pink/src/dispatch_thread.cc
int DispatchThread::StartThread() { // line 65
for (int i = 0; i < work_num_; i++) {
int ret = handle_->CreateWorkerSpecificData(
&(worker_thread_[i]->private_data_));
if (ret != 0) {
return ret;
}

ret = worker_thread_[i]->StartThread();
if (ret != 0) {
return ret;
}
if (!thread_name().empty()) {
worker_thread_[i]->set_thread_name("WorkerThread");
}
}
return ServerThread::StartThread();
}

这个函数实现如下,感觉也没有写什么东西….

1
2
3
4
5
#  pink/src/server_thread.cc
virtual int CreateWorkerSpecificData(void** data) const override { // line 46
UNUSED(data);
return 0;
}

总结

我认为 CS 的建连一定是在 pika_thread_pool_ -> start_thread_pool() 和 pika_dispatch_thread_ -> StartThread() 这两个函数中间,Pika 整体架构是首先创建一个唯一的 PikaServer 对象(g_pika_server),然后这个对象下创造线程池(ThreadPoll), Dispatch_thread 等等线程,然后系统每隔 10 秒,系统进行自动的 Compaction.