A Source Code Walkthrough of Pika's Full Sync

Background

This post walks through Pika's full (snapshot) sync. Before diving into the code, let's first look at the states (the state machines) involved in master-slave replication.

// role
PIKA_ROLE_SINGLE = 0 (initial node)
PIKA_ROLE_SLAVE = 1 (slave)
PIKA_ROLE_MASTER = 2 (master)

// repl_state_
PIKA_REPL_NO_CONNECT = 0 (not connected)
PIKA_REPL_SHOULD_META_SYNC = 1 (MetaSync pending)
PIKA_REPL_META_SYNC_DONE = 2 (MetaSync done)
PIKA_REPL_ERROR = 3 (error)

enum ReplState {
kNoConnect = 0,
kTryConnect = 1,
kTryDBSync = 2,
kWaitDBSync = 3,
kWaitReply = 4,
kConnected = 5,
kError = 6,
kDBNoConnect = 7
}
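
To keep the walkthrough easier to follow, here is a rough sketch of how a SyncSlaveSlot's ReplState moves during a full sync. It is a condensed summary of what the later sections show, written as a small standalone C++ snippet rather than Pika source, so treat the transition list as an overview, not an exhaustive state machine.

```cpp
#include <cstdio>

// A condensed view of the slot-level transitions this article walks through
// for a *full* sync. Distilled from the sections below; not Pika code.
enum ReplState { kNoConnect, kTryConnect, kTryDBSync, kWaitDBSync, kWaitReply, kConnected, kError, kDBNoConnect };

struct Step { ReplState from; ReplState to; const char* trigger; };

int main() {
  const Step full_sync_path[] = {
      {kNoConnect,  kTryDBSync,  "MetaSync done, force_full_sync_ == true (PrepareSlotTrySync)"},
      {kTryDBSync,  kWaitReply,  "DBSync request sent (SendSlotDBSyncRequest)"},
      {kWaitReply,  kWaitDBSync, "DBSync response received (HandleDBSyncResponse)"},
      {kWaitDBSync, kTryConnect, "rsync finished, TryUpdateMasterOffset succeeded"},
      {kTryConnect, kWaitReply,  "TrySync request sent (SendSlotTrySyncRequest)"},
      {kWaitReply,  kConnected,  "TrySync response kOk, first BinlogSync ack sent"},
  };
  for (const Step& s : full_sync_path) {
    std::printf("%d -> %d : %s\n", s.from, s.to, s.trigger);
  }
  return 0;
}
```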

1. Starting from the command

**src/pika_admin.cc**

Running slaveof ip port kicks off master-slave replication. The command first calls RemoveMaster to detach the node's current master, then calls SetMaster to set the new master.

void SlaveofCmd::Do(std::shared_ptr<Slot> slot) {
...
g_pika_server->RemoveMaster();
...

bool sm_ret = g_pika_server->SetMaster(master_ip_, static_cast<int32_t>(master_port_));

if (sm_ret) {
res_.SetRes(CmdRes::kOk);
g_pika_server->ClearCacheDbAsync(slot);
g_pika_conf->SetSlaveof(master_ip_ + ":" + std::to_string(master_port_));
g_pika_conf->SetMasterRunID("");
g_pika_server->SetFirstMetaSync(true);
} else {
res_.SetRes(CmdRes::kErrOther, "Server is not in correct state for slaveof");
}
}

src/pika_server.cc

Here repl_state_ is set to PIKA_REPL_NO_CONNECT and master_ip_ / master_port_ are reset. If the node previously had a master, CloseReplClientConn and LostConnection are called to close the PikaReplClient connection and the RsyncClient connection respectively. Finally DoSameThingEverySlot sets every SyncSlaveSlot's state to kNoConnect.

void PikaServer::RemoveMaster() {
{
std::lock_guard l(state_protector_);
repl_state_ = PIKA_REPL_NO_CONNECT;
role_ &= ~PIKA_ROLE_SLAVE;

if (!master_ip_.empty() && master_port_ != -1) {
g_pika_rm->CloseReplClientConn(master_ip_, master_port_ + kPortShiftReplServer);
g_pika_rm->LostConnection(master_ip_, master_port_);
...
}

master_ip_ = "";
master_port_ = -1;
DoSameThingEverySlot(TaskType::kResetReplState);
}
}

src/pika_rm.cc

Closing the ReplClientConn connection:

Status PikaReplicaManager::CloseReplClientConn(const std::string& ip, int32_t port) {
return pika_repl_client_->Close(ip, port);
}

src/pika_rm.cc

This removes the slave from the slaves_ list of every SyncMasterSlot in sync_master_slots_, and deactivates (stops the rsync_cli_ of) every SyncSlaveSlot in sync_slave_slots_ whose master matches the given ip and port.

Status PikaReplicaManager::LostConnection(const std::string& ip, int port) {
std::shared_lock l(slots_rw_);
for (auto& iter : sync_master_slots_) {
std::shared_ptr<SyncMasterSlot> slot = iter.second;
Status s = slot->RemoveSlaveNode(ip, port);
if (!s.ok() && !s.IsNotFound()) {
LOG(WARNING) << "Lost Connection failed " << s.ToString();
}
}

for (auto& iter : sync_slave_slots_) {
std::shared_ptr<SyncSlaveSlot> slot = iter.second;
if (slot->MasterIp() == ip && slot->MasterPort() == port) {
slot->Deactivate();
}
}
return Status::OK();
}

FAQ

  • Why is the role transition here done with role_ &= ~PIKA_ROLE_SLAVE; instead of assigning the role directly? (see the sketch below)
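
A hedged note on this FAQ: role_ is used as a bit mask (see role_ |= PIKA_ROLE_SLAVE in SetMaster below), so a node can presumably hold the master and slave bits at the same time, for example a middle node in a chained replication setup. Clearing only the slave bit keeps any other role bit intact. A minimal sketch of the bit arithmetic, using the constants listed at the top of this article:

```cpp
#include <cassert>

// Role bits as listed at the beginning of this article.
constexpr int PIKA_ROLE_SINGLE = 0;
constexpr int PIKA_ROLE_SLAVE  = 1;
constexpr int PIKA_ROLE_MASTER = 2;

int main() {
  // A node that is a master for its own slaves and, at the same time,
  // a slave of some upstream node.
  int role = PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE;   // 0b11

  // RemoveMaster-style transition: drop only the slave bit.
  role &= ~PIKA_ROLE_SLAVE;
  assert(role == PIKA_ROLE_MASTER);                // the master bit survives

  // A plain assignment would have thrown the master bit away as well.
  int role2 = PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE;
  role2 = PIKA_ROLE_SINGLE;
  assert(role2 == PIKA_ROLE_SINGLE);
  return 0;
}
```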

src/pika_server.cc

Here repl_state_ is set to the PIKA_REPL_SHOULD_META_SYNC flag.

bool PikaServer::SetMaster(std::string& master_ip, int master_port) {
if (master_ip == "127.0.0.1") {
master_ip = host_;
}
std::lock_guard l(state_protector_);
if (((role_ ^ PIKA_ROLE_SLAVE) != 0) && repl_state_ == PIKA_REPL_NO_CONNECT) {
master_ip_ = master_ip;
master_port_ = master_port;
role_ |= PIKA_ROLE_SLAVE;
repl_state_ = PIKA_REPL_SHOULD_META_SYNC;
return true;
}
return false;
}

FAQ

  • What is the difference between the PikaReplClient connection and the RsyncClient connection?

    PikaReplClient uses a port offset of +2000 while RsyncClient uses +1000; CloseReplClientConn closes the ReplClient connection and LostConnection tears down the Rsync side. (See the port-offset sketch after this list.)

  • Why use ((role_ ^ PIKA_ROLE_SLAVE) != 0) rather than role_ == PIKA_ROLE_SINGLE, and why role_ |= PIKA_ROLE_SLAVE rather than role_ = PIKA_ROLE_SLAVE?
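
To make the first FAQ above concrete, the port layout described in this article can be summarized as follows. The offsets are the ones quoted in this article's code excerpts (kPortShiftReplServer, kPortShiftRSync, kPortShiftRsync2), so treat the exact values as quoted material rather than something re-verified here:

```cpp
#include <cstdio>

// Port offsets as they appear in this article's code excerpts.
constexpr int kPortShiftReplServer = 2000;   // MetaSync/DBSync/TrySync/BinlogSync requests
constexpr int kPortShiftRSync      = 1000;   // port handed to DoBgSaveSlot in HandleDBSyncRequest
constexpr int kPortShiftRsync2     = 10001;  // port the RsyncClient connects to in Init()

int main() {
  const int base_port = 9221;  // hypothetical pika port
  std::printf("client port          : %d\n", base_port);
  std::printf("repl server (+2000)  : %d\n", base_port + kPortShiftReplServer);
  std::printf("rsync (+1000)        : %d\n", base_port + kPortShiftRSync);
  std::printf("rsync2 (+10001)      : %d\n", base_port + kPortShiftRsync2);
  return 0;
}
```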

Summary

The slaveof command sets repl_state_ to PIKA_REPL_SHOULD_META_SYNC, every SyncSlaveSlot to kNoConnect, role_ to PIKA_ROLE_SLAVE, and first_meta_sync_ to true.

2. MetaSync

src/pika_auxiliary_thread.cc

When the slaveof command finished, repl_state_ was left at PIKA_REPL_SHOULD_META_SYNC, so the ShouldMetaSync check passes here and SendMetaSyncRequest is called to start the MetaSync request.

void* PikaAuxiliaryThread::ThreadMain() {
while (!should_stop()) {
if (g_pika_server->ShouldMetaSync()) {
g_pika_rm->SendMetaSyncRequest();
} else if (g_pika_server->MetaSyncDone()) {
g_pika_rm->RunSyncSlaveSlotStateMachine();
}
...
}

src/pika_server.cc

Check whether repl_state_ is in the PIKA_REPL_SHOULD_META_SYNC state:

bool PikaServer::ShouldMetaSync() {
std::shared_lock l(state_protector_);
return repl_state_ == PIKA_REPL_SHOULD_META_SYNC;
}

src/pika_rm.cc

This calls SendMetaSync, and once the request has been sent successfully it updates the meta-sync timestamp and sets first_meta_sync_ back to false.

Status PikaReplicaManager::SendMetaSyncRequest() {
Status s;
if (time(nullptr) - g_pika_server->GetMetaSyncTimestamp() >= PIKA_META_SYNC_MAX_WAIT_TIME ||
g_pika_server->IsFirstMetaSync()) {
s = pika_repl_client_->SendMetaSync();
if (s.ok()) {
g_pika_server->UpdateMetaSyncTimestamp();
g_pika_server->SetFirstMetaSync(false);
}
}
return s;
}

FAQ

  • Why is the check time(nullptr) - g_pika_server->GetMetaSyncTimestamp() >= PIKA_META_SYNC_MAX_WAIT_TIME needed? (see the sketch below)
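
A hedged reading of this FAQ: the auxiliary thread loops continuously and repl_state_ stays at PIKA_REPL_SHOULD_META_SYNC until a response arrives, so without this time window the slave would re-send MetaSync on every loop iteration. The timestamp check throttles the request to at most once per PIKA_META_SYNC_MAX_WAIT_TIME seconds while still letting the very first request through via IsFirstMetaSync. A minimal sketch of the same pattern (names here are illustrative, not Pika's):

```cpp
#include <cstdio>
#include <ctime>

// Illustrative resend guard: allow an action immediately the first time,
// then at most once every max_wait seconds. Mirrors the shape of the check
// in SendMetaSyncRequest; it is not Pika code.
struct ResendGuard {
  time_t last_sent = 0;
  bool first = true;
  bool Allowed(time_t now, int max_wait) {
    if (first || now - last_sent >= max_wait) {
      first = false;
      last_sent = now;
      return true;
    }
    return false;
  }
};

int main() {
  ResendGuard guard;
  const int kMaxWait = 2;  // hypothetical PIKA_META_SYNC_MAX_WAIT_TIME
  time_t now = time(nullptr);
  std::printf("t+0: %s\n", guard.Allowed(now, kMaxWait) ? "send" : "skip");      // send (first time)
  std::printf("t+1: %s\n", guard.Allowed(now + 1, kMaxWait) ? "send" : "skip");  // skip (too soon)
  std::printf("t+2: %s\n", guard.Allowed(now + 2, kMaxWait) ? "send" : "skip");  // send again
  return 0;
}
```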

src/pika_repl_client.cc

You can see that during full replication it is always the slave that initiates the connection, and the master reuses that connection to reply. The code first opens a temporary connection to the remote master ip to check reachability and to learn its own local-ip, then builds an InnerMessage with type set to kMetaSync carrying that ip and port, and sends it to the master on the master's port + 2000.

Status PikaReplClient::SendMetaSync() {
std::string local_ip;
std::unique_ptr<net::NetCli> cli (net::NewRedisCli());
cli->set_connect_timeout(1500);
if ((cli->Connect(g_pika_server->master_ip(), g_pika_server->master_port(), "")).ok()) {
struct sockaddr_in laddr;
socklen_t llen = sizeof(laddr);
getsockname(cli->fd(), reinterpret_cast<struct sockaddr*>(&laddr), &llen);
std::string tmp_local_ip(inet_ntoa(laddr.sin_addr));
local_ip = tmp_local_ip;
cli->Close();
} else {
LOG(WARNING) << "Failed to connect master, Master (" << g_pika_server->master_ip() << ":"
<< g_pika_server->master_port() << "), try reconnect";
// Sleep three seconds to avoid frequent try Meta Sync
// when the connection fails
sleep(3);
g_pika_server->ResetMetaSyncStatus();
return Status::Corruption("Connect master error");
}

InnerMessage::InnerRequest request;
request.set_type(InnerMessage::kMetaSync);
InnerMessage::InnerRequest::MetaSync* meta_sync = request.mutable_meta_sync();
InnerMessage::Node* node = meta_sync->mutable_node();
node->set_ip(local_ip);
node->set_port(g_pika_server->port());
...
std::string to_send;
std::string master_ip = g_pika_server->master_ip();
int master_port = g_pika_server->master_port();
if (!request.SerializeToString(&to_send)) {
LOG(WARNING) << "Serialize Meta Sync Request Failed, to Master (" << master_ip << ":" << master_port << ")";
return Status::Corruption("Serialize Failed");
}

LOG(INFO) << "Try Send Meta Sync Request to Master (" << master_ip << ":" << master_port << ")";
return client_thread_->Write(master_ip, master_port + kPortShiftReplServer, to_send);
}

Summary

In the MetaSync request the slave sends the master its own ip, port and masterauth, and the request goes to the master's port + 2000.

FAQ

  • Why does a failed connection call ResetMetaSyncStatus to reset the state?

  • How can we tell whether the replication connection is a long-lived connection, and if so, does the fd stay unchanged?

  • Why does the slave need to obtain its local-ip?

src/pika_repl_server_conn.cc

After the master receives the InnerMessage it checks masterauth, calls TryAddSlave to add the slave's info to slaves_ in PikaServer, calls ReplServerUpdateClientConnMap to record the connection in client_conn_map, and calls BecomeMaster to switch its own role from PIKA_ROLE_SINGLE to PIKA_ROLE_MASTER. The response code is set to kOk; at this point the master generates a ReplicationID if it does not have one yet, then writes classic_mode, run_id, replication_id and the db_struct list into the InnerMessage response sent back to the slave.

void PikaReplServerConn::HandleMetaSyncRequest(void* arg) {
...
LOG(INFO) << "Receive MetaSync, Slave ip: " << node.ip() << ", Slave port:" << node.port();
std::vector<DBStruct> db_structs = g_pika_conf->db_structs();
bool success = g_pika_server->TryAddSlave(node.ip(), node.port(), conn->fd(), db_structs);
const std::string ip_port = pstd::IpPortString(node.ip(), node.port());
g_pika_rm->ReplServerUpdateClientConnMap(ip_port, conn->fd());
if (!success) {
response.set_code(InnerMessage::kOther);
response.set_reply("Slave AlreadyExist");
} else {
g_pika_server->BecomeMaster();
response.set_code(InnerMessage::kOk);
InnerMessage::InnerResponse_MetaSync* meta_sync = response.mutable_meta_sync();
if (g_pika_conf->replication_id() == "") {
std::string replication_id = pstd::getRandomHexChars(configReplicationIDSize);
g_pika_conf->SetReplicationID(replication_id);
g_pika_conf->ConfigRewriteReplicationID();
}
meta_sync->set_classic_mode(g_pika_conf->classic_mode());
meta_sync->set_run_id(g_pika_conf->run_id());
meta_sync->set_replication_id(g_pika_conf->replication_id());
for (const auto& db_struct : db_structs) {
InnerMessage::InnerResponse_MetaSync_DBInfo* db_info = meta_sync->add_dbs_info();
db_info->set_db_name(db_struct.db_name);
db_info->set_slot_num(static_cast<int32_t>(db_struct.slot_num));
}
}
...

std::string reply_str;
if (!response.SerializeToString(&reply_str) || (conn->WriteResp(reply_str) != 0)) {
LOG(WARNING) << "Process MetaSync request serialization failed";
conn->NotifyClose();
return;
}
conn->NotifyWrite();
}

MetaSync recap

For the MetaSync request, the master replies with classic_mode, run_id, replication_id, db_name and slot_num, which the slave uses to compare against its own DB structure and to set its replication_id.


src/pika_repl_client_conn.cc

When the slave receives the master's MetaSync response it first parses the return code: on kOther it retries MetaSync, and on anything other than kOk it reports an error and sets repl_state_ to PIKA_REPL_ERROR. Otherwise it compares the DBStruct list returned by the master against its own (db_name, slot_num, slot ids); if they differ, repl_state_ is set to PIKA_REPL_ERROR because replication cannot proceed. If the DB structures match, it inspects the master's ReplicationID: if it is empty the slave simply waits for the next request; if both sides have a non-empty but different ReplicationID it is treated as an error; and if the slave has no ReplicationID of its own, it adopts the master's and sets force_full_sync_ to true to force a full sync. Finally it calls PrepareSlotTrySync, and FinishMetaSync sets repl_state_ to PIKA_REPL_META_SYNC_DONE, marking the end of the MetaSync phase.

void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;
...

const InnerMessage::InnerResponse_MetaSync meta_sync = response->meta_sync();

std::vector<DBStruct> master_db_structs;
for (int idx = 0; idx < meta_sync.dbs_info_size(); ++idx) {
const InnerMessage::InnerResponse_MetaSync_DBInfo& db_info = meta_sync.dbs_info(idx);
master_db_structs.push_back({db_info.db_name(), static_cast<uint32_t>(db_info.slot_num()), {0}});
}

std::vector<DBStruct> self_db_structs = g_pika_conf->db_structs();
if (!PikaReplClientConn::IsDBStructConsistent(self_db_structs, master_db_structs)) {
...
g_pika_server->SyncError();
conn->NotifyClose();
return;
}

// The relicationid obtained from the server is null
if (meta_sync.replication_id() == "") {
LOG(WARNING) << "Meta Sync Failed: the relicationid obtained from the server is null, keep sending MetaSync msg";
return;
}

// The Replicationids of both the primary and secondary Replicationid are not empty and are not equal
if (g_pika_conf->replication_id() != meta_sync.replication_id() && g_pika_conf->replication_id() != "") {
LOG(WARNING) << "Meta Sync Failed: replicationid on both sides of the connection are inconsistent";
g_pika_server->SyncError();
conn->NotifyClose();
return;
}

// First synchronization between the master and slave
if (g_pika_conf->replication_id() != meta_sync.replication_id()) {
...
g_pika_server->force_full_sync_ = true;
g_pika_conf->SetReplicationID(meta_sync.replication_id());
g_pika_conf->ConfigRewriteReplicationID();
}

g_pika_conf->SetWriteBinlog("yes");
g_pika_server->PrepareSlotTrySync();
g_pika_server->FinishMetaSync();
LOG(INFO) << "Finish to handle meta sync response";
}

FAQ

  • The run_id passed over by the master does not seem to be used anywhere; could it be removed?

src/pika_server.cc

PrepareSlotTrySync checks force_full_sync_ to decide whether a forced full sync is needed: if so the target state is kTryDBSync, otherwise kTryConnect. It then walks every slot of every DB and calls ActivateSyncSlaveSlot to set up the connection state toward the master.

void PikaServer::PrepareSlotTrySync() {
std::shared_lock rwl(dbs_rw_);
ReplState state = force_full_sync_ ? ReplState::kTryDBSync : ReplState::kTryConnect;
for (const auto& db_item : dbs_) {
for (const auto& slot_item : db_item.second->slots_) {
Status s = g_pika_rm->ActivateSyncSlaveSlot(
RmNode(g_pika_server->master_ip(), g_pika_server->master_port(), db_item.second->GetDBName(),
slot_item.second->GetSlotID()),
state);
if (!s.ok()) {
LOG(WARNING) << s.ToString();
}
}
}
force_full_sync_ = false;
LOG(INFO) << "Mark try connect finish";
}

src/pika_rm.cc

SelectLocalIp has each slot probe a connection to the remote master (to pick the local ip), and then Activate sets each slot's state to the requested state, kTryDBSync here. sync_slave_slots_ keeps the per-slot information.

Status PikaReplicaManager::ActivateSyncSlaveSlot(const RmNode& node, const ReplState& repl_state) {
std::shared_lock l(slots_rw_);
const SlotInfo& p_info = node.NodeSlotInfo();
if (sync_slave_slots_.find(p_info) == sync_slave_slots_.end()) {
return Status::NotFound("Sync Slave Slot " + node.ToString() + " not found");
}
ReplState ssp_state = sync_slave_slots_[p_info]->State();
if (ssp_state != ReplState::kNoConnect && ssp_state != ReplState::kDBNoConnect) {
return Status::Corruption("Sync Slave Slot in " + ReplStateMsg[ssp_state]);
}
std::string local_ip;
Status s = SelectLocalIp(node.Ip(), node.Port(), &local_ip);
if (s.ok()) {
sync_slave_slots_[p_info]->SetLocalIp(local_ip);
sync_slave_slots_[p_info]->Activate(node, repl_state);
}
return s;
}
class SyncSlaveSlot {
std::unique_ptr<rsync::RsyncClient> rsync_cli_;
pstd::Mutex slot_mu_;
RmNode m_info_;
ReplState repl_state_{kNoConnect};
std::string local_ip_;
}

class SyncMasterSlot {
int32_t session_id_ = 0;
ConsensusCoordinator coordinator_;
}

class ConsensusCoordinator {
LogOffset committed_index_;
std::shared_ptr<Context> context_;
std::shared_mutex term_rwlock_;
uint32_t term_ = 0;
std::string db_name_;
uint32_t slot_id_ = 0;
SyncProgress sync_pros_;
std::shared_ptr<StableLog> stable_logger_;
std::shared_ptr<MemLog> mem_logger_;
}

DBSync

src/pika_auxiliary_thread.cc

Back in the auxiliary thread: repl_state_ has just been set to PIKA_REPL_META_SYNC_DONE and every SyncSlaveSlot is now in kTryDBSync, so after the MetaSyncDone check we go on to run RunSyncSlaveSlotStateMachine.

void* PikaAuxiliaryThread::ThreadMain() {
while (!should_stop()) {
if (g_pika_server->ShouldMetaSync()) {
g_pika_rm->SendMetaSyncRequest();
} else if (g_pika_server->MetaSyncDone()) {
g_pika_rm->RunSyncSlaveSlotStateMachine();
}

pstd::Status s = g_pika_rm->CheckSyncTimeout(pstd::NowMicros());
if (!s.ok()) {
LOG(WARNING) << s.ToString();
}

g_pika_server->CheckLeaderProtectedMode();

// TODO(whoiami) timeout
s = g_pika_server->TriggerSendBinlogSync();
if (!s.ok()) {
LOG(WARNING) << s.ToString();
}
// send to peer
int res = g_pika_server->SendToPeer();
if (res == 0) {
// sleep 100 ms
std::unique_lock lock(mu_);
cv_.wait_for(lock, 100ms);
} else {
// LOG_EVERY_N(INFO, 1000) << "Consume binlog number " << res;
}
}
return nullptr;
}

src/pika_rm.cc

A for loop walks the state of every slot in sync_slave_slots_. While handling the MetaSync response we set each slot to kTryDBSync, so the branch taken here is SendSlotDBSyncRequest.

Status PikaReplicaManager::RunSyncSlaveSlotStateMachine() {
std::shared_lock l(slots_rw_);
for (const auto& item : sync_slave_slots_) {
SlotInfo p_info = item.first;
std::shared_ptr<SyncSlaveSlot> s_slot = item.second;
if (s_slot->State() == ReplState::kTryConnect) {
SendSlotTrySyncRequest(p_info.db_name_, p_info.slot_id_);
} else if (s_slot->State() == ReplState::kTryDBSync) {
SendSlotDBSyncRequest(p_info.db_name_, p_info.slot_id_);
} else if (s_slot->State() == ReplState::kWaitReply) {
continue;
} else if (s_slot->State() == ReplState::kWaitDBSync) {
s_slot->ActivateRsync();
std::shared_ptr<Slot> slot =
g_pika_server->GetDBSlotById(p_info.db_name_, p_info.slot_id_);
if (slot) {
if (!s_slot->IsRsyncRunning()) {
slot->TryUpdateMasterOffset();
}
} else {
LOG(WARNING) << "Slot not found, DB Name: " << p_info.db_name_
<< " Slot Id: " << p_info.slot_id_;
}
} else if (s_slot->State() == ReplState::kConnected || s_slot->State() == ReplState::kNoConnect ||
s_slot->State() == ReplState::kDBNoConnect) {
continue;
}
}
return Status::OK();
}

src/pika_rm.cc

GetDBSlotBinlogOffset records the slave's own binlog filenum and offset into boffset. PrepareRsync then removes the old directories under dbsync_path_ and recreates one directory per data type (see the sketch after the code below). Finally SendSlotDBSync is called; on success the slot's state is set to kWaitReply, otherwise kError.

Status PikaReplicaManager::SendSlotDBSyncRequest(const std::string& db_name, size_t slot_id) {
BinlogOffset boffset;
if (!g_pika_server->GetDBSlotBinlogOffset(db_name, slot_id, &boffset)) {
LOG(WARNING) << "Slot: " << db_name << ":" << slot_id << ", Get slot binlog offset failed";
return Status::Corruption("Slot get binlog offset error");
}

std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(db_name, slot_id);
if (!slot) {
LOG(WARNING) << "Slot: " << db_name << ":" << slot_id << ", NotFound";
return Status::Corruption("Slot not found");
}
slot->PrepareRsync();

std::shared_ptr<SyncSlaveSlot> slave_slot =
GetSyncSlaveSlotByName(SlotInfo(db_name, slot_id));
if (!slave_slot) {
LOG(WARNING) << "Slave Slot: " << db_name << ":" << slot_id << ", NotFound";
return Status::Corruption("Slave Slot not found");
}

Status status = pika_repl_client_->SendSlotDBSync(slave_slot->MasterIp(), slave_slot->MasterPort(),
db_name, slot_id, boffset, slave_slot->LocalIp());

if (status.ok()) {
slave_slot->SetReplState(ReplState::kWaitReply);
} else {
slave_slot->SetReplState(ReplState::kError);
LOG(WARNING) << "SendSlotDBSync failed " << status.ToString();
}
return status;
}
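
The paragraph above mentions that PrepareRsync clears dbsync_path_ and recreates one directory per data type. A minimal sketch of that idea, assuming the five Pika data types (strings, hashes, lists, sets, zsets); the real logic lives in Slot::PrepareRsync and may differ in details:

```cpp
#include <filesystem>
#include <string>

namespace fs = std::filesystem;

// Illustrative version of "wipe the dbsync dir and recreate a sub-directory
// per data type"; this is a sketch, not Slot::PrepareRsync itself.
void PrepareRsyncDirs(const std::string& dbsync_path) {
  const char* kDataTypes[] = {"strings", "hashes", "lists", "sets", "zsets"};
  fs::remove_all(dbsync_path);           // drop whatever a previous sync left behind
  fs::create_directories(dbsync_path);
  for (const char* type : kDataTypes) {
    fs::create_directories(fs::path(dbsync_path) / type);
  }
}

int main() {
  PrepareRsyncDirs("./dbsync_demo");     // hypothetical path for the demo
  return 0;
}
```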

FAQ

  • If sending the DBSync request fails, why is the repl state merely set to kError with no further recovery action?

  • Is the returned status here only an indication of whether the DBSync request was sent out, rather than the result carried by the DBSync response?

src/pika_repl_client.cc

The InnerMessage type is set to kDBSync, and the ip, port, db_name, slot_id, filenum and offset are filled in before the request is sent to the master.

Status PikaReplClient::SendSlotDBSync(const std::string& ip, uint32_t port, const std::string& db_name,
uint32_t slot_id, const BinlogOffset& boffset,
const std::string& local_ip) {
InnerMessage::InnerRequest request;
request.set_type(InnerMessage::kDBSync);
InnerMessage::InnerRequest::DBSync* db_sync = request.mutable_db_sync();
InnerMessage::Node* node = db_sync->mutable_node();
node->set_ip(local_ip);
node->set_port(g_pika_server->port());
InnerMessage::Slot* slot = db_sync->mutable_slot();
slot->set_db_name(db_name);
slot->set_slot_id(slot_id);

InnerMessage::BinlogOffset* binlog_offset = db_sync->mutable_binlog_offset();
binlog_offset->set_filenum(boffset.filenum);
binlog_offset->set_offset(boffset.offset);

std::string to_send;
if (!request.SerializeToString(&to_send)) {
LOG(WARNING) << "Serialize Slot DBSync Request Failed, to Master (" << ip << ":" << port << ")";
return Status::Corruption("Serialize Failed");
}
return client_thread_->Write(ip, static_cast<int32_t>(port) + kPortShiftReplServer, to_send);
}

Summary

The DBSync request sent by the slave carries ip, port, db_name, slot_id, filenum and offset.

src/pika_repl_server_conn.cc

The master records the slave's info in the SyncMasterSlot's slaves_ list and in the repl server's client_conn_map, then calls DoBgSaveSlot to run the bgsave. Note that the port passed here is the slave's port + 1000 (kPortShiftRSync). Finally it calls ActivateSlaveDbSync.

const int kPortShiftRSync = 1000;
void PikaReplServerConn::HandleDBSyncRequest(void* arg) {
...
bool prior_success = true;
std::shared_ptr<SyncMasterSlot> master_slot =
g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name, slot_id));
if (!master_slot) {
...
prior_success = false;
}
if (prior_success) {
if (!master_slot->CheckSlaveNodeExist(node.ip(), node.port())) {
int32_t session_id = master_slot->GenSessionId();
if (session_id == -1) {
response.set_code(InnerMessage::kError);
...
prior_success = false;
}
if (prior_success) {
db_sync_response->set_session_id(session_id);
Status s = master_slot->AddSlaveNode(node.ip(), node.port(), session_id);
if (s.ok()) {
const std::string ip_port = pstd::IpPortString(node.ip(), node.port());
g_pika_rm->ReplServerUpdateClientConnMap(ip_port, conn->fd());
...
} else {
db_sync_response->set_session_id(-1);
}
} else {
int32_t session_id;
Status s = master_slot->GetSlaveNodeSession(node.ip(), node.port(), &session_id);
if (!s.ok()) {
response.set_code(InnerMessage::kError);
...
db_sync_response->set_session_id(-1);
...
}
}

g_pika_server->DoBgSaveSlot(node.ip(), node.port() + kPortShiftRSync, db_name, slot_id,
static_cast<int32_t>(slave_boffset.filenum()));
...
master_slot->ActivateSlaveDbSync(node.ip(), node.port());

if (!response.SerializeToString(&reply_str) || (conn->WriteResp(reply_str) != 0)) {
...
conn->NotifyClose();
return;
}
conn->NotifyWrite();
}

src/pika_server.cc

bgsave_info decides whether a new bgsave is needed to produce a fresh dump: if the dump directory does not exist, or the binlog file recorded at the previous bgsave has already been purged, or the slave's binlog filenum is ahead of the dump's by more than the threshold (kDBSyncMaxGap, 50), a new bgsave is performed; otherwise the existing dump is reused.

void PikaServer::DoBgSaveSlot(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id, int32_t top) {
std::shared_ptr<Slot> slot = GetDBSlotById(db_name, slot_id);
if (!slot) {
LOG(WARNING) << "can not find Slot whose id is " << slot_id << " in db " << db_name << ", DoBgSaveSlot Failed";
return;
}
std::shared_ptr<SyncMasterSlot> sync_slot = g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name, slot_id));
if (!sync_slot) {
LOG(WARNING) << "can not find Slot whose id is " << slot_id << " in db " << db_name << ", DoBgSaveSlot Failed";
return;
}
BgSaveInfo bgsave_info = slot->bgsave_info();
std::string logger_filename = sync_slot->Logger()->filename();
if (pstd::IsDir(bgsave_info.path) != 0 ||
!pstd::FileExists(NewFileName(logger_filename, bgsave_info.offset.b_offset.filenum)) ||
top - bgsave_info.offset.b_offset.filenum > kDBSyncMaxGap) {
// Need Bgsave first
slot->BgSaveSlot();
}
}

src/pika_rm.cc

ActivateSlaveDbSync sets the slave node's state to kSlaveDbSync.

Status SyncMasterSlot::ActivateSlaveDbSync(const std::string& ip, int port) {
std::shared_ptr<SlaveNode> slave_ptr = GetSlaveNode(ip, port);
if (!slave_ptr) {
return Status::NotFound("ip " + ip + " port " + std::to_string(port));
}

slave_ptr->Lock();
slave_ptr->slave_state = kSlaveDbSync;
// invoke db sync
slave_ptr->Unlock();

return Status::OK();
}

// rm define
enum SlaveState {
kSlaveNotSync = 0,
kSlaveDbSync = 1,
kSlaveBinlogSync = 2,
};

FAQ

  • Why does the condition top - bgsave_info.offset.b_offset.filenum > kDBSyncMaxGap (the slave's filenum exceeding the filenum of the previous bgsave by more than 50) trigger a new bgsave?

  • Does this ActivateSlaveDbSync function actually do anything useful?

DBSync recap

In DBSync the master's response to the slave carries session_id, db_name and slot_id, and the master decides whether a new bgsave is required.


src/pika_repl_client_conn.cc

After the slave receives the master's DBSync response it stores the session_id, stops the Rsync thread, and moves the state to kWaitDBSync.

void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
...
if (response->code() != InnerMessage::kOk) {
slave_slot->SetReplState(ReplState::kError);
std::string reply = response->has_reply() ? response->reply() : "";
LOG(WARNING) << "DBSync Failed: " << reply;
return;
}

slave_slot->SetMasterSessionId(session_id);

std::string slot_name = slave_slot->SlotName();
slave_slot->StopRsync();
slave_slot->SetReplState(ReplState::kWaitDBSync);
LOG(INFO) << "Slot: " << slot_name << " Need Wait To Sync";
}

FAQ

  • Why does a response code other than kOk not trigger SyncError here?

    A failure here may simply mean the master's AddSlaveNode failed, so the slave can just retry on the next request.

RsyncMeta

src/pika_rm.cc

Back in the same for loop: the slot is now in kWaitDBSync, so ActivateRsync is called to start the Rsync thread.

Status PikaReplicaManager::RunSyncSlaveSlotStateMachine() {
std::shared_lock l(slots_rw_);
for (const auto& item : sync_slave_slots_) {
SlotInfo p_info = item.first;
std::shared_ptr<SyncSlaveSlot> s_slot = item.second;
if (s_slot->State() == ReplState::kTryConnect) {
SendSlotTrySyncRequest(p_info.db_name_, p_info.slot_id_);
} else if (s_slot->State() == ReplState::kTryDBSync) {
SendSlotDBSyncRequest(p_info.db_name_, p_info.slot_id_);
} else if (s_slot->State() == ReplState::kWaitReply) {
continue;
} else if (s_slot->State() == ReplState::kWaitDBSync) {
s_slot->ActivateRsync();
std::shared_ptr<Slot> slot =
g_pika_server->GetDBSlotById(p_info.db_name_, p_info.slot_id_);
if (slot) {
if (!s_slot->IsRsyncRunning()) {
slot->TryUpdateMasterOffset();
}
} else {
LOG(WARNING) << "Slot not found, DB Name: " << p_info.db_name_
<< " Slot Id: " << p_info.slot_id_;
}
} else if (s_slot->State() == ReplState::kConnected || s_slot->State() == ReplState::kNoConnect ||
s_slot->State() == ReplState::kDBNoConnect) {
continue;
}
}
return Status::OK();
}

src/pika_rm.cc

ActivateRsync first calls Init():

void SyncSlaveSlot::ActivateRsync() {
if (!rsync_cli_->IsIdle()) {
return;
}
LOG(WARNING) << "ActivateRsync ...";
if (rsync_cli_->Init()) {
rsync_cli_->Start();
}
}

src/rsync_client.cc

Here the master's port + 10001 (kPortShiftRsync2) is used as master_port_, the port this Rsync client talks to.

bool RsyncClient::Init() {
if (state_ != IDLE) {
LOG(WARNING) << "State should be IDLE when Init";
return false;
}
master_ip_ = g_pika_server->master_ip();
master_port_ = g_pika_server->master_port() + kPortShiftRsync2;
file_set_.clear();
client_thread_->StartThread();
bool ret = Recover();
if (!ret) {
LOG(WARNING) << "RsyncClient recover failed";
client_thread_->StopThread();
return false;
}
finished_work_cnt_.store(0);
LOG(INFO) << "RsyncClient recover success";
return true;
}

FAQ

  • What is the client_thread_ thread here?

src/rsync_client.cc

In Recover, CopyRemoteMeta first pulls the metadata of the remote dump files; this is then compared against the local metadata to decide what needs to be updated.

bool RsyncClient::Recover() {
std::string local_snapshot_uuid;
std::string remote_snapshot_uuid;
std::set<std::string> local_file_set;
std::set<std::string> remote_file_set;
std::map<std::string, std::string> local_file_map;

Status s = CopyRemoteMeta(&remote_snapshot_uuid, &remote_file_set);
...
s = LoadLocalMeta(&local_snapshot_uuid, &local_file_map);
...
for (auto const& file : local_file_map) {
local_file_set.insert(file.first);
}

std::set<std::string> expired_files;
if (remote_snapshot_uuid != local_snapshot_uuid) {
snapshot_uuid_ = remote_snapshot_uuid;
file_set_ = remote_file_set;
expired_files = local_file_set;
} else {
std::set<std::string> newly_files;
set_difference(remote_file_set.begin(), remote_file_set.end(),
local_file_set.begin(), local_file_set.end(),
inserter(newly_files, newly_files.begin()));
set_difference(local_file_set.begin(), local_file_set.end(),
remote_file_set.begin(), remote_file_set.end(),
inserter(expired_files, expired_files.begin()));
file_set_.insert(newly_files.begin(), newly_files.end());
}

s = CleanUpExpiredFiles(local_snapshot_uuid != remote_snapshot_uuid, expired_files);
...
s = UpdateLocalMeta(snapshot_uuid_, expired_files, &local_file_map);
...

state_ = RUNNING;
...
return true;
}

FAQ

  • Why compare against the local data at all instead of simply overwriting it? (see the sketch below)
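
One reason for the comparison (hedged, inferred from the Recover code above) is resumability: when the snapshot uuid matches, files already downloaded in a previous attempt do not need to be fetched again, and only files that no longer exist remotely are cleaned up. The set arithmetic is plain std::set_difference; the file names below are made up for illustration:

```cpp
#include <algorithm>
#include <iostream>
#include <iterator>
#include <set>
#include <string>

int main() {
  // What the master's dump currently contains vs. what we already have locally.
  std::set<std::string> remote = {"strings/000001.sst", "hashes/000002.sst", "MANIFEST"};
  std::set<std::string> local  = {"strings/000001.sst", "hashes/000009.sst"};

  std::set<std::string> newly, expired;
  // Remote-only files: still need to be downloaded.
  std::set_difference(remote.begin(), remote.end(), local.begin(), local.end(),
                      std::inserter(newly, newly.begin()));
  // Local-only files: stale, can be removed.
  std::set_difference(local.begin(), local.end(), remote.begin(), remote.end(),
                      std::inserter(expired, expired.begin()));

  for (const auto& f : newly)   std::cout << "download: " << f << "\n";
  for (const auto& f : expired) std::cout << "expire:   " << f << "\n";
  return 0;
}
```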

src/rsync_client.cc

An RsyncRequest carrying db_name, slot_id, type kRsyncMeta and reader_index is built and sent to the master; when the response arrives, the dump-file metadata returned by the master is recorded into file_set and snapshot_uuid is updated.

Status RsyncClient::CopyRemoteMeta(std::string* snapshot_uuid, std::set<std::string>* file_set) {
Status s;
int retries = 0;
RsyncRequest request;
request.set_reader_index(0);
request.set_db_name(db_name_);
request.set_slot_id(slot_id_);
request.set_type(kRsyncMeta);
std::string to_send;
request.SerializeToString(&to_send);
while (retries < max_retries_) {
WaitObject* wo = wo_mgr_->UpdateWaitObject(0, "", kRsyncMeta, kInvalidOffset);
s = client_thread_->Write(master_ip_, master_port_, to_send);
if (!s.ok()) {
retries++;
}
std::shared_ptr<RsyncResponse> resp;
s = wo->Wait(resp);
if (s.IsTimeout() || resp.get() == nullptr) {
LOG(WARNING) << "rsync CopyRemoteMeta request timeout, "
<< "retry times: " << retries;
retries++;
continue;
}

if (resp->code() != RsyncService::kOk) {
//TODO: handle different error
continue;
}
LOG(INFO) << "receive rsync meta infos, snapshot_uuid: " << resp->snapshot_uuid()
<< "files count: " << resp->meta_resp().filenames_size();
for (std::string item : resp->meta_resp().filenames()) {
file_set->insert(item);
}

*snapshot_uuid = resp->snapshot_uuid();
break;
}
return s;
}

Summary

The RsyncMeta request sent by the slave contains db_name, slot_id and reader_index.

src/rsync_server.cc

The master handles the MetaRsync request: if the slot is still running bgsave it just returns; otherwise it calls GetDumpMeta to collect the dump's metadata into filenames and returns it all to the slave.

void RsyncServerConn::HandleMetaRsyncRequest(void* arg) {
std::unique_ptr<RsyncServerTaskArg> task_arg(static_cast<RsyncServerTaskArg*>(arg));
const std::shared_ptr<RsyncService::RsyncRequest> req = task_arg->req;
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::string db_name = req->db_name();
uint32_t slot_id = req->slot_id();
std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(db_name, slot_id);
if (!slot || slot->IsBgSaving()) {
LOG(WARNING) << "waiting bgsave done...";
return;
}

RsyncService::RsyncResponse response;
response.set_reader_index(req->reader_index());
response.set_code(RsyncService::kOk);
response.set_type(RsyncService::kRsyncMeta);
response.set_db_name(db_name);
response.set_slot_id(slot_id);

std::vector<std::string> filenames;
std::string snapshot_uuid;
g_pika_server->GetDumpMeta(db_name, slot_id, &filenames, &snapshot_uuid);
response.set_snapshot_uuid(snapshot_uuid);
...

RsyncService::MetaResponse* meta_resp = response.mutable_meta_resp();
for (const auto& filename : filenames) {
meta_resp->add_filenames(filename);
}
RsyncWriteResp(response, conn);
}

FAQ

  • Why does the master's response reuse the reader_index that came in with the request?

RsyncMeta recap

For the MetaRsync request, the master's response to the slave includes db_name, slot_id, reader_index, snapshot_uuid and the filenames.


RsyncFile

src/rsync_client.cc

Once file_set_ holds the metadata of the remote files, the work threads start calling Copy to pull the files from the remote end.

void* RsyncClient::ThreadMain() {
...
std::vector<std::set<std::string> > file_vec(GetParallelNum());
int index = 0;
for (const auto& file : file_set_) {
file_vec[index++ % GetParallelNum()].insert(file);
}

for (int i = 0; i < GetParallelNum(); i++) {
work_threads_[i] = std::move(std::thread(&RsyncClient::Copy, this, file_vec[i], i));
}
...

while (state_.load() == RUNNING) {
uint64_t elapse = pstd::NowMicros() - start_time;
if (elapse < kFlushIntervalUs) {
int wait_for_us = kFlushIntervalUs - elapse;
std::unique_lock<std::mutex> lock(mu_);
cond_.wait_for(lock, std::chrono::microseconds(wait_for_us));
}

if (state_.load() != RUNNING) {
break;
}

start_time = pstd::NowMicros();
std::map<std::string, std::string> files_map;
{
std::lock_guard<std::mutex> guard(mu_);
files_map.swap(meta_table_);
}
for (const auto& file : files_map) {
meta_rep.append(file.first + ":" + file.second);
meta_rep.append("\n");
}
...

if (finished_work_cnt_.load() == GetParallelNum()) {
break;
}
}

for (int i = 0; i < GetParallelNum(); i++) {
work_threads_[i].join();
}
finished_work_cnt_.store(0);
state_.store(STOP);
...
}

FAQ

  • How are the arguments of the Copy function determined? (see the sketch below)
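
The answer is visible in ThreadMain above: the files in file_set_ are dealt out round-robin into GetParallelNum() buckets, and worker i receives bucket i plus its own index. A standalone illustration of that partitioning (file names and the parallelism value are made up):

```cpp
#include <iostream>
#include <set>
#include <string>
#include <vector>

int main() {
  // Stand-ins for file_set_ and GetParallelNum().
  std::set<std::string> file_set = {"a.sst", "b.sst", "c.sst", "d.sst", "e.sst"};
  const int parallel_num = 2;

  // Same dealing logic as RsyncClient::ThreadMain: round-robin by iteration order.
  std::vector<std::set<std::string>> file_vec(parallel_num);
  int index = 0;
  for (const auto& file : file_set) {
    file_vec[index++ % parallel_num].insert(file);
  }

  for (int i = 0; i < parallel_num; ++i) {
    std::cout << "worker " << i << " copies:";
    for (const auto& f : file_vec[i]) std::cout << " " << f;
    std::cout << "\n";  // worker i would then call Copy(file_vec[i], i)
  }
  return 0;
}
```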

src/rsync_client.cc

Copy calls CopyRemoteFile to fetch each file from the remote end:

void RsyncClient::Copy(const std::set<std::string>& file_set, int index) {
Status s = Status::OK();
for (const auto& file : file_set) {
while (state_.load() == RUNNING) {
LOG(INFO) << "copy remote file, filename: " << file;
s = CopyRemoteFile(file, index);
if (!s.ok()) {
LOG(WARNING) << "copy remote file failed, msg: " << s.ToString();
continue;
}
break;
}
if (state_.load() != RUNNING) {
break;
}
}
LOG(INFO) << "work_thread index: " << index << " copy remote files done";
finished_work_cnt_.fetch_add(1);
cond_.notify_all();
}

src/rsync_client.cc

An RsyncRequest is assembled with reader_index, db_name, slot_id, filename, offset and count and sent to the master; the target path is under the dbsync directory (dir_), and when the master's response comes back, the writer persists the data at that path.

Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {
const std::string filepath = dir_ + "/" + filename;
std::unique_ptr<RsyncWriter> writer(new RsyncWriter(filepath));
...
while (retries < max_retries_) {
if (state_.load() != RUNNING) {
break;
}
size_t copy_file_begin_time = pstd::NowMicros();
size_t count = Throttle::GetInstance().ThrottledByThroughput(kBytesPerRequest);
if (count == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000 / kThrottleCheckCycle));
continue;
}
RsyncRequest request;
request.set_reader_index(index);
request.set_type(kRsyncFile);
request.set_db_name(db_name_);
request.set_slot_id(slot_id_);
FileRequest* file_req = request.mutable_file_req();
file_req->set_filename(filename);
file_req->set_offset(offset);
file_req->set_count(count);

std::string to_send;
request.SerializeToString(&to_send);
WaitObject* wo = wo_mgr_->UpdateWaitObject(index, filename, kRsyncFile, offset);
s = client_thread_->Write(master_ip_, master_port_, to_send);
...
std::shared_ptr<RsyncResponse> resp = nullptr;
s = wo->Wait(resp);
...
size_t ret_count = resp->file_resp().count();
size_t elaspe_time_us = pstd::NowMicros() - copy_file_begin_time;
Throttle::GetInstance().ReturnUnusedThroughput(count, ret_count, elaspe_time_us);
...
s = writer->Write((uint64_t)offset, ret_count, resp->file_resp().data().c_str());
...
offset += resp->file_resp().count();
if (resp->file_resp().eof()) {
s = writer->Fsync();
...
break;
}
retries = 0;
}
...
}

Summary

The RsyncFile request sent by the slave includes db_name, slot_id, reader_index, filename, offset and count.
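
CopyRemoteFile above also rate-limits itself through Throttle::GetInstance().ThrottledByThroughput() and ReturnUnusedThroughput(). The sketch below only illustrates the general idea of such a throughput budget (a per-interval byte quota with give-back of unused bytes); it is not Pika's Throttle class, which is time-sliced and thread-safe:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>

// A toy per-interval byte budget, for illustration only.
class ToyThrottle {
 public:
  explicit ToyThrottle(size_t bytes_per_interval)
      : limit_(bytes_per_interval), left_(bytes_per_interval) {}

  // Claim up to `want` bytes from this interval's budget (0 means throttled).
  size_t Claim(size_t want) {
    size_t granted = std::min(want, left_);
    left_ -= granted;
    return granted;
  }

  // Return the unused part of a claim, e.g. when the response carried fewer bytes.
  void Return(size_t claimed, size_t used) {
    if (used < claimed) left_ += claimed - used;
  }

  void NextInterval() { left_ = limit_; }  // refill once per interval

 private:
  size_t limit_;
  size_t left_;
};

int main() {
  ToyThrottle throttle(4u << 20);               // 4 MiB per interval (arbitrary)
  size_t claimed = throttle.Claim(3u << 20);    // ask before sending the request
  size_t actually_received = 2u << 20;          // the file_resp carried fewer bytes
  throttle.Return(claimed, actually_received);  // hand the surplus back
  std::printf("claimed %zu bytes, received %zu bytes\n", claimed, actually_received);
  return 0;
}
```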

src/rsync_server.cc

void RsyncServerConn::HandleFileRsyncRequest(void* arg) {
...
std::string snapshot_uuid;
Status s = g_pika_server->GetDumpUUID(db_name, slot_id, &snapshot_uuid);
response.set_snapshot_uuid(snapshot_uuid);
if (!s.ok()) {
LOG(WARNING) << "rsyncserver get snapshotUUID failed";
response.set_code(RsyncService::kErr);
RsyncWriteResp(response, conn);
return;
}

std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(db_name, slot_id);
if (!slot) {
LOG(WARNING) << "cannot find slot for db_name: " << db_name
<< " slot_id: " << slot_id;
response.set_code(RsyncService::kErr);
RsyncWriteResp(response, conn);
}

const std::string filepath = slot->bgsave_info().path + "/" + filename;
char* buffer = new char[req->file_req().count() + 1];
size_t bytes_read{0};
std::string checksum = "";
bool is_eof = false;
std::shared_ptr<RsyncReader> reader = conn->readers_[req->reader_index()];
s = reader->Read(filepath, offset, count, buffer,
&bytes_read, &checksum, &is_eof);
if (!s.ok()) {
response.set_code(RsyncService::kErr);
RsyncWriteResp(response, conn);
delete []buffer;
return;
}

RsyncService::FileResponse* file_resp = response.mutable_file_resp();
file_resp->set_data(buffer, bytes_read);
file_resp->set_eof(is_eof);
file_resp->set_checksum(checksum);
file_resp->set_filename(filename);
file_resp->set_count(bytes_read);
file_resp->set_offset(offset);

RsyncWriteResp(response, conn);
delete []buffer;
}

FAQ

  • Why isn't the snapshot simply carried over via protobuf, with the check done on the slave side after the copy finishes?

  • Where does s = writer->Write((uint64_t)offset, ret_count, resp->file_resp().data().c_str()); actually write the data to?

  • What is the checksum used for?

RsyncFile recap

The master's FileRsync response to the slave includes db_name, slot_id, reader_index, data, eof, checksum, filename, count (bytes_read) and offset.


TrySync

src/pika_rm.cc

After all remote files have been fetched, the Rsync thread shuts down and TryUpdateMasterOffset is called to update the offset.

Status PikaReplicaManager::RunSyncSlaveSlotStateMachine() {
std::shared_lock l(slots_rw_);
for (const auto& item : sync_slave_slots_) {
SlotInfo p_info = item.first;
std::shared_ptr<SyncSlaveSlot> s_slot = item.second;
if (s_slot->State() == ReplState::kTryConnect) {
SendSlotTrySyncRequest(p_info.db_name_, p_info.slot_id_);
} else if (s_slot->State() == ReplState::kTryDBSync) {
SendSlotDBSyncRequest(p_info.db_name_, p_info.slot_id_);
} else if (s_slot->State() == ReplState::kWaitReply) {
continue;
} else if (s_slot->State() == ReplState::kWaitDBSync) {
s_slot->ActivateRsync();
std::shared_ptr<Slot> slot =
g_pika_server->GetDBSlotById(p_info.db_name_, p_info.slot_id_);
if (slot) {
if (!s_slot->IsRsyncRunning()) {
slot->TryUpdateMasterOffset();
}
} else {
LOG(WARNING) << "Slot not found, DB Name: " << p_info.db_name_
<< " Slot Id: " << p_info.slot_id_;
}
} else if (s_slot->State() == ReplState::kConnected || s_slot->State() == ReplState::kNoConnect ||
s_slot->State() == ReplState::kDBNoConnect) {
continue;
}
}
return Status::OK();
}

src/pika_slot.cc

This updates the binlog offset: the info file is read, and the master_ip, master_port, filenum, offset, term and index it contains are applied to the slave; the info file is then deleted. ChangeDb swaps the dbsync directory in as the new db directory, removes the old one, and the slot state is set to kTryConnect. In other words, regardless of where the slave's own binlog used to be, the slave now continues from the filenum and offset handed over by the master. A sketch of the info-file handling follows the code below.

bool Slot::TryUpdateMasterOffset() {
std::string info_path = dbsync_path_ + kBgsaveInfoFile;
...

// Got new binlog offset
std::ifstream is(info_path);
...
std::string line;
std::string master_ip;
...
is.close();
...

pstd::DeleteFile(info_path);
if (!ChangeDb(dbsync_path_)) {
LOG(WARNING) << "Slot: " << slot_name_ << ", Failed to change db";
slave_slot->SetReplState(ReplState::kError);
return false;
}

// Update master offset
std::shared_ptr<SyncMasterSlot> master_slot =
g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name_, slot_id_));
if (!master_slot) {
LOG(WARNING) << "Master Slot: " << slot_name_ << " not exist";
return false;
}
master_slot->Logger()->SetProducerStatus(filenum, offset);
slave_slot->SetReplState(ReplState::kTryConnect);
return true;
}
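
For reference, a hedged illustration of the info-file handling described above. The exact on-disk layout is defined by the bgsave code and may differ between versions; the sketch only assumes a small text file containing the master's ip, port, filenum, offset, term and index in the order the paragraph lists them, which is an assumption, not the verified format:

```cpp
#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>

// Assumed layout (whitespace-separated values) -- illustration only; the real
// format is whatever the bgsave code wrote into the "info" file.
struct DumpPoint {
  std::string master_ip;
  int master_port = 0;
  uint32_t filenum = 0;
  uint64_t offset = 0;
  uint32_t term = 0;
  uint64_t index = 0;
};

bool LoadDumpPoint(const std::string& info_path, DumpPoint* out) {
  std::ifstream is(info_path);
  if (!is) return false;
  is >> out->master_ip >> out->master_port >> out->filenum >> out->offset >> out->term >> out->index;
  return static_cast<bool>(is);
}

int main() {
  DumpPoint p;
  // Hypothetical path; in Pika it is dbsync_path_ + kBgsaveInfoFile.
  if (LoadDumpPoint("./dbsync/info", &p)) {
    // The slave would now call SetProducerStatus(p.filenum, p.offset) and
    // move the slot to kTryConnect, as TryUpdateMasterOffset does above.
    std::cout << "resume binlog at " << p.filenum << ":" << p.offset << "\n";
  }
  return 0;
}
```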

src/pika_rm.cc

Since the slot state is now kTryConnect, SendSlotTrySyncRequest is called to check whether incremental sync can begin.

Status PikaReplicaManager::RunSyncSlaveSlotStateMachine() {
std::shared_lock l(slots_rw_);
for (const auto& item : sync_slave_slots_) {
SlotInfo p_info = item.first;
std::shared_ptr<SyncSlaveSlot> s_slot = item.second;
if (s_slot->State() == ReplState::kTryConnect) {
SendSlotTrySyncRequest(p_info.db_name_, p_info.slot_id_);
} else if (s_slot->State() == ReplState::kTryDBSync) {
SendSlotDBSyncRequest(p_info.db_name_, p_info.slot_id_);
} else if (s_slot->State() == ReplState::kWaitReply) {
continue;
} else if (s_slot->State() == ReplState::kWaitDBSync) {
s_slot->ActivateRsync();
std::shared_ptr<Slot> slot =
g_pika_server->GetDBSlotById(p_info.db_name_, p_info.slot_id_);
if (slot) {
if (!s_slot->IsRsyncRunning()) {
slot->TryUpdateMasterOffset();
}
} else {
LOG(WARNING) << "Slot not found, DB Name: " << p_info.db_name_
<< " Slot Id: " << p_info.slot_id_;
}
} else if (s_slot->State() == ReplState::kConnected || s_slot->State() == ReplState::kNoConnect ||
s_slot->State() == ReplState::kDBNoConnect) {
continue;
}
}
return Status::OK();
}

src/pika_rm.cc

This calls SendSlotTrySync:

Status PikaReplicaManager::SendSlotTrySyncRequest(const std::string& db_name, size_t slot_id) {
BinlogOffset boffset;
if (!g_pika_server->GetDBSlotBinlogOffset(db_name, slot_id, &boffset)) {
LOG(WARNING) << "Slot: " << db_name << ":" << slot_id << ", Get Slot binlog offset failed";
return Status::Corruption("Slot get binlog offset error");
}

std::shared_ptr<SyncSlaveSlot> slave_slot =
GetSyncSlaveSlotByName(SlotInfo(db_name, slot_id));
if (!slave_slot) {
LOG(WARNING) << "Slave Slot: " << db_name << ":" << slot_id << ", NotFound";
return Status::Corruption("Slave Slot not found");
}

Status status =
pika_repl_client_->SendSlotTrySync(slave_slot->MasterIp(), slave_slot->MasterPort(), db_name,
slot_id, boffset, slave_slot->LocalIp());

if (status.ok()) {
slave_slot->SetReplState(ReplState::kWaitReply);
} else {
slave_slot->SetReplState(ReplState::kError);
LOG(WARNING) << "SendSlotTrySyncRequest failed " << status.ToString();
}
return status;
}

src/pika_repl_client.cc

The TrySync request carries the binlog filenum and offset together with ip, port, db_name and slot_id, and is sent to the master.

Status PikaReplClient::SendSlotTrySync(const std::string& ip, uint32_t port, const std::string& db_name,
uint32_t slot_id, const BinlogOffset& boffset,
const std::string& local_ip) {
InnerMessage::InnerRequest request;
request.set_type(InnerMessage::kTrySync);
InnerMessage::InnerRequest::TrySync* try_sync = request.mutable_try_sync();
InnerMessage::Node* node = try_sync->mutable_node();
node->set_ip(local_ip);
node->set_port(g_pika_server->port());
InnerMessage::Slot* slot = try_sync->mutable_slot();
slot->set_db_name(db_name);
slot->set_slot_id(slot_id);

InnerMessage::BinlogOffset* binlog_offset = try_sync->mutable_binlog_offset();
binlog_offset->set_filenum(boffset.filenum);
binlog_offset->set_offset(boffset.offset);

std::string to_send;
if (!request.SerializeToString(&to_send)) {
LOG(WARNING) << "Serialize Slot TrySync Request Failed, to Master (" << ip << ":" << port << ")";
return Status::Corruption("Serialize Failed");
}
return client_thread_->Write(ip, static_cast<int32_t>(port + kPortShiftReplServer), to_send);
}

Summary

In TrySync, the slave sends the master ip, port, db_name, slot_id, filenum and offset.

src/pika_repl_server_conn.cc

After receiving the slave's TrySync request, the master calls TrySyncOffsetCheck to decide whether a full sync is needed.

void PikaReplServerConn::HandleTrySyncRequest(void* arg) {
...
bool pre_success = true;
...
} else if (pre_success) {
pre_success = TrySyncOffsetCheck(slot, try_sync_request, try_sync_response);
}

if (pre_success) {
pre_success = TrySyncUpdateSlaveNode(slot, try_sync_request, conn, try_sync_response);
}

std::string reply_str;
if (!response.SerializeToString(&reply_str) || (conn->WriteResp(reply_str) != 0)) {
LOG(WARNING) << "Handle Try Sync Failed";
conn->NotifyClose();
return;
}
conn->NotifyWrite();
}

FAQ

  • What is req->has_consensus_meta()?

    It is something defined in the proto file.

src/pika_repl_server_conn.cc

Here the master compares its own filenum and offset against the slave's: if the slave's data is newer than the master's, the reply is kSyncPointLarger; if the binlog position sent by the slave has already been purged on the master, the reply is kSyncPointBePurged and a full sync is needed; and if the slave's position cannot be matched on the master, sync cannot proceed and the reply is kError.

bool PikaReplServerConn::TrySyncOffsetCheck(const std::shared_ptr<SyncMasterSlot>& slot,
const InnerMessage::InnerRequest::TrySync& try_sync_request,
InnerMessage::InnerResponse::TrySync* try_sync_response) {
const InnerMessage::Node& node = try_sync_request.node();
const InnerMessage::BinlogOffset& slave_boffset = try_sync_request.binlog_offset();
std::string slot_name = slot->SlotName();

BinlogOffset boffset;
Status s = slot->Logger()->GetProducerStatus(&(boffset.filenum), &(boffset.offset));
...
InnerMessage::BinlogOffset* master_slot_boffset = try_sync_response->mutable_binlog_offset();
master_slot_boffset->set_filenum(boffset.filenum);
master_slot_boffset->set_offset(boffset.offset);

if (boffset.filenum < slave_boffset.filenum() ||
(boffset.filenum == slave_boffset.filenum() && boffset.offset < slave_boffset.offset())) {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kSyncPointLarger);
...
return false;
}

std::string confile = NewFileName(slot->Logger()->filename(), slave_boffset.filenum());
if (!pstd::FileExists(confile)) {
LOG(INFO) << "Slot: " << slot_name << " binlog has been purged, may need full sync";
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kSyncPointBePurged);
return false;
}

PikaBinlogReader reader;
reader.Seek(slot->Logger(), slave_boffset.filenum(), slave_boffset.offset());
BinlogOffset seeked_offset;
reader.GetReaderStatus(&(seeked_offset.filenum), &(seeked_offset.offset));
if (seeked_offset.filenum != slave_boffset.filenum() || seeked_offset.offset != slave_boffset.offset()) {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
...
}
return true;
}

FAQ

  • What is the Seek here for?

    It aligns the master's binlog sync position with the position reported by the slave.

src/pika_repl_server_conn.cc

bool PikaReplServerConn::TrySyncUpdateSlaveNode(const std::shared_ptr<SyncMasterSlot>& slot,
const InnerMessage::InnerRequest::TrySync& try_sync_request,
const std::shared_ptr<net::PbConn>& conn,
InnerMessage::InnerResponse::TrySync* try_sync_response) {
const InnerMessage::Node& node = try_sync_request.node();
std::string slot_name = slot->SlotName();

if (!slot->CheckSlaveNodeExist(node.ip(), node.port())) {
int32_t session_id = slot->GenSessionId();
if (session_id == -1) {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
LOG(WARNING) << "Slot: " << slot_name << ", Gen Session id Failed";
return false;
}
try_sync_response->set_session_id(session_id);
// incremental sync
Status s = slot->AddSlaveNode(node.ip(), node.port(), session_id);
if (!s.ok()) {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
LOG(WARNING) << "Slot: " << slot_name << " TrySync Failed, " << s.ToString();
return false;
}
const std::string ip_port = pstd::IpPortString(node.ip(), node.port());
g_pika_rm->ReplServerUpdateClientConnMap(ip_port, conn->fd());
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk);
LOG(INFO) << "Slot: " << slot_name << " TrySync Success, Session: " << session_id;
} else {
int32_t session_id;
Status s = slot->GetSlaveNodeSession(node.ip(), node.port(), &session_id);
if (!s.ok()) {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
LOG(WARNING) << "Slot: " << slot_name << ", Get Session id Failed" << s.ToString();
return false;
}
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk);
try_sync_response->set_session_id(session_id);
LOG(INFO) << "Slot: " << slot_name << " TrySync Success, Session: " << session_id;
}
return true;
}

TrySync recap

In TrySync the master's response to the slave includes session_id, db_name and slot_id.


BinlogSync

src/pika_repl_client_conn.cc

The TrySync response handler updates the slave_slot state according to the master's reply: on success the state becomes kConnected and SendSlotBinlogSyncAckRequest is sent; on kSyncPointBePurged a full sync has to be redone and the state goes back to kTryDBSync; on kSyncPointLarger the slave's data is newer than the master's and replication cannot continue. At this point the full sync itself is complete.

void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;
...

const InnerMessage::InnerResponse_TrySync& try_sync_response = response->try_sync();
const InnerMessage::Slot& slot_response = try_sync_response.slot();
...
LogicOffset logic_last_offset;
...
std::string slot_name = slot->SlotName();
if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kOk) {
BinlogOffset boffset;
int32_t session_id = try_sync_response.session_id();
slot->Logger()->GetProducerStatus(&boffset.filenum, &boffset.offset);
slave_slot->SetMasterSessionId(session_id);
LogOffset offset(boffset, logic_last_offset);
g_pika_rm->SendSlotBinlogSyncAckRequest(db_name, slot_id, offset, offset, true);
slave_slot->SetReplState(ReplState::kConnected);
// after connected, update receive time first to avoid connection timeout
slave_slot->SetLastRecvTime(pstd::NowMicros());

LOG(INFO) << "Slot: " << slot_name << " TrySync Ok";
} else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kSyncPointBePurged) {
slave_slot->SetReplState(ReplState::kTryDBSync);
LOG(INFO) << "Slot: " << slot_name << " Need To Try DBSync";
} else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kSyncPointLarger) {
slave_slot->SetReplState(ReplState::kError);
LOG(WARNING) << "Slot: " << slot_name << " TrySync Error, Because the invalid filenum and offset";
} else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kError) {
slave_slot->SetReplState(ReplState::kError);
LOG(WARNING) << "Slot: " << slot_name << " TrySync Error";
}
}

src/pika_rm.cc

SendSlotBinlogSync is called to kick off the incremental sync request:

Status PikaReplicaManager::SendSlotBinlogSyncAckRequest(const std::string& db, uint32_t slot_id,
const LogOffset& ack_start, const LogOffset& ack_end,
bool is_first_send) {
std::shared_ptr<SyncSlaveSlot> slave_slot = GetSyncSlaveSlotByName(SlotInfo(db, slot_id));
if (!slave_slot) {
LOG(WARNING) << "Slave Slot: " << db << ":" << slot_id << ", NotFound";
return Status::Corruption("Slave Slot not found");
}
return pika_repl_client_->SendSlotBinlogSync(slave_slot->MasterIp(), slave_slot->MasterPort(), db,
slot_id, ack_start, ack_end, slave_slot->LocalIp(),
is_first_send);
}

src/pika_repl_client.cc

Because this is the slave's first BinlogSync request, ack_range_start and ack_range_end carry the same BinlogOffset: the sync point recorded in the info file, i.e. the master's position at the time it took the dump during full sync.

Status PikaReplClient::SendSlotBinlogSync(const std::string& ip, uint32_t port, const std::string& db_name,
uint32_t slot_id, const LogOffset& ack_start,
const LogOffset& ack_end, const std::string& local_ip,
bool is_first_send) {
InnerMessage::InnerRequest request;
request.set_type(InnerMessage::kBinlogSync);
InnerMessage::InnerRequest::BinlogSync* binlog_sync = request.mutable_binlog_sync();
InnerMessage::Node* node = binlog_sync->mutable_node();
node->set_ip(local_ip);
node->set_port(g_pika_server->port());
binlog_sync->set_db_name(db_name);
binlog_sync->set_slot_id(slot_id);
binlog_sync->set_first_send(is_first_send);

InnerMessage::BinlogOffset* ack_range_start = binlog_sync->mutable_ack_range_start();
ack_range_start->set_filenum(ack_start.b_offset.filenum);
ack_range_start->set_offset(ack_start.b_offset.offset);
ack_range_start->set_term(ack_start.l_offset.term);
ack_range_start->set_index(ack_start.l_offset.index);

InnerMessage::BinlogOffset* ack_range_end = binlog_sync->mutable_ack_range_end();
ack_range_end->set_filenum(ack_end.b_offset.filenum);
ack_range_end->set_offset(ack_end.b_offset.offset);
ack_range_end->set_term(ack_end.l_offset.term);
ack_range_end->set_index(ack_end.l_offset.index);

std::shared_ptr<SyncSlaveSlot> slave_slot =
g_pika_rm->GetSyncSlaveSlotByName(SlotInfo(db_name, slot_id));
if (!slave_slot) {
LOG(WARNING) << "Slave Slot: " << db_name << "_" << slot_id << " not exist";
return Status::NotFound("SyncSlaveSlot NotFound");
}
int32_t session_id = slave_slot->MasterSessionId();
binlog_sync->set_session_id(session_id);

std::string to_send;
if (!request.SerializeToString(&to_send)) {
LOG(WARNING) << "Serialize Slot BinlogSync Request Failed, to Master (" << ip << ":" << port << ")";
return Status::Corruption("Serialize Failed");
}
return client_thread_->Write(ip, static_cast<int32_t>(port + kPortShiftReplServer), to_send);
}

Summary

In this kBinlogSync request, the slave sends the master ip, port, db_name, slot_id, is_first_send, filenum, offset, term, index and session_id.

Conclusion

That is the entire full sync process; incremental sync will be covered in the next post~