Pika代码理解(一)

在 Pika 的 src 下的 pika.cc 中的 main 函数中首先通过函数创造出 PikaServer 实例

1
2
3
# pika/src/pika.cc
g_pika_server = new PikaServer(); // 创造出g_pika_server实例 line 188
g_pika_server->Start(); // 启动pika line 195

第195行的 Start 函数我在 src 下的 pika_server.cc 中找到了这个 Start() 函数的实现, 随后这里的start_thread_pool() 这个函数是开启线程池的意思,pika_thread_pool_ 这个私有成员变量属于 pika_ server.h 里面,Threadpoll* 这个类指针在 pink/include/thread_pool.h 里面

1
2
3
4
5
6
7
8
9
10
# pika/src/pika_server.cc
void PikaServer::Start() { // line 281
int ret = 0;
ret = pika_thread_pool_->start_thread_pool();
if (ret != pink::kSuccess) {
delete logger_;
db_.reset();
LOG(FATAL) << "Start ThreadPool Error: " << ret << (ret == pink::kCreateThreadError ? ": create thread error " : ": other error");
}
ret = pika_dispatch_thread_->StartThread(); // 这里往下的部分类似,我理解为在启动线程池之后依次开启线程(DispatchThread, Trysync等线程)

我开始查找 start_thread_poll 这个函数的原型发现在 pink/src 下 thread_poll.cc 中,这里的代码我有些看不懂,结合博客大概意思是创造出多个 worker 线程,pika 中的 worker 线程存在多个用于对用户的命令做出解析返回结果,worker 线程创造完之后下面依次开启其他线程(DispatchThread,Trysync 这些)

1
2
# pika/include/pika_server.h
pink::ThreadPool* pika_thread_pool_; // line 422
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# pink/src/thread_pool.cc
int ThreadPool::start_thread_pool() { // line 56
if (!running_.load()) {
should_stop_.store(false);
for (size_t i = 0; i < worker_num_; ++i) {
workers_.push_back(new Worker(this));
int res = workers_[i]->start();
if (res != 0) {
return kCreateThreadError;
}
}
running_.store(true);
}
return kSuccess;
}

结合我开启 pika 之后最后一段文字显示的是”Pika Server going to start”这段话,我认为这个 while(!exit) 是真正处理客户端请求的循环,会执行 DoTimingTask() 这个函数,这个函数也在 pika_server.cc下面

1
2
3
4
5
6
7
8
9
10
11
12
# pika/src/pika_server.cc
LOG(INFO) << "Pika Server going to start"; // line 352
while (!exit_) {
DoTimingTask();
// wake up every 10 second
int try_num = 0;
while (!exit_ && try_num++ < 10) {
sleep(1);
}
}
LOG(INFO) << "Goodbye...";
}

这个 DoTimingTask() 函数中会先执行 AutoCompactRange(); 这个函数在 pika_server.cc 中,但是我在这个函数中没有看到熟悉的 read 函数从客户端读取字节流,不过这个 AutoCompactRange 字面意思翻译我认为是自动进行compaction 吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# pika/src/pika_server.cc
void PikaServer::DoTimingTask() { // line 1783
// Maybe schedule compactrange
AutoCompactRange();
// Purge log
AutoPurge();
// Delete expired dump
AutoDeleteExpiredDump();

// Check rsync deamon
if (((role_ & PIKA_ROLE_SLAVE) ^ PIKA_ROLE_SLAVE) || // Not a slave
repl_state_ == PIKA_REPL_NO_CONNECT ||
repl_state_ == PIKA_REPL_CONNECTED ||
repl_state_ == PIKA_REPL_ERROR) {
slash::StopRsync(g_pika_conf->db_sync_path());
}
1
2
3
4
5
6
7
8
9
10
# pika/src/pika_server.cc 
void PikaServer::AutoCompactRange() { // line 1233
struct statfs disk_info;
int ret = statfs(g_pika_conf->db_path().c_str(), &disk_info);
if (ret == -1) {
LOG(WARNING) << "statfs error: " << strerror(errno);
return;
}
.......
}

后面我又重新看了下 pika 的文档发现了:DispatchThread:监听端口 1 个端口,接收用户连接请求这个线程是用来处理用户连接请求的,文档中(DispatchThread, Worker, TrySync)这三个线程是放在一起的用于对用户的请求做出反应,于是我开始从那几个创造线程的地方重点看这个:ret = pika_dispatch_thread_ ->StartThread();文档的顺序是 DispatchThread -> Worker ->TrySync 我理解是用户先建立连接,然后发送请求返回数据,然后服务器这边再同步数据。于是我去找 pika_dispatch_thread_->StartThread() 这个函数的实现方法,很遗憾代码我还是看不懂。。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# pink/src/dispatch_thread.cc
int DispatchThread::StartThread() { // line 65
for (int i = 0; i < work_num_; i++) {
int ret = handle_->CreateWorkerSpecificData(
&(worker_thread_[i]->private_data_));
if (ret != 0) {
return ret;
}

ret = worker_thread_[i]->StartThread();
if (ret != 0) {
return ret;
}
if (!thread_name().empty()) {
worker_thread_[i]->set_thread_name("WorkerThread");
}
}
return ServerThread::StartThread();
}