Pika Incremental Sync: A Source Code Walkthrough

Background

This post covers Pika's incremental sync, which is built on top of the Binlog mechanism. The walkthrough is split into five parts:

  • Master receives the BinlogSyncRequest

  • Binlog production and consumption on the slave

  • Standalone binlog production

  • The auxiliary thread drives BinlogSync

  • Binlog expiration policy

Master Receives the BinlogSyncRequest

Picking up where the previous post on full sync left off: once the slave receives the master's TrySync response, it immediately sends its first kBinlogSync request to the master, and from that point on incremental sync begins.

src/pika_repl_server_conn.cc

Because the slave sets is_first_send on this first request, the ack_range_start and ack_range_end it sends are identical, and the offset inside the BinlogOffset is the master's offset recorded by the slave at the time of the dump. The handler then calls ActivateSlaveBinlogSync, passing range_start (a LogOffset) as the argument.

void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) {
std::unique_ptr<ReplServerTaskArg> task_arg(static_cast<ReplServerTaskArg*>(arg));
const std::shared_ptr<InnerMessage::InnerRequest> req = task_arg->req;
std::shared_ptr<net::PbConn> conn = task_arg->conn;
if (!req->has_binlog_sync()) {
LOG(WARNING) << "Pb parse error";
// conn->NotifyClose();
return;
}
...
if (is_first_send) {
if (range_start.b_offset != range_end.b_offset) {
LOG(WARNING) << "first binlogsync request pb argument invalid";
conn->NotifyClose();
return;
}

Status s = master_slot->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
if (!s.ok()) {
LOG(WARNING) << "Activate Binlog Sync failed " << slave_node.ToString() << " " << s.ToString();
conn->NotifyClose();
return;
}
return;
}
...
}

FAQ

  • Why is conn->NotifyClose commented out in some of these branches?

src/pika_rm.cc

First we look up the slave_ptr of the given slave node and set both its sent_offset and acked_offset to the offset argument. We then call InitBinlogFileReader with that offset to initialize the binlog reader, and finally call SyncBinlogToWq to put the binlogs that need to be sent into the write_queue.

Status SyncMasterSlot::ActivateSlaveBinlogSync(const std::string& ip, int port, const LogOffset& offset) {
std::shared_ptr<SlaveNode> slave_ptr = GetSlaveNode(ip, port);
if (!slave_ptr) {
return Status::NotFound("ip " + ip + " port " + std::to_string(port));
}

{
std::lock_guard l(slave_ptr->slave_mu);
slave_ptr->slave_state = kSlaveBinlogSync;
slave_ptr->sent_offset = offset;
slave_ptr->acked_offset = offset;
// read binlog file from file
Status s = slave_ptr->InitBinlogFileReader(Logger(), offset.b_offset);
if (!s.ok()) {
return Status::Corruption("Init binlog file reader failed" + s.ToString());
}
//Since we init a new reader, we should drop items in write queue and reset sync_window.
//Or the sent_offset and acked_offset will not match
g_pika_rm->DropItemInWriteQueue(ip, port);
slave_ptr->sync_win.Reset();
slave_ptr->b_state = kReadFromFile;
}

Status s = SyncBinlogToWq(ip, port);
if (!s.ok()) {
return s;
}
return Status::OK();
}

src/pika_slave_node.cc

InitBinlogFileReader constructs a binlog_reader and calls Seek to update the filenum and offset inside it.

Status SlaveNode::InitBinlogFileReader(const std::shared_ptr<Binlog>& binlog, const BinlogOffset& offset) {
binlog_reader = std::make_shared<PikaBinlogReader>();
int res = binlog_reader->Seek(binlog, offset.filenum, offset.offset);
if (res != 0) {
return Status::Corruption(ToString() + " binlog reader init failed");
}
return Status::OK();
}

src/pika_binlog_reader.cc

In Seek, NewFileName builds the name of the requested binlog file, which is opened and moved into queue_. The reader's offset members cur_offset_ and last_record_offset_ are initialized, and a while loop that calls GetNext advances cur_offset_.

int PikaBinlogReader::Seek(const std::shared_ptr<Binlog>& logger, uint32_t filenum, uint64_t offset) {
std::string confile = NewFileName(logger->filename(), filenum);
...
std::unique_ptr<pstd::SequentialFile> readfile;
if (!pstd::NewSequentialFile(confile, readfile).ok()) {
LOG(WARNING) << "New swquential " << confile << " failed";
return -1;
}
...
queue_ = std::move(readfile);
logger_ = logger;

std::lock_guard l(rwlock_);
cur_filenum_ = filenum;
cur_offset_ = offset;
last_record_offset_ = cur_filenum_ % kBlockSize;

pstd::Status s;
uint64_t start_block = (cur_offset_ / kBlockSize) * kBlockSize;
s = queue_->Skip((cur_offset_ / kBlockSize) * kBlockSize);
uint64_t block_offset = cur_offset_ % kBlockSize;
uint64_t ret = 0;
uint64_t res = 0;
bool is_error = false;

while (true) {
if (res >= block_offset) {
cur_offset_ = start_block + res;
break;
}
ret = 0;
is_error = GetNext(&ret);
if (is_error) {
return -1;
}
res += ret;
}
last_record_offset_ = cur_offset_ % kBlockSize;
return 0;
}

src/pika_binlog_reader.cc

The reader starts reading from the beginning of the block that contains the target offset. After each record is read, res accumulates its size, and the loop stops once the accumulated value reaches the slave's in-block offset; the result is then recorded in last_record_offset_. Note that last_record_offset_ is the offset within a single block, while cur_offset_ is the offset within the whole file.

bool PikaBinlogReader::GetNext(uint64_t* size) {
uint64_t offset = 0;
pstd::Status s;
bool is_error = false;

while (true) {
buffer_.clear();
s = queue_->Read(kHeaderSize, &buffer_, backing_store_.get());
if (!s.ok()) {
is_error = true;
return is_error;
}

const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[0]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[1]) & 0xff;
const uint32_t c = static_cast<uint32_t>(header[2]) & 0xff;
const unsigned int type = header[7];
const uint32_t length = a | (b << 8) | (c << 16);

if (length > (kBlockSize - kHeaderSize)) {
return true;
}

if (type == kFullType) {
s = queue_->Read(length, &buffer_, backing_store_.get());
offset += kHeaderSize + length;
break;
} else if (type == kFirstType) {
s = queue_->Read(length, &buffer_, backing_store_.get());
offset += kHeaderSize + length;
} else if (type == kMiddleType) {
s = queue_->Read(length, &buffer_, backing_store_.get());
offset += kHeaderSize + length;
} else if (type == kLastType) {
s = queue_->Read(length, &buffer_, backing_store_.get());
offset += kHeaderSize + length;
break;
} else if (type == kBadRecord) {
s = queue_->Read(length, &buffer_, backing_store_.get());
offset += kHeaderSize + length;
break;
} else {
is_error = true;
break;
}
}
*size = offset;
return is_error;
}
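To make the two offsets concrete, here is a small standalone sketch (not Pika code) that mirrors the arithmetic used by Seek, assuming the 64 KB block size described later in this post and a made-up file offset:

#include <cstdint>
#include <iostream>

int main() {
  const uint64_t kBlockSize = 64 * 1024;  // block size assumed to be 64 KB
  uint64_t cur_offset = 200000;           // hypothetical file-wide offset from the slave

  // Seek first skips whole blocks, then replays records inside the last block.
  uint64_t start_block = (cur_offset / kBlockSize) * kBlockSize;  // 196608 (block #3)
  uint64_t block_offset = cur_offset % kBlockSize;                // 3392 bytes into that block

  std::cout << "start_block=" << start_block
            << " block_offset=" << block_offset << std::endl;
  // After the replay loop, last_record_offset_ = cur_offset_ % kBlockSize,
  // i.e. the in-block position, while cur_offset_ stays file-wide.
  return 0;
}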

Summary

While initializing the BinlogReader, we update logger_, cur_filenum_, cur_offset_, and last_record_offset_ in the BinlogReader held by the master-side SlaveNode so that they match the position sent by the slave, which lets the next read start from the updated position.

FAQ

  • The TrySync request already compared the master's and the slave's binlog positions, so why do we need to Seek again here?

src/pika_rm.cc

After the offsets have been updated, SyncBinlogToWq calls ReadBinlogFileToWq to read binlogs into the write_queue.

Status SyncMasterSlot::SyncBinlogToWq(const std::string& ip, int port) {
std::shared_ptr<SlaveNode> slave_ptr = GetSlaveNode(ip, port);
if (!slave_ptr) {
return Status::NotFound("ip " + ip + " port " + std::to_string(port));
}
Status s;
slave_ptr->Lock();
s = ReadBinlogFileToWq(slave_ptr);
slave_ptr->Unlock();
if (!s.ok()) {
return s;
}
return Status::OK();
}

src/pika_rm.cc

First we check how many more binlogs the sliding window sync_win can take. Since this is the first time data is written into the window, and the window's maximum size in the configuration file is 9000, at most 9000 binlogs can be pushed in one pass. Inside the for loop we first check whether the total byte size of the binlogs already in the window exceeds the limit (PIKA_MAX_CONN_RBUF_HB * 2, i.e. 1 GB here); if it does, nothing more is written this round and we wait for the next send. Otherwise the reader's Get is called: Status s = reader->Get(&msg, &filenum, &offset); extracts one binlog into msg and updates filenum and offset. The entry is wrapped into a SyncWinItem (one SyncWinItem represents one complete binlog) and pushed into the sliding window, then assembled into a WriteTask and appended to tasks. After each task is pushed, slave_ptr updates its sent_offset to the position the reader has just read up to. A WriteTask wraps an RmNode and a BinlogChip, so all the tasks collected here are binlogs destined for the same node.

Status SyncMasterSlot::ReadBinlogFileToWq(const std::shared_ptr<SlaveNode>& slave_ptr) {
int cnt = slave_ptr->sync_win.Remaining();
std::shared_ptr<PikaBinlogReader> reader = slave_ptr->binlog_reader;
if (!reader) {
return Status::OK();
}
std::vector<WriteTask> tasks;
for (int i = 0; i < cnt; ++i) {
std::string msg;
uint32_t filenum;
uint64_t offset;
if (slave_ptr->sync_win.GetTotalBinlogSize() > PIKA_MAX_CONN_RBUF_HB * 2) {
LOG(INFO) << slave_ptr->ToString()
<< " total binlog size in sync window is :" << slave_ptr->sync_win.GetTotalBinlogSize();
break;
}
Status s = reader->Get(&msg, &filenum, &offset);
if (s.IsEndFile()) {
break;
} else if (s.IsCorruption() || s.IsIOError()) {
LOG(WARNING) << SyncSlotInfo().ToString() << " Read Binlog error : " << s.ToString();
return s;
}
BinlogItem item;
if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, msg, &item)) {
LOG(WARNING) << "Binlog item decode failed";
return Status::Corruption("Binlog item decode failed");
}
BinlogOffset sent_b_offset = BinlogOffset(filenum, offset);
LogicOffset sent_l_offset = LogicOffset(item.term_id(), item.logic_id());
LogOffset sent_offset(sent_b_offset, sent_l_offset);

slave_ptr->sync_win.Push(SyncWinItem(sent_offset, msg.size()));
slave_ptr->SetLastSendTime(pstd::NowMicros());
RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SlotId(),
slave_ptr->SessionId());
WriteTask task(rm_node, BinlogChip(sent_offset, msg), slave_ptr->sent_offset);
tasks.push_back(task);
slave_ptr->sent_offset = sent_offset;
}

if (!tasks.empty()) {
g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), slot_info_.slot_id_, tasks);
}
return Status::OK();
}

src/pika_binlog_reader.cc

In Get, Consume is called to extract one binlog entry. If the current binlog file has been read to the end, the reader rolls over to the next binlog file: filenum is incremented by 1 and offset is reset to 0.

// Get a whole message;
// Append to scratch;
// the status will be OK, IOError or Corruption, EndFile;
Status PikaBinlogReader::Get(std::string* scratch, uint32_t* filenum, uint64_t* offset) {
if (!logger_ || !queue_) {
return Status::Corruption("Not seek");
}
scratch->clear();
Status s = Status::OK();

do {
if (ReadToTheEnd()) {
return Status::EndFile("End of cur log file");
}
s = Consume(scratch, filenum, offset);
if (s.IsEndFile()) {
std::string confile = NewFileName(logger_->filename(), cur_filenum_ + 1);

// sleep 10ms wait produce thread generate the new binlog
usleep(10000);

// Roll to next file need retry;
if (pstd::FileExists(confile)) {
DLOG(INFO) << "BinlogSender roll to new binlog" << confile;
queue_.reset();
queue_ = nullptr;

pstd::NewSequentialFile(confile, queue_);
{
std::lock_guard l(rwlock_);
cur_filenum_++;
cur_offset_ = 0;
}
last_record_offset_ = 0;
} else {
return Status::IOError("File Does Not Exists");
}
} else {
break;
}
} while (s.IsEndFile());

return Status::OK();
}
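As a concrete illustration of the rollover bookkeeping (file names belong to the write2file series mentioned in the summary near the end of this post; the real naming lives in NewFileName, this is only a sketch with made-up numbers):

#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical mirror of the rollover: when the current file is exhausted,
// move to "<prefix><filenum + 1>" and start reading again at offset 0.
int main() {
  std::string prefix = "write2file";  // binlog file prefix used by Pika
  uint32_t cur_filenum = 42;          // hypothetical current file index
  uint64_t cur_offset = 104857600;    // hypothetical end of write2file42

  // Roll to the next file, as PikaBinlogReader::Get does on Status::EndFile.
  ++cur_filenum;
  cur_offset = 0;
  std::cout << "next file: " << prefix << cur_filenum
            << ", start offset: " << cur_offset << std::endl;
  return 0;
}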

src/pika_binlog_reader.cc

Consume 中,如果当前记录是 kFullType 则一次性将读到的数据写到 scratch 中,如果是其他的类型,则进行数据的追加或者报异常,这里是个 while 循环去处理,每次取到一个完整的 Binlog 条数就退出循环(一条 Binlog 可能由一条或者多条 Record 组成)

Status PikaBinlogReader::Consume(std::string* scratch, uint32_t* filenum, uint64_t* offset) {
Status s;

pstd::Slice fragment;
while (true) {
const unsigned int record_type = ReadPhysicalRecord(&fragment, filenum, offset);

switch (record_type) {
case kFullType:
*scratch = std::string(fragment.data(), fragment.size());
s = Status::OK();
break;
case kFirstType:
scratch->assign(fragment.data(), fragment.size());
s = Status::NotFound("Middle Status");
break;
case kMiddleType:
scratch->append(fragment.data(), fragment.size());
s = Status::NotFound("Middle Status");
break;
case kLastType:
scratch->append(fragment.data(), fragment.size());
s = Status::OK();
break;
case kEof:
return Status::EndFile("Eof");
case kBadRecord:
LOG(WARNING)
<< "Read BadRecord record, will decode failed, this record may dbsync padded record, not processed here";
return Status::IOError("Data Corruption");
case kOldRecord:
return Status::EndFile("Eof");
default:
return Status::IOError("Unknow reason");
}
if (s.ok()) {
break;
}
}
// DLOG(INFO) << "Binlog Sender consumer a msg: " << scratch;
return Status::OK();
}

src/pika_binlog_reader.cc

Each slave's binlog_reader keeps the file name and file offset it last read, which is how the master knows what it still needs to send to that slave. ReadPhysicalRecord first checks whether the bytes remaining in the current block are fewer than kHeaderSize; if so, it skips the rest of this block and starts from the next one. It then calls Read to load the kHeaderSize-byte header into buffer_, extracts the length from the header, reads the following length bytes into buffer_, stores the payload in result, updates last_record_offset_, and returns to the caller's loop.

unsigned int PikaBinlogReader::ReadPhysicalRecord(pstd::Slice* result, uint32_t* filenum, uint64_t* offset) {
pstd::Status s;
if (kBlockSize - last_record_offset_ <= kHeaderSize) {
queue_->Skip(kBlockSize - last_record_offset_);
std::lock_guard l(rwlock_);
cur_offset_ += (kBlockSize - last_record_offset_);
last_record_offset_ = 0;
}
buffer_.clear();
s = queue_->Read(kHeaderSize, &buffer_, backing_store_.get());
if (s.IsEndFile()) {
return kEof;
} else if (!s.ok()) {
return kBadRecord;
}

const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[0]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[1]) & 0xff;
const uint32_t c = static_cast<uint32_t>(header[2]) & 0xff;
const unsigned int type = header[7];
const uint32_t length = a | (b << 8) | (c << 16);

if (length > (kBlockSize - kHeaderSize)) {
return kBadRecord;
}

if (type == kZeroType || length == 0) {
buffer_.clear();
return kOldRecord;
}

buffer_.clear();
s = queue_->Read(length, &buffer_, backing_store_.get());
*result = pstd::Slice(buffer_.data(), buffer_.size());
last_record_offset_ += kHeaderSize + length;
if (s.ok()) {
std::lock_guard l(rwlock_);
*filenum = cur_filenum_;
cur_offset_ += (kHeaderSize + length);
*offset = cur_offset_;
}
return type;
}
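The 8-byte header that ReadPhysicalRecord parses (a 3-byte little-endian length, a 4-byte timestamp, and a 1-byte record type) can be exercised in isolation. The following standalone sketch, with a hand-built header, repeats the same bit arithmetic:

#include <cstdint>
#include <cstdio>

int main() {
  // A hand-built 8-byte physical record header: length = 0x000120 (288),
  // timestamp in bytes 3..6 (ignored here), record type in byte 7.
  const unsigned char header[8] = {0x20, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01};

  // Same bit arithmetic as GetNext() / ReadPhysicalRecord().
  const uint32_t a = static_cast<uint32_t>(header[0]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[1]) & 0xff;
  const uint32_t c = static_cast<uint32_t>(header[2]) & 0xff;
  const unsigned int type = header[7];
  const uint32_t length = a | (b << 8) | (c << 16);

  std::printf("payload length = %u, record type = %u\n",
              static_cast<unsigned>(length), type);  // 288, 1
  return 0;
}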

src/pika_rm.cc

All the WriteTasks are written into write_queues_ to wait for sending. Each task carries one binlog's offset and its data, and all the tasks pushed in one call belong to the same slot on the same peer.

void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, uint32_t slot_id,
const std::vector<WriteTask>& tasks) {
std::lock_guard l(write_queue_mu_);
std::string index = ip + ":" + std::to_string(port);
for (auto& task : tasks) {
write_queues_[index][slot_id].push(task);
}
}

src/pika_auxiliary_thread.cc

SendToPeer in the auxiliary thread sends the data queued in write_queues_ above to the slave nodes.

void* PikaAuxiliaryThread::ThreadMain() {
while (!should_stop()) {
if (g_pika_server->ShouldMetaSync()) {
g_pika_rm->SendMetaSyncRequest();
} else if (g_pika_server->MetaSyncDone()) {
g_pika_rm->RunSyncSlaveSlotStateMachine();
}

pstd::Status s = g_pika_rm->CheckSyncTimeout(pstd::NowMicros());
if (!s.ok()) {
LOG(WARNING) << s.ToString();
}

g_pika_server->CheckLeaderProtectedMode();

// TODO(whoiami) timeout
s = g_pika_server->TriggerSendBinlogSync();
if (!s.ok()) {
LOG(WARNING) << s.ToString();
}
// send to peer
int res = g_pika_server->SendToPeer();
if (res == 0) {
// sleep 100 ms
std::unique_lock lock(mu_);
cv_.wait_for(lock, 100ms);
} else {
// LOG_EVERY_N(INFO, 1000) << "Consume binlog number " << res;
}
}
return nullptr;
}

src/pika_server.cc

SendToPeer simply calls ConsumeWriteQueue:

int PikaServer::SendToPeer() { return g_pika_rm->ConsumeWriteQueue(); }

src/pika_rm.cc

Binlogs going to the same peer are collected into to_send_map, and SendSlaveBinlogChips is then called to send the BinlogSync responses. Note that at most 4000 binlog entries are sent for one slot in one pass (kBinlogSendPacketNum batches of kBinlogSendBatchNum tasks), and since SendSlaveBinlogChips sits inside a for loop, the master may reply to the slave multiple times for a single BinlogSync request.

int PikaReplicaManager::ConsumeWriteQueue() {
std::unordered_map<std::string, std::vector<std::vector<WriteTask>>> to_send_map;
int counter = 0;
{
std::lock_guard l(write_queue_mu_);
for (auto& iter : write_queues_) {
const std::string& ip_port = iter.first;
std::unordered_map<uint32_t, std::queue<WriteTask>>& p_map = iter.second;
for (auto& slot_queue : p_map) {
std::queue<WriteTask>& queue = slot_queue.second;
for (int i = 0; i < kBinlogSendPacketNum; ++i) {
if (queue.empty()) {
break;
}
size_t batch_index = queue.size() > kBinlogSendBatchNum ? kBinlogSendBatchNum : queue.size();
std::vector<WriteTask> to_send;
size_t batch_size = 0;
for (size_t i = 0; i < batch_index; ++i) {
WriteTask& task = queue.front();
batch_size += task.binlog_chip_.binlog_.size();
...
to_send.push_back(task);
queue.pop();
counter++;
}
if (!to_send.empty()) {
to_send_map[ip_port].push_back(std::move(to_send));
}
}
}
}
}

std::vector<std::string> to_delete;
for (auto& iter : to_send_map) {
...
for (auto& to_send : iter.second) {
Status s = pika_repl_server_->SendSlaveBinlogChips(ip, port, to_send);
if (!s.ok()) {
LOG(WARNING) << "send binlog to " << ip << ":" << port << " failed, " << s.ToString();
to_delete.push_back(iter.first);
continue;
}
}
}
...
return counter;
}

FAQ

  • Why is the corresponding entry in write_queues_ deleted when sending the binlog fails?

src/pika_repl_server.cc

BuildBinlogSyncResp is called here to construct the response to the BinlogSync request. Note that if a single serialized protobuf response exceeds the configured connection buffer size (256 MB here), the tasks are split and sent as multiple protobuf messages.

pstd::Status PikaReplServer::SendSlaveBinlogChips(const std::string& ip, int port,
const std::vector<WriteTask>& tasks) {
InnerMessage::InnerResponse response;
BuildBinlogSyncResp(tasks, &response);

std::string binlog_chip_pb;
if (!response.SerializeToString(&binlog_chip_pb)) {
return Status::Corruption("Serialized Failed");
}

if (binlog_chip_pb.size() > static_cast<size_t>(g_pika_conf->max_conn_rbuf_size())) {
for (const auto& task : tasks) {
InnerMessage::InnerResponse response;
std::vector<WriteTask> tmp_tasks;
tmp_tasks.push_back(task);
BuildBinlogSyncResp(tmp_tasks, &response);
if (!response.SerializeToString(&binlog_chip_pb)) {
return Status::Corruption("Serialized Failed");
}
pstd::Status s = Write(ip, port, binlog_chip_pb);
if (!s.ok()) {
return s;
}
}
return pstd::Status::OK();
}
return Write(ip, port, binlog_chip_pb);
}

src/pika_repl_server.cc

The master replies to the slave with db_name, slot_id, session_id, the boffset, and the binlog_ payload.

void PikaReplServer::BuildBinlogSyncResp(const std::vector<WriteTask>& tasks, InnerMessage::InnerResponse* response) {
response->set_code(InnerMessage::kOk);
response->set_type(InnerMessage::Type::kBinlogSync);
for (const auto& task : tasks) {
InnerMessage::InnerResponse::BinlogSync* binlog_sync = response->add_binlog_sync();
binlog_sync->set_session_id(task.rm_node_.SessionId());
InnerMessage::Slot* slot = binlog_sync->mutable_slot();
slot->set_db_name(task.rm_node_.DBName());
slot->set_slot_id(task.rm_node_.SlotId());
InnerMessage::BinlogOffset* boffset = binlog_sync->mutable_binlog_offset();
BuildBinlogOffset(task.binlog_chip_.offset_, boffset);
binlog_sync->set_binlog(task.binlog_chip_.binlog_);
}
}

BinlogSync Summary

The master's kBinlogSync reply to the slave includes session_id, db_name, slot_id, binlog_offset, and the binlog payload.


Binlog Production and Consumption on the Slave

src/pika_repl_client_conn.cc

DispatchBinlogRes is where the slave handles the binlog responses sent by the master; it calls ScheduleWriteBinlogTask to process the binlogs.

void PikaReplClientConn::DispatchBinlogRes(const std::shared_ptr<InnerMessage::InnerResponse>& res) {
// slot to a bunch of binlog chips
std::unordered_map<SlotInfo, std::vector<int>*, hash_slot_info> par_binlog;
for (int i = 0; i < res->binlog_sync_size(); ++i) {
const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync(i);
// hash key: db + slot_id
SlotInfo p_info(binlog_res.slot().db_name(), binlog_res.slot().slot_id());
if (par_binlog.find(p_info) == par_binlog.end()) {
par_binlog[p_info] = new std::vector<int>();
}
par_binlog[p_info]->push_back(i);
}

std::shared_ptr<SyncSlaveSlot> slave_slot = nullptr;
for (auto& binlog_nums : par_binlog) {
RmNode node(binlog_nums.first.db_name_, binlog_nums.first.slot_id_);
slave_slot = g_pika_rm->GetSyncSlaveSlotByName(
SlotInfo(binlog_nums.first.db_name_, binlog_nums.first.slot_id_));
if (!slave_slot) {
LOG(WARNING) << "Slave Slot: " << binlog_nums.first.db_name_ << "_" << binlog_nums.first.slot_id_
<< " not exist";
break;
}
slave_slot->SetLastRecvTime(pstd::NowMicros());
g_pika_rm->ScheduleWriteBinlogTask(binlog_nums.first.db_name_ + std::to_string(binlog_nums.first.slot_id_),
res, std::dynamic_pointer_cast<PikaReplClientConn>(shared_from_this()),
reinterpret_cast<void*>(binlog_nums.second));
}
}

src/pika_rm.cc

This in turn calls ScheduleWriteBinlogTask on the replication client:

void PikaReplicaManager::ScheduleWriteBinlogTask(const std::string& db_slot,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
pika_repl_client_->ScheduleWriteBinlogTask(db_slot, res, conn, res_private_data);
}

src/pika_repl_client.cc

Schedule hands the task to a background worker thread, which processes it asynchronously in HandleBGWorkerWriteBinlog.

void PikaReplClient::ScheduleWriteBinlogTask(const std::string& db_slot,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
size_t index = GetHashIndex(db_slot, true);
auto task_arg = new ReplClientWriteBinlogTaskArg(res, conn, res_private_data, bg_workers_[index].get());
bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast<void*>(task_arg));
}
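The important property of this dispatch is that every binlog for a given db+slot key hashes to the same background worker, so binlogs of one slot are replayed in order. A minimal sketch of that idea (not the actual GetHashIndex implementation) could look like this:

#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical sketch: pick a worker index from the db+slot key so that all
// binlogs of the same slot always land on the same worker and stay ordered.
size_t PickWorker(const std::string& db_slot_key, size_t worker_count) {
  return std::hash<std::string>{}(db_slot_key) % worker_count;
}

int main() {
  std::vector<std::string> keys = {"db00", "db01", "db00"};
  for (const auto& key : keys) {
    std::cout << key << " -> worker " << PickWorker(key, 4) << std::endl;
  }
  // "db00" maps to the same worker both times, preserving apply order.
  return 0;
}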

src/pika_repl_bgworker.cc

Here binlog_res's binlog().data() is parsed and fed into ProcessInputBuffer for processing. Note that at the end of HandleBGWorkerWriteBinlog, SendSlotBinlogSyncAckRequest is called, meaning that as soon as the previous binlog response has been processed, a new BinlogSync request is sent right back. Pay attention to the two parameters ack_start and ack_end in that request: the ack_start sent next is the begin offset carried in this binlog response (pb_begin), while the ack_end sent next is the slave's binlog producer offset after this batch has been written locally, so ack_start always matches the position the master sent.

void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
...

// find the first not keepalive binlogsync
for (size_t i = 0; i < index->size(); ++i) {
const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync((*index)[i]);
if (i == 0) {
db_name = binlog_res.slot().db_name();
slot_id = binlog_res.slot().slot_id();
}
if (!binlog_res.binlog().empty()) {
ParseBinlogOffset(binlog_res.binlog_offset(), &pb_begin);
break;
}
}
...

if (pb_begin == LogOffset()) {
only_keepalive = true;
}

LogOffset ack_start;
if (only_keepalive) {
ack_start = LogOffset();
} else {
ack_start = pb_begin;
}

...
for (int i : *index) {
...
const char* redis_parser_start = binlog_res.binlog().data() + BINLOG_ENCODE_LEN;
int redis_parser_len = static_cast<int>(binlog_res.binlog().size()) - BINLOG_ENCODE_LEN;
int processed_len = 0;
net::RedisParserStatus ret =
worker->redis_parser_.ProcessInputBuffer(redis_parser_start, redis_parser_len, &processed_len); // parse and apply the binlog payload
if (ret != net::kRedisParserDone) {
LOG(WARNING) << "Redis parser failed";
slave_slot->SetReplState(ReplState::kTryConnect);
return;
}
}
...

LogOffset ack_end;
if (only_keepalive) {
ack_end = LogOffset();
} else {
LogOffset productor_status;
// Reply Ack to master immediately
std::shared_ptr<Binlog> logger = slot->Logger();
logger->GetProducerStatus(&productor_status.b_offset.filenum, &productor_status.b_offset.offset,
&productor_status.l_offset.term, &productor_status.l_offset.index);
ack_end = productor_status;
ack_end.l_offset.term = pb_end.l_offset.term;
}

g_pika_rm->SendSlotBinlogSyncAckRequest(db_name, slot_id, ack_start, ack_end);
}

src/redis_parser.cc

This calls ProcessRequestBuffer:

RedisParserStatus RedisParser::ProcessInputBuffer(const char* input_buf, int length, int* parsed_len) {
if (status_code_ == kRedisParserInitDone || status_code_ == kRedisParserHalf || status_code_ == kRedisParserDone) {
// TODO(): AZ: avoid copy
std::string tmp_str(input_buf, length);
input_str_ = half_argv_ + tmp_str;
input_buf_ = input_str_.c_str();
length_ = static_cast<int32_t>(length + half_argv_.size());
if (redis_parser_type_ == REDIS_PARSER_REQUEST) {
ProcessRequestBuffer();
} else if (redis_parser_type_ == REDIS_PARSER_RESPONSE) {
ProcessResponseBuffer();
} else {
SetParserStatus(kRedisParserError, kRedisParserInitError);
return status_code_;
}
// cur_pos_ starts from 0, val of cur_pos_ is the parsed_len
*parsed_len = cur_pos_;
ResetRedisParser();
// PrintCurrentStatus();
return status_code_;
}
SetParserStatus(kRedisParserError, kRedisParserInitError);
return status_code_;
}

src/redis_parser.cc

The parsed command is placed into argv_, and the DealMessage callback is triggered, which calls HandleWriteBinlog.

RedisParserStatus RedisParser::ProcessRequestBuffer() {
RedisParserStatus ret;
while (cur_pos_ <= length_ - 1) {
if (redis_type_ == 0) {
if (input_buf_[cur_pos_] == '*') {
redis_type_ = REDIS_REQ_MULTIBULK;
} else {
redis_type_ = REDIS_REQ_INLINE;
}
}

if (redis_type_ == REDIS_REQ_INLINE) {
ret = ProcessInlineBuffer();
if (ret != kRedisParserDone) {
return ret;
}
} else if (redis_type_ == REDIS_REQ_MULTIBULK) {
ret = ProcessMultibulkBuffer();
if (ret != kRedisParserDone) { // FULL_ERROR || HALF || PARSE_ERROR
return ret;
}
} else {
// Unknown requeset type;
return kRedisParserError;
}
if (!argv_.empty()) {
argvs_.push_back(argv_);
if (parser_settings_.DealMessage) {
if (parser_settings_.DealMessage(this, argv_) != 0) {
SetParserStatus(kRedisParserError, kRedisParserDealError);
return status_code_;
}
}
}
argv_.clear();
// Reset
ResetCommandStatus();
}
if (parser_settings_.Complete) {
if (parser_settings_.Complete(this, argvs_) != 0) {
SetParserStatus(kRedisParserError, kRedisParserCompleteError);
return status_code_;
}
}
argvs_.clear();
SetParserStatus(kRedisParserDone);
return status_code_; // OK
}

src/pika_repl_bgworker.cc

HandleWriteBinlog calls ConsensusProcessLeaderLog to handle the binlog:

int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::RedisCmdArgsType& argv) {
...
slot->ConsensusProcessLeaderLog(c_ptr, worker->binlog_item_);
return 0;
}

src/pika_rm.cc

This calls ProcessLeaderLog on the coordinator:

Status SyncMasterSlot::ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
return coordinator_.ProcessLeaderLog(cmd_ptr, attribute);
}

src/pika_consensus.cc

InternalAppendLog is called first to write the binlog, and then InternalApplyFollower consumes (applies) the binlog asynchronously.

// precheck if prev_offset match && drop this log if this log exist
Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
LogOffset last_index = mem_logger_->last_offset();
if (attribute.logic_id() < last_index.l_offset.index) {
LOG(WARNING) << SlotInfo(db_name_, slot_id_).ToString() << "Drop log from leader logic_id "
<< attribute.logic_id() << " cur last index " << last_index.l_offset.index;
return Status::OK();
}

Status s = InternalAppendLog(attribute, cmd_ptr, nullptr, nullptr);

InternalApplyFollower(MemLog::LogItem(LogOffset(), cmd_ptr, nullptr, nullptr));
return Status::OK();
}

Writing the Binlog

src/pika_consensus.cc

This calls InternalAppendBinlog:

Status ConsensusCoordinator::InternalAppendLog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr) {
LogOffset log_offset;
Status s = InternalAppendBinlog(item, cmd_ptr, &log_offset);
if (!s.ok()) {
return s;
}
return Status::OK();
}

src/pika_consensus.cc

Here the command is serialized first, and then the Put function is called to write content into the binlog:

Status ConsensusCoordinator::InternalAppendBinlog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
LogOffset* log_offset) {
std::string content = cmd_ptr->ToRedisProtocol();
Status s = stable_logger_->Logger()->Put(content);
if (!s.ok()) {
std::string db_name = cmd_ptr->db_name().empty() ? g_pika_conf->default_db() : cmd_ptr->db_name();
std::shared_ptr<DB> db = g_pika_server->GetDB(db_name);
if (db) {
db->SetBinlogIoError();
}
return s;
}
uint32_t filenum;
uint64_t offset;
stable_logger_->Logger()->GetProducerStatus(&filenum, &offset);
*log_offset = LogOffset(BinlogOffset(filenum, offset), LogicOffset(item.term_id(), item.logic_id()));
return Status::OK();
}

Consuming the Binlog

src/pika_consensus.cc

This calls ScheduleWriteDBTask:

void ConsensusCoordinator::InternalApplyFollower(const MemLog::LogItem& log) {
g_pika_rm->ScheduleWriteDBTask(log.cmd_ptr, log.offset, db_name_, slot_id_);
}

src/pika_rm.cc

Which forwards to the replication client's ScheduleWriteDBTask:

void PikaReplicaManager::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
const std::string& db_name, uint32_t slot_id) {
pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, offset, db_name, slot_id);
}

src/pika_repl_client.cc

Which schedules the task on a background worker via Schedule:

void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
const std::string& db_name, uint32_t slot_id) {
const PikaCmdArgsType& argv = cmd_ptr->argv();
std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0];
size_t index = GetHashIndex(dispatch_key, false);
auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr, offset, db_name, slot_id);
bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg));
}

src/pika_repl_bgworker.cc

This is the binlog consumption step: it calls Do, i.e. the normal command execution flow.

void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
std::unique_ptr<ReplClientWriteDBTaskArg> task_arg(static_cast<ReplClientWriteDBTaskArg*>(arg));
const std::shared_ptr<Cmd> c_ptr = task_arg->cmd_ptr;
const PikaCmdArgsType& argv = c_ptr->argv();
LogOffset offset = task_arg->offset;
std::string db_name = task_arg->db_name;
uint32_t slot_id = task_arg->slot_id;

uint64_t start_us = 0;
if (g_pika_conf->slowlog_slower_than() >= 0) {
start_us = pstd::NowMicros();
}
std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(db_name, slot_id);
// Add read lock for no suspend command
if (!c_ptr->IsSuspend()) {
slot->DbRWLockReader();
}
if (c_ptr->IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_model()
&& slot->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (c_ptr->is_write()) {
c_ptr->DoThroughDB(slot);
if (c_ptr->IsNeedUpdateCache()) {
c_ptr->DoUpdateCache(slot);
}
} else {
LOG(WARNING) << "This branch is not impossible reach";
}
} else {
c_ptr->Do(slot);
}
if (!c_ptr->IsSuspend()) {
slot->DbRWUnLock();
}

if (g_pika_conf->slowlog_slower_than() >= 0) {
auto start_time = static_cast<int32_t>(start_us / 1000000);
auto duration = static_cast<int64_t>(pstd::NowMicros() - start_us);
if (duration > g_pika_conf->slowlog_slower_than()) {
g_pika_server->SlowlogPushEntry(argv, start_time, duration);
if (g_pika_conf->slowlog_write_errorlog()) {
LOG(ERROR) << "command: " << argv[0] << ", start_time(s): " << start_time << ", duration(us): " << duration;
}
}
}
}

src/pika_rm.cc

After producing and consuming the binlog, the slave issues another BinlogSync request; the steps are the same as before.

Status PikaReplicaManager::SendSlotBinlogSyncAckRequest(const std::string& db, uint32_t slot_id,
const LogOffset& ack_start, const LogOffset& ack_end,
bool is_first_send) {
std::shared_ptr<SyncSlaveSlot> slave_slot = GetSyncSlaveSlotByName(SlotInfo(db, slot_id));
if (!slave_slot) {
LOG(WARNING) << "Slave Slot: " << db << ":" << slot_id << ", NotFound";
return Status::Corruption("Slave Slot not found");
}
return pika_repl_client_->SendSlotBinlogSync(slave_slot->MasterIp(), slave_slot->MasterPort(), db,
slot_id, ack_start, ack_end, slave_slot->LocalIp(),
is_first_send);
}

Master Receives the Slave's kBinlogSync Request

src/pika_repl_server_conn.cc

Here UpdateSyncBinlogStatus is called to update the binlog read position recorded on the master side, and at the same time a new batch of binlogs is written into the write_queue.

void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) {
...
s = g_pika_rm->UpdateSyncBinlogStatus(slave_node, range_start, range_end);
if (!s.ok()) {
LOG(WARNING) << "Update binlog ack failed " << db_name << " " << slot_id << " " << s.ToString();
conn->NotifyClose();
return;
}

g_pika_server->SignalAuxiliary();
}

src/pika_rm.cc

ConsensusUpdateSlave is called to update the position, and then, as before, SyncBinlogToWq writes the new binlogs into the write_queue.

Status PikaReplicaManager::UpdateSyncBinlogStatus(const RmNode& slave, const LogOffset& offset_start,
const LogOffset& offset_end) {
std::shared_lock l(slots_rw_);
if (sync_master_slots_.find(slave.NodeSlotInfo()) == sync_master_slots_.end()) {
return Status::NotFound(slave.ToString() + " not found");
}
std::shared_ptr<SyncMasterSlot> slot = sync_master_slots_[slave.NodeSlotInfo()];
Status s = slot->ConsensusUpdateSlave(slave.Ip(), slave.Port(), offset_start, offset_end);
if (!s.ok()) {
return s;
}
s = slot->SyncBinlogToWq(slave.Ip(), slave.Port());
if (!s.ok()) {
return s;
}
return Status::OK();
}

FAQ

  • Why doesn't SyncBinlogToWq need to compare ack_offset with sent_offset here?

src/pika_rm.cc

This calls coordinator_.UpdateSlave:

Status SyncMasterSlot::ConsensusUpdateSlave(const std::string& ip, int port, const LogOffset& start,
const LogOffset& end) {
Status s = coordinator_.UpdateSlave(ip, port, start, end);
if (!s.ok()) {
LOG(WARNING) << SyncSlotInfo().ToString() << s.ToString();
return s;
}
return Status::OK();
}

src/pika_consensus.cc

Which calls Update on SyncProgress (sync_pros_):

Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const LogOffset& start,
const LogOffset& end) {
LogOffset committed_index;
Status s = sync_pros_.Update(ip, port, start, end, &committed_index);
if (!s.ok()) {
return s;
}

return Status::OK();
}

src/pika_consensus.cc

Which calls the SlaveNode's Update:

Status SyncProgress::Update(const std::string& ip, int port, const LogOffset& start, const LogOffset& end,
LogOffset* committed_index) {
...
LogOffset acked_offset;
{
// update slave_ptr
std::lock_guard l(slave_ptr->slave_mu);
Status s = slave_ptr->Update(start, end, &acked_offset);
if (!s.ok()) {
return s;
}
...
}

return Status::OK();
}

src/pika_slave_node.cc

Here Update refreshes the sliding window sync_win, and at the end acked_offset is updated. The new acked_offset equals the sent_offset of the last binlog sent in the previous batch, which keeps acked_offset and sent_offset equal and ensures no data is lost.

Status SlaveNode::Update(const LogOffset& start, const LogOffset& end, LogOffset* updated_offset) {
if (slave_state != kSlaveBinlogSync) {
return Status::Corruption(ToString() + "state not BinlogSync");
}
*updated_offset = LogOffset();
bool res = sync_win.Update(SyncWinItem(start), SyncWinItem(end), updated_offset);
if (!res) {
return Status::Corruption("UpdateAckedInfo failed");
}
if (*updated_offset == LogOffset()) {
// nothing to update return current acked_offset
*updated_offset = acked_offset;
return Status::OK();
}
// update acked_offset
acked_offset = *updated_offset;
return Status::OK();
}

src/pika_slave_node.cc

win_ is in fact a double-ended queue of SyncWinItems. The start_item and end_item passed in mark the offsets of binlogs the slave has already consumed; those entries are popped from the front of win_, so what remains in win_ are the offsets of binlogs that still need to be acknowledged.

bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_item, LogOffset* acked_offset) {
size_t start_pos = win_.size();
size_t end_pos = win_.size();
for (size_t i = 0; i < win_.size(); ++i) {
if (win_[i] == start_item) {
start_pos = i;
}
if (win_[i] == end_item) {
end_pos = i;
break;
}
}
if (start_pos == win_.size() || end_pos == win_.size()) {
LOG(WARNING) << "Ack offset Start: " << start_item.ToString() << "End: " << end_item.ToString()
<< " not found in binlog controller window." << std::endl
<< "window status " << std::endl
<< ToStringStatus();
return false;
}
for (size_t i = start_pos; i <= end_pos; ++i) {
win_[i].acked_ = true;
total_size_ -= win_[i].binlog_size_;
}
while (!win_.empty()) {
if (win_[0].acked_) {
*acked_offset = win_[0].offset_;
win_.pop_front();
} else {
break;
}
}
return true;
}
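To illustrate the ack semantics, here is a heavily simplified, self-contained sketch of the same idea, with offsets reduced to plain integers and no sizes or locking; it is not the actual SyncWindow class:

#include <cstdint>
#include <deque>
#include <iostream>

// Simplified sliding-window sketch: items are just offsets; acking a range
// marks them and pops the acked prefix, mirroring SyncWindow::Update.
struct Item {
  uint64_t offset;
  bool acked = false;
};

int main() {
  std::deque<Item> win = {{101}, {102}, {103}, {104}};  // sent but unacked binlogs

  // The slave acks offsets 101..102.
  uint64_t ack_start = 101, ack_end = 102, acked_offset = 0;
  for (auto& item : win) {
    if (item.offset >= ack_start && item.offset <= ack_end) item.acked = true;
  }
  while (!win.empty() && win.front().acked) {
    acked_offset = win.front().offset;  // last fully acked position
    win.pop_front();
  }

  std::cout << "acked_offset=" << acked_offset
            << ", still in flight=" << win.size() << std::endl;  // 102, 2
  return 0;
}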

Summary

In incremental sync, the master first sends its binlog offset together with the serialized record to the slave and records that offset as sent_offset. After the slave has consumed the data, it replies with ack_start and ack_end so the master can update its acked_offset; only when sent_offset and acked_offset are equal does the master continue incremental sync with that slave.

Standalone Binlog Production

src/pika_command.cc

After the command execution flow finishes, DoBinlog is executed:

void Cmd::InternalProcessCommand(const std::shared_ptr<Slot>& slot, const std::shared_ptr<SyncMasterSlot>& sync_slot,
const HintKeys& hint_keys) {
pstd::lock::MultiRecordLock record_lock(slot->LockMgr());
if (is_write()) {
record_lock.Lock(current_key());
}

uint64_t start_us = 0;
if (g_pika_conf->slowlog_slower_than() >= 0) {
start_us = pstd::NowMicros();
}
DoCommand(slot, hint_keys); // execute the command itself
if (g_pika_conf->slowlog_slower_than() >= 0) {
do_duration_ += pstd::NowMicros() - start_us;
}

DoBinlog(sync_slot); // binlog logic happens here

if (is_write()) {
record_lock.Unlock(current_key());
}
}

src/pika_command.cc

DoBinlog 函数中,首先判断当前的命令是不是写命令,只有写命令才需要记录 Binlog 然后获取到当前的 connresponse, 然后执行 SyncMasterSlotConsensusProposeLog 函数

void Cmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
if (res().ok() && is_write() && g_pika_conf->write_binlog()) { // only successful write commands are logged
std::shared_ptr<net::NetConn> conn_ptr = GetConn();
std::shared_ptr<std::string> resp_ptr = GetResp();
// Consider that dummy cmd appended by system, both conn and resp are null.
if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) {
if (!conn_ptr) {
LOG(WARNING) << slot->SyncSlotInfo().ToString() << " conn empty.";
}
if (!resp_ptr) {
LOG(WARNING) << slot->SyncSlotInfo().ToString() << " resp empty.";
}
res().SetRes(CmdRes::kErrOther);
return;
}

Status s =
slot->ConsensusProposeLog(shared_from_this(), std::dynamic_pointer_cast<PikaClientConn>(conn_ptr), resp_ptr);
if (!s.ok()) {
LOG(WARNING) << slot->SyncSlotInfo().ToString() << " Writing binlog failed, maybe no space left on device "
<< s.ToString();
res().SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}
}

src/pika_rm.cc

coordinator_ here is a ConsensusCoordinator and is a private member of the SyncMasterSlot class.

Status SyncMasterSlot::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr) {
return coordinator_.ProposeLog(cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
}

src/pika_consensus.cc

ProposeLog 中,定义了一个 LogOffsetBinlogItem 变量,然后调用 InternalAppendLog 去执行写 Binlog, 同时调用 SignalAuxiliary 去唤醒辅助线程去通知主节点同步 Binlog

Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr) {
std::vector<std::string> keys = cmd_ptr->current_key();
// slotkey shouldn't add binlog
if (cmd_ptr->name() == kCmdNameSAdd && !keys.empty() &&
(keys[0].compare(0, SlotKeyPrefix.length(), SlotKeyPrefix) == 0 || keys[0].compare(0, SlotTagPrefix.length(), SlotTagPrefix) == 0)) {
return Status::OK();
}

LogOffset log_offset;

BinlogItem item;
// make sure stable log and mem log consistent
Status s = InternalAppendLog(item, cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
if (!s.ok()) {
return s;
}

g_pika_server->SignalAuxiliary();
return Status::OK();
}

void PikaServer::SignalAuxiliary() { pika_auxiliary_thread_->cv_.notify_one(); }

The BinlogItem class carries fields such as exec_time, term_id, logic_id, filenum, offset, content, and extends:

class BinlogItem {
public:
BinlogItem() = default;

friend class PikaBinlogTransverter;

uint32_t exec_time() const;
uint32_t term_id() const;
uint64_t logic_id() const;
uint32_t filenum() const;
uint64_t offset() const;
std::string content() const;
std::string ToString() const;

private:
uint32_t exec_time_ = 0;
uint32_t term_id_ = 0;
uint64_t logic_id_ = 0;
uint32_t filenum_ = 0;
uint64_t offset_ = 0;
std::string content_;
std::vector<std::string> extends_;
};

src/pika_consensus.cc

InternalAppendLog calls InternalAppendBinlog to write the binlog, passing in cmd_ptr, the BinlogItem, and a LogOffset pointer.

Status ConsensusCoordinator::InternalAppendLog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr) {
LogOffset log_offset;
Status s = InternalAppendBinlog(item, cmd_ptr, &log_offset);
if (!s.ok()) {
return s;
}
return Status::OK();
}

src/pika_consensus.cc

The command in cmd_ptr is first serialized into content according to the Redis protocol. stable_logger_ is a StableLog object; its Put function writes content to the file, after which GetProducerStatus is called and a LogOffset is constructed to update the offset.

Status ConsensusCoordinator::InternalAppendBinlog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
LogOffset* log_offset) {
std::string content = cmd_ptr->ToRedisProtocol();
Status s = stable_logger_->Logger()->Put(content);
if (!s.ok()) {
std::string db_name = cmd_ptr->db_name().empty() ? g_pika_conf->default_db() : cmd_ptr->db_name();
std::shared_ptr<DB> db = g_pika_server->GetDB(db_name);
if (db) {
db->SetBinlogIoError();
}
return s;
}
uint32_t filenum;
uint64_t offset;
stable_logger_->Logger()->GetProducerStatus(&filenum, &offset);
*log_offset = LogOffset(BinlogOffset(filenum, offset), LogicOffset(item.term_id(), item.logic_id()));
return Status::OK();
}

src/pika_binlog.cc

Binlog overloads the Put function. The first Put initializes the filenum, term, offset, and logic_id variables, calls GetProducerStatus to fetch the current latest values (filenum, pro_offset, term_id, logic_id), and then calls BinlogEncode to assemble the binlog item.

// Note: mutex lock should be held
Status Binlog::Put(const std::string& item) {
if (!opened_.load()) {
return Status::Busy("Binlog is not open yet");
}
uint32_t filenum = 0;
uint32_t term = 0;
uint64_t offset = 0;
uint64_t logic_id = 0;

Lock();
DEFER {
Unlock();
};

Status s = GetProducerStatus(&filenum, &offset, &term, &logic_id);
if (!s.ok()) {
return s;
}
logic_id++;
std::string data = PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst,
time(nullptr), term, logic_id, filenum, offset, item, {});

s = Put(data.c_str(), static_cast<int>(data.size()));
if (!s.ok()) {
binlog_io_error_.store(true);
}
return s;
}

src/pika_binlog_transverter.cc

BinlogEncode assembles the binlog item's fields and returns the assembled binlog string:

std::string PikaBinlogTransverter::BinlogEncode(BinlogType type, uint32_t exec_time, uint32_t term_id,
uint64_t logic_id, uint32_t filenum, uint64_t offset,
const std::string& content, const std::vector<std::string>& extends) {
std::string binlog;
pstd::PutFixed16(&binlog, type);
pstd::PutFixed32(&binlog, exec_time);
pstd::PutFixed32(&binlog, term_id);
pstd::PutFixed64(&binlog, logic_id);
pstd::PutFixed32(&binlog, filenum);
pstd::PutFixed64(&binlog, offset);
uint32_t content_length = content.size();
pstd::PutFixed32(&binlog, content_length);
binlog.append(content);
return binlog;
}
| type | exec_time | term_id | logic_id | filenum | offset | content_len | content | 
| 2B | 4B | 4B | 8B | 4B | 8B | 4B | |

This is the layout assembled by BinlogEncode: a 34-byte item header (2 + 4 + 4 + 8 + 4 + 8 + 4 bytes) followed by the content.
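A standalone sketch can verify that layout. It builds a fake item header the same way BinlogEncode does and reads a few fields back, assuming the little-endian fixed-width encoding of pstd's PutFixed helpers (LevelDB style); all values below are made up:

#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

// Little-endian fixed-width reader (sketch, not Pika code).
static uint64_t ReadLE(const unsigned char* p, int n) {
  uint64_t v = 0;
  for (int i = n - 1; i >= 0; --i) v = (v << 8) | p[i];
  return v;
}

int main() {
  // Build a fake 34-byte item header + content in BinlogEncode's field order.
  std::vector<unsigned char> buf;
  auto put = [&buf](uint64_t v, int n) {
    for (int i = 0; i < n; ++i) buf.push_back(static_cast<unsigned char>(v >> (8 * i)));
  };
  std::string content = "*1\r\n$4\r\nPING\r\n";  // hypothetical RESP payload
  put(1, 2);               // type (TypeFirst)
  put(1706150000, 4);      // exec_time
  put(0, 4);               // term_id
  put(42, 8);              // logic_id
  put(3, 4);               // filenum
  put(128, 8);             // offset
  put(content.size(), 4);  // content_len
  buf.insert(buf.end(), content.begin(), content.end());

  const unsigned char* p = buf.data();
  std::printf("type=%llu logic_id=%llu filenum=%llu content_len=%llu header=%zu bytes\n",
              (unsigned long long)ReadLE(p, 2), (unsigned long long)ReadLE(p + 10, 8),
              (unsigned long long)ReadLE(p + 18, 4), (unsigned long long)ReadLE(p + 30, 4),
              buf.size() - content.size());  // header is 34 bytes
  return 0;
}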

src/pika_binlog.cc

Now let's look at what the second Put does. If the current file is larger than the configured file size, NewWritableFile creates a new binlog file, pro_offset in the version is reset to 0, the file-name index pro_num is incremented and persisted, and then Produce is called to handle the actual write.

// Note: mutex lock should be held
Status Binlog::Put(const char* item, int len) {
Status s;

/* Check to roll log file */
uint64_t filesize = queue_->Filesize();
if (filesize > file_size_) {
std::unique_ptr<pstd::WritableFile> queue;
std::string profile = NewFileName(filename_, pro_num_ + 1);
s = pstd::NewWritableFile(profile, queue);
if (!s.ok()) {
LOG(ERROR) << "Binlog: new " << filename_ << " " << s.ToString();
return s;
}
queue_.reset();
queue_ = std::move(queue);
pro_num_++;

{
std::lock_guard l(version_->rwlock_);
version_->pro_offset_ = 0;
version_->pro_num_ = pro_num_;
version_->StableSave();
}
InitLogFile();
}

int pro_offset;
s = Produce(pstd::Slice(item, len), &pro_offset);
if (s.ok()) {
std::lock_guard l(version_->rwlock_);
version_->pro_offset_ = pro_offset;
version_->logic_id_++;
version_->StableSave();
}

return s;
}

src/pika_binlog.cc

Produce 函数中,tmp_pro_offset 是当前 Binlog 文件中的偏移量

left records how many bytes of the current binlog entry still need to be written, and leftover records how many bytes are still free in the current block. If the free space is smaller than kHeaderSize (the record header), the rest of the current block is padded with \x00 bytes and a new block is started: block_offset_ is reset to 0 to mark the new block's offset, and temp_pro_offset is updated accordingly. avail records how many payload bytes fit in the block once kHeaderSize and block_offset_ (the part already written) are accounted for. left is then compared with avail to see whether the data fits in the current block: if the whole remaining payload fits and this is the first fragment, the type is kFullType; otherwise the fragment is tagged kFirstType, kMiddleType, or kLastType.

Status Binlog::Produce(const pstd::Slice& item, int* temp_pro_offset) {
Status s;
const char* ptr = item.data();
size_t left = item.size();
bool begin = true;

*temp_pro_offset = static_cast<int>(version_->pro_offset_);
do {
const int leftover = static_cast<int>(kBlockSize) - block_offset_;
assert(leftover >= 0);
if (static_cast<size_t>(leftover) < kHeaderSize) {
if (leftover > 0) {
s = queue_->Append(pstd::Slice("\x00\x00\x00\x00\x00\x00\x00", leftover));
if (!s.ok()) {
return s;
}
*temp_pro_offset += leftover;
}
block_offset_ = 0;
}

const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
const bool end = (left == fragment_length);
if (begin && end) {
type = kFullType;
} else if (begin) {
type = kFirstType;
} else if (end) {
type = kLastType;
} else {
type = kMiddleType;
}

s = EmitPhysicalRecord(type, ptr, fragment_length, temp_pro_offset);
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);

return s;
}
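A small standalone sketch of the same fragmentation decision, using the 64 KB block and 8-byte header described in this post and a made-up starting position, shows how one payload gets split into typed fragments:

#include <cstdint>
#include <iostream>

int main() {
  const uint64_t kBlockSize = 64 * 1024;  // 65536, block size assumed from the summary
  const uint64_t kHeaderSize = 8;         // 3B length + 4B time + 1B type

  uint64_t block_offset = 60000;  // hypothetical position inside the current block
  uint64_t left = 20000;          // bytes of one serialized command still to write
  bool begin = true;

  while (left > 0) {
    uint64_t leftover = kBlockSize - block_offset;
    if (leftover < kHeaderSize) {  // not even room for a header: pad and roll
      block_offset = 0;
      leftover = kBlockSize;
    }
    uint64_t avail = leftover - kHeaderSize;
    uint64_t fragment = left < avail ? left : avail;
    bool end = (left == fragment);

    const char* type = (begin && end) ? "kFullType"
                     : begin          ? "kFirstType"
                     : end            ? "kLastType"
                                      : "kMiddleType";
    std::cout << type << " fragment of " << fragment << " bytes\n";

    block_offset += kHeaderSize + fragment;
    left -= fragment;
    begin = false;
  }
  // Prints a kFirstType fragment that fills the current block, then a
  // kLastType fragment at the start of the next block.
  return 0;
}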

src/pika_binlog.cc

Here the header and payload are appended to queue_ and flushed; with that, a single record has been laid down (see the note below on how the data actually reaches disk).

Status Binlog::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset) {
Status s;
assert(n <= 0xffffff);
assert(block_offset_ + kHeaderSize + n <= kBlockSize);

char buf[kHeaderSize];

uint64_t now;
struct timeval tv;
gettimeofday(&tv, nullptr);
now = tv.tv_sec;
buf[0] = static_cast<char>(n & 0xff);
buf[1] = static_cast<char>((n & 0xff00) >> 8);
buf[2] = static_cast<char>(n >> 16);
buf[3] = static_cast<char>(now & 0xff);
buf[4] = static_cast<char>((now & 0xff00) >> 8);
buf[5] = static_cast<char>((now & 0xff0000) >> 16);
buf[6] = static_cast<char>((now & 0xff000000) >> 24);
buf[7] = static_cast<char>(t);

s = queue_->Append(pstd::Slice(buf, kHeaderSize));
if (s.ok()) {
s = queue_->Append(pstd::Slice(ptr, n));
if (s.ok()) {
s = queue_->Flush();
}
}
block_offset_ += static_cast<int32_t>(kHeaderSize + n);

*temp_pro_offset += static_cast<int32_t>(kHeaderSize + n);
return s;
}
| length | time | Type | type | exec_time | term_id | logic_id | filenum | offset | content_len | content | 
| 3B | 4B | 1B | 2B | 4B | 4B | 8B | 4B | 8B | 4B | |

So this is the form in which a single record is ultimately written.
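As a small worked example of this final layout, assuming a command whose RESP encoding fits into a single kFullType record, the total on-disk size is the 8-byte physical header plus the 34-byte item header plus the content:

#include <cstdio>
#include <cstring>

int main() {
  // Hypothetical example: how many bytes one small command occupies on disk
  // when it fits in a single kFullType record.
  const char* resp = "*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n";  // RESP for SET k v
  const size_t physical_header = 8;   // length(3) + time(4) + record type(1)
  const size_t item_header = 34;      // type .. content_len, as in the table above
  const size_t content = std::strlen(resp);

  std::printf("content=%zu bytes, total record=%zu bytes\n",
              content, physical_header + item_header + content);
  return 0;
}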

src/pika_binlog.cc

Looking at the code above, it may seem that the binlog data never reaches disk and only lives in queue_ (memory). The AppendWritableFile function uses mmap for persistence; it is called in Binlog's constructor, and the operating system periodically writes the mapped memory pages back to disk.

Binlog::Binlog(std::string  binlog_path, const int file_size)
: opened_(false),
binlog_path_(std::move(binlog_path)),
file_size_(file_size),
binlog_io_error_(false) {
// To intergrate with old version, we don't set mmap file size to 100M;
// pstd::SetMmapBoundSize(file_size);
// pstd::kMmapBoundSize = 1024 * 1024 * 100;

Status s;

pstd::CreateDir(binlog_path_);

filename_ = binlog_path_ + kBinlogPrefix;
const std::string manifest = binlog_path_ + kManifest;
std::string profile;

if (!pstd::FileExists(manifest)) {
LOG(INFO) << "Binlog: Manifest file not exist, we create a new one.";

profile = NewFileName(filename_, pro_num_);
s = pstd::NewWritableFile(profile, queue_);
if (!s.ok()) {
LOG(FATAL) << "Binlog: new " << filename_ << " " << s.ToString();
}
std::unique_ptr<pstd::RWFile> tmp_file;
s = pstd::NewRWFile(manifest, tmp_file);
versionfile_.reset(tmp_file.release());
if (!s.ok()) {
LOG(FATAL) << "Binlog: new versionfile error " << s.ToString();
}

version_ = std::make_unique<Version>(versionfile_);
version_->StableSave();
} else {
LOG(INFO) << "Binlog: Find the exist file.";
std::unique_ptr<pstd::RWFile> tmp_file;
s = pstd::NewRWFile(manifest, tmp_file);
versionfile_.reset(tmp_file.release());
if (s.ok()) {
version_ = std::make_unique<Version>(versionfile_);
version_->Init();
pro_num_ = version_->pro_num_;

// Debug
// version_->debug();
} else {
LOG(FATAL) << "Binlog: open versionfile error";
}

profile = NewFileName(filename_, pro_num_);
DLOG(INFO) << "Binlog: open profile " << profile;
s = pstd::AppendWritableFile(profile, queue_, version_->pro_offset_); // mmap-backed file used for persistence
if (!s.ok()) {
LOG(FATAL) << "Binlog: Open file " << profile << " error " << s.ToString();
}

uint64_t filesize = queue_->Filesize();
DLOG(INFO) << "Binlog: filesize is " << filesize;
}

InitLogFile();
}

Summary

The binlog we have been talking about is in fact Pika's write2file files. A write2file file consists of multiple blocks, each with a fixed size of 64 KB. A block holds records; a record is one serialized Redis command (and a single binlog entry may also be recorded as multiple records). Each write2file file has a bounded size, and if the bytes left in the current block cannot even hold a record header, the remainder of the block is padded with \x00 and a new block is started for the record.

FAQ

  • In the code, the binlog header kHeaderSize is 1 + 3 + 4, i.e. 8 bytes, but pika_binlog_transverter.h describes the binlog header as Type(2B) + exec_time(4B) + term_id(4B) + logic_id(8B) + filenum(4B) + offset(8B) + content_len(4B), which adds up to 34 bytes. Why the difference?

    The physical record header is the 8 bytes (3-byte length + 4-byte time + 1-byte type), and the 34-byte item header follows it:

    | length | time | Type | type | exec_time | term_id | logic_id | filenum | offset | content_len | content | 
    | 3B | 4B | 1B | 2B | 4B | 4B | 8B | 4B | 8B | 4B | |

  • If AppendWritableFile is what persists binlog data, why is it only called in Binlog's constructor?

    Because the file is mmap-backed: the operating system periodically flushes the mapped memory pages to disk.

  • If the remaining space in the current block can hold a record's header plus part of its content, will the block store the whole record instead of starting a new one?

    A new block is started for the remainder of the record. A new block is started even if the file has already reached its configured number of blocks, so the size of a write2file file is not strictly fixed, but the block size always is.

  • How does the master signal the auxiliary thread when it writes a binlog?

    Via the condition variable: pika_auxiliary_thread_->cv_.notify_one();

Open Issues

  1. In a single binlog record, some fields are assembled redundantly:

    | length | time | Type | type | exec_time | term_id | logic_id | filenum | offset | content_len | content | 
    | 3B | 4B | 1B | 2B | 4B | 4B | 8B | 4B | 8B | 4B | |
  2. The type field is always TypeFirst, the default value.

The Auxiliary Thread Drives BinlogSync

src/pika_auxiliary_thread.cc

The slave is the initiator of BinlogSync, but after a period with no data to sync it cannot perceive new writes on the master and therefore cannot proactively start another BinlogSync round. At that point the auxiliary thread calls TriggerSendBinlogSync to drive the BinlogSync flow. The main logic of TriggerSendBinlogSync is to put the newly written increments into write_queues_ and then send them to the slaves.

void* PikaAuxiliaryThread::ThreadMain() {
while (!should_stop()) {
if (g_pika_server->ShouldMetaSync()) {
g_pika_rm->SendMetaSyncRequest();
} else if (g_pika_server->MetaSyncDone()) {
g_pika_rm->RunSyncSlaveSlotStateMachine();
}

pstd::Status s = g_pika_rm->CheckSyncTimeout(pstd::NowMicros());
if (!s.ok()) {
LOG(WARNING) << s.ToString();
}

g_pika_server->CheckLeaderProtectedMode();

// TODO(whoiami) timeout
s = g_pika_server->TriggerSendBinlogSync(); // put binlogs into the send queues
if (!s.ok()) {
LOG(WARNING) << s.ToString();
}
// send to peer
int res = g_pika_server->SendToPeer(); // send queued binlogs to the slaves
if (res == 0) {
// sleep 100 ms
std::unique_lock lock(mu_);
cv_.wait_for(lock, 100ms);
} else {
// LOG_EVERY_N(INFO, 1000) << "Consume binlog number " << res;
}
}
return nullptr;
}

src/pika_server.cc

This calls WakeUpBinlogSync:

Status PikaServer::TriggerSendBinlogSync() { return g_pika_rm->WakeUpBinlogSync(); }

src/pika_rm.cc

Which calls WakeUpSlaveBinlogSync on every sync master slot:

Status PikaReplicaManager::WakeUpBinlogSync() {
std::shared_lock l(slots_rw_);
for (auto& iter : sync_master_slots_) {
std::shared_ptr<SyncMasterSlot> slot = iter.second;
Status s = slot->WakeUpSlaveBinlogSync();
if (!s.ok()) {
return s;
}
}
return Status::OK();
}

src/pika_rm.cc

This calls ReadBinlogFileToWq, and the steps are similar to what we saw above: binlogs are written into the write_queue. It first checks whether sent_offset and acked_offset are equal; if they are, the previously pushed incremental data has been fully synced, so whenever ReadBinlogFileToWq is called here, the earlier master-slave increments are guaranteed to have been synced already.

Status SyncMasterSlot::WakeUpSlaveBinlogSync() {
std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves = GetAllSlaveNodes();
std::vector<std::shared_ptr<SlaveNode>> to_del;
for (auto& slave_iter : slaves) {
std::shared_ptr<SlaveNode> slave_ptr = slave_iter.second;
std::lock_guard l(slave_ptr->slave_mu);
if (slave_ptr->sent_offset == slave_ptr->acked_offset) {
Status s = ReadBinlogFileToWq(slave_ptr);
if (!s.ok()) {
to_del.push_back(slave_ptr);
LOG(WARNING) << "WakeUpSlaveBinlogSync falied, Delete from RM, slave: " << slave_ptr->ToStringStatus() << " "
<< s.ToString();
}
}
}
for (auto& to_del_slave : to_del) {
RemoveSlaveNode(to_del_slave->Ip(), to_del_slave->Port());
}
return Status::OK();
}

Binlog Expiration Policy

src/pika_server.cc

Among Pika's timer tasks, AutoPurge periodically cleans up binlogs:

void PikaServer::DoTimingTask() {
// Maybe schedule compactrange
AutoCompactRange();
// Purge log
AutoPurge();
// Delete expired dump
AutoDeleteExpiredDump();
// Cheek Rsync Status
// TODO: temporarily disable rsync
// AutoKeepAliveRSync();
// Reset server qps
ResetLastSecQuerynum();
// Auto update network instantaneous metric
AutoUpdateNetworkMetric();
ProcessCronTask();
UpdateCacheInfo();
// Print the queue status periodically
PrintThreadPoolQueueStatus();

}

void PikaServer::AutoPurge() { DoSameThingEverySlot(TaskType::kPurgeLog); }

src/pika_server.cc

For each slot, PurgeStableLogs is called to delete binlogs:

Status PikaServer::DoSameThingEverySlot(const TaskType& type) {
std::shared_lock rwl(dbs_rw_);
std::shared_ptr<SyncSlaveSlot> slave_slot = nullptr;
for (const auto& db_item : dbs_) {
for (const auto& slot_item : db_item.second->slots_) {
switch (type) {
case TaskType::kResetReplState: {
slave_slot =
g_pika_rm->GetSyncSlaveSlotByName(SlotInfo(db_item.second->GetDBName(), slot_item.second->GetSlotID()));
if (!slave_slot) {
LOG(WARNING) << "Slave Slot: " << db_item.second->GetDBName() << ":" << slot_item.second->GetSlotID()
<< " Not Found";
}
slave_slot->SetReplState(ReplState::kNoConnect);
break;
}
case TaskType::kPurgeLog: {
std::shared_ptr<SyncMasterSlot> slot =
g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_item.second->GetDBName(), slot_item.second->GetSlotID()));
if (!slot) {
LOG(WARNING) << "Slot: " << db_item.second->GetDBName() << ":" << slot_item.second->GetSlotID()
<< " Not Found.";
break;
}
slot->StableLogger()->PurgeStableLogs();
break;
}
case TaskType::kCompactAll:
slot_item.second->Compact(storage::kAll);
break;
default:
break;
}
}
}
return Status::OK();
}

src/pika_stable_log.cc

Here we can see that the purge is handled by an asynchronous task: when it runs, it invokes DoPurgeStableLogs.

bool StableLog::PurgeStableLogs(uint32_t to, bool manual) {
// Only one thread can go through
bool expect = false;
if (!purging_.compare_exchange_strong(expect, true)) {
LOG(WARNING) << "purge process already exist";
return false;
}
auto arg = new PurgeStableLogArg();
arg->to = to;
arg->manual = manual;
arg->logger = shared_from_this();
g_pika_server->PurgelogsTaskSchedule(&DoPurgeStableLogs, static_cast<void*>(arg));
return true;
}

src/pika_stable_log.cc

Which calls PurgeFiles:

void StableLog::DoPurgeStableLogs(void* arg) {
std::unique_ptr<PurgeStableLogArg> purge_arg(static_cast<PurgeStableLogArg*>(arg));
purge_arg->logger->PurgeFiles(purge_arg->to, purge_arg->manual);
purge_arg->logger->ClearPurge();
}

src/pika_stable_log.cc

Here a map named binlogs first collects all the binlog file names in the current directory. Their count is compared against the number of binlogs allowed by the configuration (10 in the setup described here), and when deleting by age at least 10 files are always kept; deletion starts from the oldest binlog file, i.e. the one with the smallest filenum.

bool StableLog::PurgeFiles(uint32_t to, bool manual) {
std::map<uint32_t, std::string> binlogs;
if (!GetBinlogFiles(&binlogs)) {
LOG(WARNING) << log_path_ << " Could not get binlog files!";
return false;
}

int delete_num = 0;
struct stat file_stat;
auto remain_expire_num = static_cast<int32_t>(binlogs.size() - g_pika_conf->expire_logs_nums());
std::shared_ptr<SyncMasterSlot> master_slot = nullptr;
std::map<uint32_t, std::string>::iterator it;
for (it = binlogs.begin(); it != binlogs.end(); ++it) {
if ((manual && it->first <= to) // Manual purgelogsto
|| (remain_expire_num > 0) // Expire num trigger
|| (binlogs.size() - delete_num > 10 // At lease remain 10 files
&& stat(((log_path_ + it->second)).c_str(), &file_stat) == 0 &&
file_stat.st_mtime < time(nullptr) - g_pika_conf->expire_logs_days() * 24 * 3600)) { // Expire time trigger
// We check this every time to avoid lock when we do file deletion
master_slot = g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name_, slot_id_));
...
if (!master_slot->BinlogCloudPurge(it->first)) {
LOG(WARNING) << log_path_ << " Could not purge " << (it->first) << ", since it is already be used";
return false;
}
// Do delete
if (pstd::DeleteFile(log_path_ + it->second)) {
++delete_num;
--remain_expire_num;
} else {
LOG(WARNING) << log_path_ << " Purge log file : " << (it->second) << " failed! error: delete file failed";
}
} else {
// Break when face the first one not satisfied
// Since the binlogs is order by the file index
break;
}
}
...
return true;
}
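The per-file deletion condition above can be read as a single predicate. A minimal sketch of it, under the constants used in the code (keep at least 10 files, expire by count or by age), might look like this:

#include <cstddef>
#include <cstdint>
#include <ctime>

// Sketch of the per-file purge decision in StableLog::PurgeFiles: a file is
// purged if it was explicitly requested (manual purge up to `to`), or the
// count-based quota is exceeded, or it is old enough and more than 10 files
// would still remain afterwards.
bool ShouldPurge(bool manual, uint32_t filenum, uint32_t to,
                 int remain_expire_num, size_t total_files, int deleted,
                 time_t mtime, time_t now, int expire_logs_days) {
  const bool manual_hit = manual && filenum <= to;
  const bool count_hit = remain_expire_num > 0;
  const bool age_hit = (total_files - deleted > 10) &&
                       mtime < now - static_cast<time_t>(expire_logs_days) * 24 * 3600;
  return manual_hit || count_hit || age_hit;
}

int main() {
  // A file older than 7 days, with 15 files on disk and none deleted yet.
  time_t now = time(nullptr);
  return ShouldPurge(false, 3, 0, 0, 15, 0, now - 8 * 24 * 3600, now, 7) ? 0 : 1;
}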