Background

This article walks through Pika's full sync (full replication). Before diving into the code, let me first introduce the states (the state machine) involved in master-slave replication:
```cpp
// Server roles
PIKA_ROLE_SINGLE = 0  // initial node
PIKA_ROLE_SLAVE  = 1  // slave
PIKA_ROLE_MASTER = 2  // master

// Server-level replication states
PIKA_REPL_NO_CONNECT       = 0  // not connected
PIKA_REPL_SHOULD_META_SYNC = 1  // should start MetaSync
PIKA_REPL_META_SYNC_DONE   = 2  // MetaSync finished
PIKA_REPL_ERROR            = 3  // error

// Per-slot replication states
enum ReplState {
  kNoConnect   = 0,
  kTryConnect  = 1,
  kTryDBSync   = 2,
  kWaitDBSync  = 3,
  kWaitReply   = 4,
  kConnected   = 5,
  kError       = 6,
  kDBNoConnect = 7
};
```
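Reading the two enums together, the happy path one slave slot travels during a full sync can be laid out up front. The snippet below is my own sketch summarizing the flow traced in the rest of this article, not Pika source; note that kWaitReply is entered after each request and resolved by the matching response.

```cpp
#include <cstdio>

int main() {
  // Sketch: full-sync happy path of one slave slot (not Pika source).
  const char* path[][2] = {
      {"kNoConnect",  "slaveof ran; server-level state is PIKA_REPL_SHOULD_META_SYNC"},
      {"kTryDBSync",  "MetaSync done; PrepareSlotTrySync with force_full_sync_ == true"},
      {"kWaitReply",  "SendSlotDBSyncRequest sent to the master"},
      {"kWaitDBSync", "DBSync response received; rsync pulls the dump files"},
      {"kTryConnect", "rsync finished; TryUpdateMasterOffset applied"},
      {"kWaitReply",  "SendSlotTrySyncRequest sent to the master"},
      {"kConnected",  "TrySync response kOk; binlog (incremental) sync begins"},
  };
  for (const auto& step : path) std::printf("%-12s <- %s\n", step[0], step[1]);
  return 0;
}
```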
1. Starting from the command

**src/pika_admin.cc**
When we run `slaveof ip port` (for example, `slaveof 192.168.1.100 9221`), master-slave replication is kicked off: the command first calls RemoveMaster to detach the node's current master, then calls SetMaster to set the new master node.
```cpp
void SlaveofCmd::Do(std::shared_ptr<Slot> slot) {
  ...
  g_pika_server->RemoveMaster();
  ...
  bool sm_ret = g_pika_server->SetMaster(master_ip_, static_cast<int32_t>(master_port_));
  if (sm_ret) {
    res_.SetRes(CmdRes::kOk);
    g_pika_server->ClearCacheDbAsync(slot);
    g_pika_conf->SetSlaveof(master_ip_ + ":" + std::to_string(master_port_));
    g_pika_conf->SetMasterRunID("");
    g_pika_server->SetFirstMetaSync(true);
  } else {
    res_.SetRes(CmdRes::kErrOther, "Server is not in correct state for slaveof");
  }
}
```
src/pika_server.cc
Here repl_state_ is set to PIKA_REPL_NO_CONNECT, and master_ip_ and master_port_ are reset. If the node previously had a master, CloseReplClientConn and LostConnection are called to close the PikaReplClient connection and the RsyncClient connection respectively. Finally, DoSameThingEverySlot sets every SyncSlaveSlot's state to kNoConnect:
```cpp
void PikaServer::RemoveMaster() {
  {
    std::lock_guard l(state_protector_);
    repl_state_ = PIKA_REPL_NO_CONNECT;
    role_ &= ~PIKA_ROLE_SLAVE;
    if (!master_ip_.empty() && master_port_ != -1) {
      g_pika_rm->CloseReplClientConn(master_ip_, master_port_ + kPortShiftReplServer);
      g_pika_rm->LostConnection(master_ip_, master_port_);
      ...
    }
    master_ip_ = "";
    master_port_ = -1;
    DoSameThingEverySlot(TaskType::kResetReplState);
  }
}
```
src/pika_rm.cc
Closing the ReplClient connection:
```cpp
Status PikaReplicaManager::CloseReplClientConn(const std::string& ip, int32_t port) {
  return pika_repl_client_->Close(ip, port);
}
```
src/pika_rm.cc
This removes the slave from slaves_ in every sync_master_slots_ entry, and stops rsync_cli_ (via Deactivate) in every matching sync_slave_slots_ entry:
```cpp
Status PikaReplicaManager::LostConnection(const std::string& ip, int port) {
  std::shared_lock l(slots_rw_);
  for (auto& iter : sync_master_slots_) {
    std::shared_ptr<SyncMasterSlot> slot = iter.second;
    Status s = slot->RemoveSlaveNode(ip, port);
    if (!s.ok() && !s.IsNotFound()) {
      LOG(WARNING) << "Lost Connection failed " << s.ToString();
    }
  }
  for (auto& iter : sync_slave_slots_) {
    std::shared_ptr<SyncSlaveSlot> slot = iter.second;
    if (slot->MasterIp() == ip && slot->MasterPort() == port) {
      slot->Deactivate();
    }
  }
  return Status::OK();
}
```
src/pika_server.cc
Here repl_state_ is set to the PIKA_REPL_SHOULD_META_SYNC flag:
```cpp
bool PikaServer::SetMaster(std::string& master_ip, int master_port) {
  if (master_ip == "127.0.0.1") {
    master_ip = host_;
  }
  std::lock_guard l(state_protector_);
  if (((role_ ^ PIKA_ROLE_SLAVE) != 0) && repl_state_ == PIKA_REPL_NO_CONNECT) {
    master_ip_ = master_ip;
    master_port_ = master_port;
    role_ |= PIKA_ROLE_SLAVE;
    repl_state_ = PIKA_REPL_SHOULD_META_SYNC;
    return true;
  }
  return false;
}
```
FAQ
What is the difference between the PikaReplClient connection and the RsyncClient connection?

PikaReplClient talks to the master's port with a +2000 offset, while the rsync channel uses a +1000 offset (note that the new RsyncClient shown later in RsyncClient::Init actually connects to port + 10001, i.e. kPortShiftRsync2; +1000 is kPortShiftRSync, the port passed to DoBgSaveSlot). CloseReplClientConn closes the ReplClient connection, and LostConnection tears down the rsync side.
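For reference, here are the three port shifts that appear in this article, collected in one place. The names and values below are the ones quoted in the surrounding code snippets; this block is a recap I assembled, not a Pika header:

```cpp
// Port shifts relative to the master's service port (recap, not Pika source):
const int kPortShiftReplServer = 2000;   // PikaReplClient <-> PikaReplServer control link
const int kPortShiftRSync      = 1000;   // port handed to DoBgSaveSlot on the master side
const int kPortShiftRsync2     = 10001;  // RsyncClient data link (see RsyncClient::Init below)
```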
Why use `((role_ ^ PIKA_ROLE_SLAVE) != 0)` instead of `role_ == PIKA_ROLE_SINGLE`, and why `role_ |= PIKA_ROLE_SLAVE` rather than `role_ = PIKA_ROLE_SLAVE`?

Because role_ is a bit mask: a node may be a master and a slave at the same time (the middle node of a replication chain), so the assignment must preserve an existing master bit, and the check must only reject a node that is already purely a slave, not a pure master.
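A minimal, runnable sketch of why the bit operations matter, using the PIKA_ROLE_* values listed at the top of this article:

```cpp
#include <cassert>

int main() {
  const int PIKA_ROLE_SINGLE = 0, PIKA_ROLE_SLAVE = 1, PIKA_ROLE_MASTER = 2;

  // role_ is a bit mask: a node can be master and slave at once (a chain's
  // middle node). |= adds the slave bit without dropping the master bit.
  int role = PIKA_ROLE_MASTER;
  role |= PIKA_ROLE_SLAVE;
  assert(role == (PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE));

  // (role_ ^ PIKA_ROLE_SLAVE) != 0 only rejects a node that is exactly a
  // slave already; role_ == PIKA_ROLE_SINGLE would also reject a pure master.
  assert((PIKA_ROLE_MASTER ^ PIKA_ROLE_SLAVE) != 0);  // a master may call slaveof
  assert((PIKA_ROLE_SLAVE ^ PIKA_ROLE_SLAVE) == 0);   // an existing slave may not
  (void)PIKA_ROLE_SINGLE;
  return 0;
}
```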
Summary

The slaveof command sets repl_state_ to PIKA_REPL_SHOULD_META_SYNC, sets every SyncSlaveSlot's state to kNoConnect, sets role_ to PIKA_ROLE_SLAVE, and sets first_meta_sync_ to true.
src/pika_auxiliary_thread.cc
When the slaveof command finished, repl_state_ had just been set to PIKA_REPL_SHOULD_META_SYNC, so after passing the ShouldMetaSync check the auxiliary thread calls SendMetaSyncRequest to launch the MetaSync request:
```cpp
void* PikaAuxiliaryThread::ThreadMain() {
  while (!should_stop()) {
    if (g_pika_server->ShouldMetaSync()) {
      g_pika_rm->SendMetaSyncRequest();
    } else if (g_pika_server->MetaSyncDone()) {
      g_pika_rm->RunSyncSlaveSlotStateMachine();
    }
    ...
}
```
src/pika_server.cc
Check whether repl_state_ is in the "should MetaSync" state:
```cpp
bool PikaServer::ShouldMetaSync() {
  std::shared_lock l(state_protector_);
  return repl_state_ == PIKA_REPL_SHOULD_META_SYNC;
}
```
src/pika_rm.cc
This calls SendMetaSync, and once the request has been sent successfully it updates the MetaSync timestamp and sets first_meta_sync_ back to false:
```cpp
Status PikaReplicaManager::SendMetaSyncRequest() {
  Status s;
  if (time(nullptr) - g_pika_server->GetMetaSyncTimestamp() >= PIKA_META_SYNC_MAX_WAIT_TIME ||
      g_pika_server->IsFirstMetaSync()) {
    s = pika_repl_client_->SendMetaSync();
    if (s.ok()) {
      g_pika_server->UpdateMetaSyncTimestamp();
      g_pika_server->SetFirstMetaSync(false);
    }
  }
  return s;
}
```
FAQ
time(nullptr) - g_pika_server->GetMetaSyncTimestamp() >= PIKA_META_SYNC_MAX_WAIT_TIME
为什么需要这一层判断
src/pika_repl_client.cc
Here you can see that during full replication it is always the slave that initiates the connection, and the master reuses that connection for its replies. The slave first creates a throwaway connection to the remote master ip, both to check that the master is reachable and to learn its own local-ip. It then builds an InnerMessage with type set to kMetaSync, fills in that ip and port, and sends it to the master, targeting the master's port + 2000:
```cpp
Status PikaReplClient::SendMetaSync() {
  std::string local_ip;
  std::unique_ptr<net::NetCli> cli(net::NewRedisCli());
  cli->set_connect_timeout(1500);
  if ((cli->Connect(g_pika_server->master_ip(), g_pika_server->master_port(), "")).ok()) {
    struct sockaddr_in laddr;
    socklen_t llen = sizeof(laddr);
    getsockname(cli->fd(), reinterpret_cast<struct sockaddr*>(&laddr), &llen);
    std::string tmp_local_ip(inet_ntoa(laddr.sin_addr));
    local_ip = tmp_local_ip;
    cli->Close();
  } else {
    LOG(WARNING) << "Failed to connect master, Master (" << g_pika_server->master_ip() << ":"
                 << g_pika_server->master_port() << "), try reconnect";
    sleep(3);
    g_pika_server->ResetMetaSyncStatus();
    return Status::Corruption("Connect master error");
  }

  InnerMessage::InnerRequest request;
  request.set_type(InnerMessage::kMetaSync);
  InnerMessage::InnerRequest::MetaSync* meta_sync = request.mutable_meta_sync();
  InnerMessage::Node* node = meta_sync->mutable_node();
  node->set_ip(local_ip);
  node->set_port(g_pika_server->port());
  ...
  std::string to_send;
  std::string master_ip = g_pika_server->master_ip();
  int master_port = g_pika_server->master_port();
  if (!request.SerializeToString(&to_send)) {
    LOG(WARNING) << "Serialize Meta Sync Request Failed, to Master (" << master_ip << ":" << master_port << ")";
    return Status::Corruption("Serialize Failed");
  }
  LOG(INFO) << "Try Send Meta Sync Request to Master (" << master_ip << ":" << master_port << ")";
  return client_thread_->Write(master_ip, master_port + kPortShiftReplServer, to_send);
}
```
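As an aside, the local-IP discovery trick used at the top of SendMetaSync can be shown in isolation. This is my own standalone POSIX sketch, not Pika code: connect a throwaway socket to the master and ask the kernel, via getsockname, which local address it chose for that route.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <string>

// Returns the local IPv4 address the kernel picks for reaching master_ip,
// or "" on failure. Mirrors the Connect + getsockname dance in SendMetaSync.
std::string LocalIpTowards(const char* master_ip, int master_port) {
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd < 0) return "";
  sockaddr_in peer{};
  peer.sin_family = AF_INET;
  peer.sin_port = htons(master_port);
  inet_pton(AF_INET, master_ip, &peer.sin_addr);

  std::string result;
  if (connect(fd, reinterpret_cast<sockaddr*>(&peer), sizeof(peer)) == 0) {
    sockaddr_in local{};
    socklen_t len = sizeof(local);
    getsockname(fd, reinterpret_cast<sockaddr*>(&local), &len);
    char buf[INET_ADDRSTRLEN];
    if (const char* p = inet_ntop(AF_INET, &local.sin_addr, buf, sizeof(buf))) {
      result = p;  // the throwaway connection is closed right after
    }
  }
  close(fd);
  return result;
}
```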
Summary

In the MetaSync request the slave sends the master its own ip, port, and masterauth; the request targets the master's port + 2000.
src/pika_repl_server_conn.cc
After the master receives the InnerMessage, it verifies masterauth, calls TryAddSlave to record the slave in PikaServer's slaves_, calls ReplServerUpdateClientConnMap to record the node in the client_conn_map, and calls BecomeMaster to change its own role from PIKA_ROLE_SINGLE to PIKA_ROLE_MASTER, setting the response code to kOk. At this point the master generates a ReplicationID (if it does not already have one), then writes classic_mode, run_id, replication_id, and the db_structs back to the slave in the InnerMessage:
```cpp
void PikaReplServerConn::HandleMetaSyncRequest(void* arg) {
  ...
  LOG(INFO) << "Receive MetaSync, Slave ip: " << node.ip() << ", Slave port:" << node.port();
  std::vector<DBStruct> db_structs = g_pika_conf->db_structs();
  bool success = g_pika_server->TryAddSlave(node.ip(), node.port(), conn->fd(), db_structs);
  const std::string ip_port = pstd::IpPortString(node.ip(), node.port());
  g_pika_rm->ReplServerUpdateClientConnMap(ip_port, conn->fd());
  if (!success) {
    response.set_code(InnerMessage::kOther);
    response.set_reply("Slave AlreadyExist");
  } else {
    g_pika_server->BecomeMaster();
    response.set_code(InnerMessage::kOk);
    InnerMessage::InnerResponse_MetaSync* meta_sync = response.mutable_meta_sync();
    if (g_pika_conf->replication_id() == "") {
      std::string replication_id = pstd::getRandomHexChars(configReplicationIDSize);
      g_pika_conf->SetReplicationID(replication_id);
      g_pika_conf->ConfigRewriteReplicationID();
    }
    meta_sync->set_classic_mode(g_pika_conf->classic_mode());
    meta_sync->set_run_id(g_pika_conf->run_id());
    meta_sync->set_replication_id(g_pika_conf->replication_id());
    for (const auto& db_struct : db_structs) {
      InnerMessage::InnerResponse_MetaSync_DBInfo* db_info = meta_sync->add_dbs_info();
      db_info->set_db_name(db_struct.db_name);
      db_info->set_slot_num(static_cast<int32_t>(db_struct.slot_num));
    }
  }
  ...
  std::string reply_str;
  if (!response.SerializeToString(&reply_str) || (conn->WriteResp(reply_str) != 0)) {
    LOG(WARNING) << "Process MetaSync request serialization failed";
    conn->NotifyClose();
    return;
  }
  conn->NotifyWrite();
}
```
In the MetaSync response, the master returns classic_mode, run_id, replication_id, db_name, and slot_num, which the slave uses to compare its own DB structure and to set its replication_id.
src/pika_repl_client_conn.cc
When the slave receives the master's MetaSyncResponse, it first parses the status code: kOther triggers another MetaSync, and any code other than kOk is an error that sets repl_state_ to PIKA_REPL_ERROR. Otherwise it compares the master's DBStruct against its own (comparing db_name and slot_num); a mismatch also sets repl_state_ to PIKA_REPL_ERROR, meaning master-slave replication cannot proceed. If the DB structures match, it inspects the master's ReplicationID: if it came back empty, the slave just waits for the next request; if both sides already hold different non-empty ReplicationIDs, that is an error too. If the slave has no ReplicationID of its own, it adopts the master's and sets force_full_sync_ to true to force a full sync. It then calls PrepareSlotTrySync, and finally FinishMetaSync sets repl_state_ to PIKA_REPL_META_SYNC_DONE, marking the MetaSync phase complete:
```cpp
void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
  std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
  std::shared_ptr<net::PbConn> conn = task_arg->conn;
  std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;
  ...
  const InnerMessage::InnerResponse_MetaSync meta_sync = response->meta_sync();
  std::vector<DBStruct> master_db_structs;
  for (int idx = 0; idx < meta_sync.dbs_info_size(); ++idx) {
    const InnerMessage::InnerResponse_MetaSync_DBInfo& db_info = meta_sync.dbs_info(idx);
    master_db_structs.push_back({db_info.db_name(), static_cast<uint32_t>(db_info.slot_num()), {0}});
  }
  std::vector<DBStruct> self_db_structs = g_pika_conf->db_structs();
  if (!PikaReplClientConn::IsDBStructConsistent(self_db_structs, master_db_structs)) {
    ...
    g_pika_server->SyncError();
    conn->NotifyClose();
    return;
  }
  if (meta_sync.replication_id() == "") {
    LOG(WARNING) << "Meta Sync Failed: the relicationid obtained from the server is null, keep sending MetaSync msg";
    return;
  }
  if (g_pika_conf->replication_id() != meta_sync.replication_id() && g_pika_conf->replication_id() != "") {
    LOG(WARNING) << "Meta Sync Failed: replicationid on both sides of the connection are inconsistent";
    g_pika_server->SyncError();
    conn->NotifyClose();
    return;
  }
  if (g_pika_conf->replication_id() != meta_sync.replication_id()) {
    ...
    g_pika_server->force_full_sync_ = true;
    g_pika_conf->SetReplicationID(meta_sync.replication_id());
    g_pika_conf->ConfigRewriteReplicationID();
  }
  g_pika_conf->SetWriteBinlog("yes");
  g_pika_server->PrepareSlotTrySync();
  g_pika_server->FinishMetaSync();
  LOG(INFO) << "Finish to handle meta sync response";
}
```
src/pika_server.cc
PrepareSlotTrySync checks force_full_sync_ to decide whether a forced full sync is needed: if so the state is kTryDBSync, otherwise kTryConnect. It then walks every slot of every DB and calls ActivateSyncSlaveSlot to connect each one to the master:
```cpp
void PikaServer::PrepareSlotTrySync() {
  std::shared_lock rwl(dbs_rw_);
  ReplState state = force_full_sync_ ? ReplState::kTryDBSync : ReplState::kTryConnect;
  for (const auto& db_item : dbs_) {
    for (const auto& slot_item : db_item.second->slots_) {
      Status s = g_pika_rm->ActivateSyncSlaveSlot(
          RmNode(g_pika_server->master_ip(), g_pika_server->master_port(),
                 db_item.second->GetDBName(), slot_item.second->GetSlotID()),
          state);
      if (!s.ok()) {
        LOG(WARNING) << s.ToString();
      }
    }
  }
  force_full_sync_ = false;
  LOG(INFO) << "Mark try connect finish";
}
```
src/pika_rm.cc
SelectLocalIp has each slot probe a connection to the remote master, and Activate then sets each slot's state to the state passed in (kTryDBSync here); sync_slave_slots_ holds the per-slot bookkeeping:
```cpp
Status PikaReplicaManager::ActivateSyncSlaveSlot(const RmNode& node, const ReplState& repl_state) {
  std::shared_lock l(slots_rw_);
  const SlotInfo& p_info = node.NodeSlotInfo();
  if (sync_slave_slots_.find(p_info) == sync_slave_slots_.end()) {
    return Status::NotFound("Sync Slave Slot " + node.ToString() + " not found");
  }
  ReplState ssp_state = sync_slave_slots_[p_info]->State();
  if (ssp_state != ReplState::kNoConnect && ssp_state != ReplState::kDBNoConnect) {
    return Status::Corruption("Sync Slave Slot in " + ReplStateMsg[ssp_state]);
  }
  std::string local_ip;
  Status s = SelectLocalIp(node.Ip(), node.Port(), &local_ip);
  if (s.ok()) {
    sync_slave_slots_[p_info]->SetLocalIp(local_ip);
    sync_slave_slots_[p_info]->Activate(node, repl_state);
  }
  return s;
}
```
```cpp
class SyncSlaveSlot {
  std::unique_ptr<rsync::RsyncClient> rsync_cli_;
  pstd::Mutex slot_mu_;
  RmNode m_info_;
  ReplState repl_state_{kNoConnect};
  std::string local_ip_;
};

class SyncMasterSlot {
  int32_t session_id_ = 0;
  ConsensusCoordinator coordinator_;
};

class ConsensusCoordinator {
  LogOffset committed_index_;
  std::shared_ptr<Context> context_;
  std::shared_mutex term_rwlock_;
  uint32_t term_ = 0;
  std::string db_name_;
  uint32_t slot_id_ = 0;
  SyncProgress sync_pros_;
  std::shared_ptr<StableLog> stable_logger_;
  std::shared_ptr<MemLog> mem_logger_;
};
```
DBSync

src/pika_auxiliary_thread.cc
Back in the auxiliary thread: since repl_state_ is now PIKA_REPL_META_SYNC_DONE and every SyncSlaveSlot is in kTryDBSync, we pass the MetaSyncDone check and run RunSyncSlaveSlotStateMachine:
```cpp
void* PikaAuxiliaryThread::ThreadMain() {
  while (!should_stop()) {
    if (g_pika_server->ShouldMetaSync()) {
      g_pika_rm->SendMetaSyncRequest();
    } else if (g_pika_server->MetaSyncDone()) {
      g_pika_rm->RunSyncSlaveSlotStateMachine();
    }
    pstd::Status s = g_pika_rm->CheckSyncTimeout(pstd::NowMicros());
    if (!s.ok()) {
      LOG(WARNING) << s.ToString();
    }
    g_pika_server->CheckLeaderProtectedMode();
    s = g_pika_server->TriggerSendBinlogSync();
    if (!s.ok()) {
      LOG(WARNING) << s.ToString();
    }
    int res = g_pika_server->SendToPeer();
    if (res == 0) {
      std::unique_lock lock(mu_);
      cv_.wait_for(lock, 100ms);
    } else {
    }
  }
  return nullptr;
}
```
src/pika_rm.cc
This uses a for loop to walk every slot in sync_slave_slots_ and dispatch on its state. Since handling the MetaSync response left every slot in kTryDBSync, SendSlotDBSyncRequest is what gets called here:
```cpp
Status PikaReplicaManager::RunSyncSlaveSlotStateMachine() {
  std::shared_lock l(slots_rw_);
  for (const auto& item : sync_slave_slots_) {
    SlotInfo p_info = item.first;
    std::shared_ptr<SyncSlaveSlot> s_slot = item.second;
    if (s_slot->State() == ReplState::kTryConnect) {
      SendSlotTrySyncRequest(p_info.db_name_, p_info.slot_id_);
    } else if (s_slot->State() == ReplState::kTryDBSync) {
      SendSlotDBSyncRequest(p_info.db_name_, p_info.slot_id_);
    } else if (s_slot->State() == ReplState::kWaitReply) {
      continue;
    } else if (s_slot->State() == ReplState::kWaitDBSync) {
      s_slot->ActivateRsync();
      std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(p_info.db_name_, p_info.slot_id_);
      if (slot) {
        if (!s_slot->IsRsyncRunning()) {
          slot->TryUpdateMasterOffset();
        }
      } else {
        LOG(WARNING) << "Slot not found, DB Name: " << p_info.db_name_ << " Slot Id: " << p_info.slot_id_;
      }
    } else if (s_slot->State() == ReplState::kConnected || s_slot->State() == ReplState::kNoConnect ||
               s_slot->State() == ReplState::kDBNoConnect) {
      continue;
    }
  }
  return Status::OK();
}
```
src/pika_rm.cc
Here GetDBSlotBinlogOffset records the slave's own binlog filenum and offset into boffset; PrepareRsync deletes the previous directory under dbsync_path_ and recreates the subdirectories for the five data types; then SendSlotDBSync is called, and on success the slot's state is set to kWaitReply, otherwise kError:
```cpp
Status PikaReplicaManager::SendSlotDBSyncRequest(const std::string& db_name, size_t slot_id) {
  BinlogOffset boffset;
  if (!g_pika_server->GetDBSlotBinlogOffset(db_name, slot_id, &boffset)) {
    LOG(WARNING) << "Slot: " << db_name << ":" << slot_id << ", Get slot binlog offset failed";
    return Status::Corruption("Slot get binlog offset error");
  }
  std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(db_name, slot_id);
  if (!slot) {
    LOG(WARNING) << "Slot: " << db_name << ":" << slot_id << ", NotFound";
    return Status::Corruption("Slot not found");
  }
  slot->PrepareRsync();
  std::shared_ptr<SyncSlaveSlot> slave_slot = GetSyncSlaveSlotByName(SlotInfo(db_name, slot_id));
  if (!slave_slot) {
    LOG(WARNING) << "Slave Slot: " << db_name << ":" << slot_id << ", NotFound";
    return Status::Corruption("Slave Slot not found");
  }
  Status status = pika_repl_client_->SendSlotDBSync(slave_slot->MasterIp(), slave_slot->MasterPort(),
                                                    db_name, slot_id, boffset, slave_slot->LocalIp());
  if (status.ok()) {
    slave_slot->SetReplState(ReplState::kWaitReply);
  } else {
    slave_slot->SetReplState(ReplState::kError);
    LOG(WARNING) << "SendSlotDBSync failed " << status.ToString();
  }
  return status;
}
```
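A sketch of what the PrepareRsync step above amounts to: wipe the old dbsync directory and recreate one subdirectory per data type. The five directory names are an assumption for illustration; the article only says that subdirectories for the five data types are recreated.

```cpp
#include <filesystem>
#include <string>

// Sketch of PrepareRsync as described above (directory names assumed).
void PrepareRsyncSketch(const std::string& dbsync_path) {
  namespace fs = std::filesystem;
  fs::remove_all(dbsync_path);  // drop leftovers from any previous sync
  for (const char* type : {"strings", "hashes", "lists", "sets", "zsets"}) {
    fs::create_directories(fs::path(dbsync_path) / type);
  }
}
```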
src/pika_repl_client.cc
This sets the InnerMessage type to kDBSync, fills in the ip/port, db_name, slot_id, filenum, and offset, and sends the request to the master:
```cpp
Status PikaReplClient::SendSlotDBSync(const std::string& ip, uint32_t port, const std::string& db_name,
                                      uint32_t slot_id, const BinlogOffset& boffset, const std::string& local_ip) {
  InnerMessage::InnerRequest request;
  request.set_type(InnerMessage::kDBSync);
  InnerMessage::InnerRequest::DBSync* db_sync = request.mutable_db_sync();
  InnerMessage::Node* node = db_sync->mutable_node();
  node->set_ip(local_ip);
  node->set_port(g_pika_server->port());
  InnerMessage::Slot* slot = db_sync->mutable_slot();
  slot->set_db_name(db_name);
  slot->set_slot_id(slot_id);
  InnerMessage::BinlogOffset* binlog_offset = db_sync->mutable_binlog_offset();
  binlog_offset->set_filenum(boffset.filenum);
  binlog_offset->set_offset(boffset.offset);

  std::string to_send;
  if (!request.SerializeToString(&to_send)) {
    LOG(WARNING) << "Serialize Slot DBSync Request Failed, to Master (" << ip << ":" << port << ")";
    return Status::Corruption("Serialize Failed");
  }
  return client_thread_->Write(ip, static_cast<int32_t>(port) + kPortShiftReplServer, to_send);
}
```
Summary

The slave's DBSync request carries ip, port, db_name, slot_id, filenum, and offset.
src/pika_repl_server_conn.cc
Here the master records the slave's info in the SyncMasterSlot's slaves_ and in PikaReplServer's client_conn_map, then calls DoBgSaveSlot to run the bgsave. Note that the port passed here is the slave's port + 1000 (kPortShiftRSync). Finally it calls ActivateSlaveDbSync:
```cpp
const int kPortShiftRSync = 1000;
```
```cpp
void PikaReplServerConn::HandleDBSyncRequest(void* arg) {
  ...
  bool prior_success = true;
  std::shared_ptr<SyncMasterSlot> master_slot = g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name, slot_id));
  if (!master_slot) {
    ...
    prior_success = false;
  }
  if (prior_success) {
    if (!master_slot->CheckSlaveNodeExist(node.ip(), node.port())) {
      int32_t session_id = master_slot->GenSessionId();
      if (session_id == -1) {
        response.set_code(InnerMessage::kError);
        ...
        prior_success = false;
      }
      if (prior_success) {
        db_sync_response->set_session_id(session_id);
        Status s = master_slot->AddSlaveNode(node.ip(), node.port(), session_id);
        if (s.ok()) {
          const std::string ip_port = pstd::IpPortString(node.ip(), node.port());
          g_pika_rm->ReplServerUpdateClientConnMap(ip_port, conn->fd());
          ...
        } else {
          db_sync_response->set_session_id(-1);
        }
      }
    } else {
      int32_t session_id;
      Status s = master_slot->GetSlaveNodeSession(node.ip(), node.port(), &session_id);
      if (!s.ok()) {
        response.set_code(InnerMessage::kError);
        ...
        db_sync_response->set_session_id(-1);
        ...
      }
    }
  }
  g_pika_server->DoBgSaveSlot(node.ip(), node.port() + kPortShiftRSync, db_name, slot_id,
                              static_cast<int32_t>(slave_boffset.filenum()));
  ...
  master_slot->ActivateSlaveDbSync(node.ip(), node.port());
  if (!response.SerializeToString(&reply_str) || (conn->WriteResp(reply_str) != 0)) {
    ...
    conn->NotifyClose();
    return;
  }
  conn->NotifyWrite();
}
```
src/pika_server.cc
Based on bgsave_info, the master decides whether it needs to redo the bgsave and produce a fresh dump. If the dump directory does not exist, or the binlog file recorded at the last bgsave has already been purged, or the slave's binlog filenum is ahead of the dump's filenum by more than the threshold (kDBSyncMaxGap, 50), a new bgsave is triggered; otherwise the existing dump files are reused:
```cpp
void PikaServer::DoBgSaveSlot(const std::string& ip, int port, const std::string& db_name,
                              uint32_t slot_id, int32_t top) {
  std::shared_ptr<Slot> slot = GetDBSlotById(db_name, slot_id);
  if (!slot) {
    LOG(WARNING) << "can not find Slot whose id is " << slot_id << " in db " << db_name
                 << ", DoBgSaveSlot Failed";
    return;
  }
  std::shared_ptr<SyncMasterSlot> sync_slot = g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name, slot_id));
  if (!sync_slot) {
    LOG(WARNING) << "can not find Slot whose id is " << slot_id << " in db " << db_name
                 << ", DoBgSaveSlot Failed";
    return;
  }
  BgSaveInfo bgsave_info = slot->bgsave_info();
  std::string logger_filename = sync_slot->Logger()->filename();
  if (pstd::IsDir(bgsave_info.path) != 0 ||
      !pstd::FileExists(NewFileName(logger_filename, bgsave_info.offset.b_offset.filenum)) ||
      top - bgsave_info.offset.b_offset.filenum > kDBSyncMaxGap) {
    slot->BgSaveSlot();
  }
}
```
src/pika_rm.cc
ActivateSlaveDbSync sets the slave node's state to kSlaveDbSync:
```cpp
Status SyncMasterSlot::ActivateSlaveDbSync(const std::string& ip, int port) {
  std::shared_ptr<SlaveNode> slave_ptr = GetSlaveNode(ip, port);
  if (!slave_ptr) {
    return Status::NotFound("ip " + ip + " port " + std::to_string(port));
  }
  slave_ptr->Lock();
  slave_ptr->slave_state = kSlaveDbSync;
  slave_ptr->Unlock();
  return Status::OK();
}

enum SlaveState {
  kSlaveNotSync = 0,
  kSlaveDbSync = 1,
  kSlaveBinlogSync = 2,
};
```
DBSync summary

In the DBSync response, the master writes back session_id, db_name, and slot_id, and on its own side decides whether a fresh bgsave is required.
src/pika_repl_client_conn.cc
When the slave receives the master's DBSync response, it stores its session_id, stops the rsync thread, and moves the state to kWaitDBSync:
```cpp
void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
  ...
  if (response->code() != InnerMessage::kOk) {
    slave_slot->SetReplState(ReplState::kError);
    std::string reply = response->has_reply() ? response->reply() : "";
    LOG(WARNING) << "DBSync Failed: " << reply;
    return;
  }
  slave_slot->SetMasterSessionId(session_id);
  std::string slot_name = slave_slot->SlotName();
  slave_slot->StopRsync();
  slave_slot->SetReplState(ReplState::kWaitDBSync);
  LOG(INFO) << "Slot: " << slot_name << " Need Wait To Sync";
}
```
src/pika_rm.cc
Back to the state-machine loop again: since we are now in the kWaitDBSync state, ActivateRsync is called to start the rsync thread:
```cpp
Status PikaReplicaManager::RunSyncSlaveSlotStateMachine() {
  std::shared_lock l(slots_rw_);
  for (const auto& item : sync_slave_slots_) {
    SlotInfo p_info = item.first;
    std::shared_ptr<SyncSlaveSlot> s_slot = item.second;
    if (s_slot->State() == ReplState::kTryConnect) {
      SendSlotTrySyncRequest(p_info.db_name_, p_info.slot_id_);
    } else if (s_slot->State() == ReplState::kTryDBSync) {
      SendSlotDBSyncRequest(p_info.db_name_, p_info.slot_id_);
    } else if (s_slot->State() == ReplState::kWaitReply) {
      continue;
    } else if (s_slot->State() == ReplState::kWaitDBSync) {
      s_slot->ActivateRsync();
      std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(p_info.db_name_, p_info.slot_id_);
      if (slot) {
        if (!s_slot->IsRsyncRunning()) {
          slot->TryUpdateMasterOffset();
        }
      } else {
        LOG(WARNING) << "Slot not found, DB Name: " << p_info.db_name_ << " Slot Id: " << p_info.slot_id_;
      }
    } else if (s_slot->State() == ReplState::kConnected || s_slot->State() == ReplState::kNoConnect ||
               s_slot->State() == ReplState::kDBNoConnect) {
      continue;
    }
  }
  return Status::OK();
}
```
src/pika_rm.cc
ActivateRsync first calls Init():
```cpp
void SyncSlaveSlot::ActivateRsync() {
  if (!rsync_cli_->IsIdle()) {
    return;
  }
  LOG(WARNING) << "ActivateRsync ...";
  if (rsync_cli_->Init()) {
    rsync_cli_->Start();
  }
}
```
src/rsync_client.cc
Init uses the master's port plus the kPortShiftRsync2 offset (10001) as master_port_, the port the rsync traffic goes to:
```cpp
bool RsyncClient::Init() {
  if (state_ != IDLE) {
    LOG(WARNING) << "State should be IDLE when Init";
    return false;
  }
  master_ip_ = g_pika_server->master_ip();
  master_port_ = g_pika_server->master_port() + kPortShiftRsync2;
  file_set_.clear();
  client_thread_->StartThread();
  bool ret = Recover();
  if (!ret) {
    LOG(WARNING) << "RsyncClient recover failed";
    client_thread_->StopThread();
    return false;
  }
  finished_work_cnt_.store(0);
  LOG(INFO) << "RsyncClient recover success";
  return true;
}
```
FAQ
What is the client_thread_ here?

From the code we can see it is the network client thread that owns the rsync connection: worker threads hand it serialized requests via Write, and when a response arrives it is matched back to the waiting worker (see the WaitObject sketch after CopyRemoteMeta below).
src/rsync_client.cc
Recover first calls CopyRemoteMeta to fetch the metadata of the remote dump files, then compares it against the local metadata to decide what to update:
```cpp
bool RsyncClient::Recover() {
  std::string local_snapshot_uuid;
  std::string remote_snapshot_uuid;
  std::set<std::string> local_file_set;
  std::set<std::string> remote_file_set;
  std::map<std::string, std::string> local_file_map;

  Status s = CopyRemoteMeta(&remote_snapshot_uuid, &remote_file_set);
  ...
  s = LoadLocalMeta(&local_snapshot_uuid, &local_file_map);
  ...
  for (auto const& file : local_file_map) {
    local_file_set.insert(file.first);
  }

  std::set<std::string> expired_files;
  if (remote_snapshot_uuid != local_snapshot_uuid) {
    snapshot_uuid_ = remote_snapshot_uuid;
    file_set_ = remote_file_set;
    expired_files = local_file_set;
  } else {
    std::set<std::string> newly_files;
    set_difference(remote_file_set.begin(), remote_file_set.end(), local_file_set.begin(),
                   local_file_set.end(), inserter(newly_files, newly_files.begin()));
    set_difference(local_file_set.begin(), local_file_set.end(), remote_file_set.begin(),
                   remote_file_set.end(), inserter(expired_files, expired_files.begin()));
    file_set_.insert(newly_files.begin(), newly_files.end());
  }

  s = CleanUpExpiredFiles(local_snapshot_uuid != remote_snapshot_uuid, expired_files);
  ...
  s = UpdateLocalMeta(snapshot_uuid_, expired_files, &local_file_map);
  ...
  state_ = RUNNING;
  ...
  return true;
}
```
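A minimal, runnable illustration of the set arithmetic Recover() performs when the snapshot uuids match: files present only on the master become newly_files (still to fetch), while files present only locally become expired_files (to delete):

```cpp
#include <algorithm>
#include <iostream>
#include <iterator>
#include <set>
#include <string>

int main() {
  std::set<std::string> remote = {"info", "strings/000010.sst", "strings/000011.sst"};
  std::set<std::string> local  = {"info", "strings/000010.sst", "strings/000007.sst"};

  std::set<std::string> newly, expired;
  std::set_difference(remote.begin(), remote.end(), local.begin(), local.end(),
                      std::inserter(newly, newly.begin()));
  std::set_difference(local.begin(), local.end(), remote.begin(), remote.end(),
                      std::inserter(expired, expired.begin()));

  for (const auto& f : newly)   std::cout << "fetch:  " << f << "\n";  // strings/000011.sst
  for (const auto& f : expired) std::cout << "delete: " << f << "\n";  // strings/000007.sst
  return 0;
}
```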
src/rsync_client.cc
This builds an RsyncRequest carrying db_name, slot_id, kRsyncMeta, and reader_index and sends it to the master; when the response arrives, the metadata of the master's dump files is recorded into file_set and snapshot_uuid is updated:
```cpp
Status RsyncClient::CopyRemoteMeta(std::string* snapshot_uuid, std::set<std::string>* file_set) {
  Status s;
  int retries = 0;
  RsyncRequest request;
  request.set_reader_index(0);
  request.set_db_name(db_name_);
  request.set_slot_id(slot_id_);
  request.set_type(kRsyncMeta);
  std::string to_send;
  request.SerializeToString(&to_send);
  while (retries < max_retries_) {
    WaitObject* wo = wo_mgr_->UpdateWaitObject(0, "", kRsyncMeta, kInvalidOffset);
    s = client_thread_->Write(master_ip_, master_port_, to_send);
    if (!s.ok()) {
      retries++;
    }
    std::shared_ptr<RsyncResponse> resp;
    s = wo->Wait(resp);
    if (s.IsTimeout() || resp.get() == nullptr) {
      LOG(WARNING) << "rsync CopyRemoteMeta request timeout, "
                   << "retry times: " << retries;
      retries++;
      continue;
    }
    if (resp->code() != RsyncService::kOk) {
      continue;
    }
    LOG(INFO) << "receive rsync meta infos, snapshot_uuid: " << resp->snapshot_uuid()
              << "files count: " << resp->meta_resp().filenames_size();
    for (std::string item : resp->meta_resp().filenames()) {
      file_set->insert(item);
    }
    *snapshot_uuid = resp->snapshot_uuid();
    break;
  }
  return s;
}
```
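The UpdateWaitObject/Wait pair above is a rendezvous between the worker thread that writes the request and the network thread that later delivers the response. Here is a condition-variable sketch of that contract (my own sketch, not Pika's WaitObject):

```cpp
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>

template <typename Resp>
class WaitObjectSketch {
 public:
  // Called by the worker after writing the request: block until the network
  // thread delivers the matching response, or the wait times out (the caller
  // then retries, exactly as CopyRemoteMeta does).
  bool Wait(std::shared_ptr<Resp>& out, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lk(mu_);
    if (!cv_.wait_for(lk, timeout, [this] { return resp_ != nullptr; })) return false;
    out = std::move(resp_);
    return true;
  }

  // Called from the network thread when a response for this slot arrives.
  void Deliver(std::shared_ptr<Resp> resp) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      resp_ = std::move(resp);
    }
    cv_.notify_one();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::shared_ptr<Resp> resp_;
};
```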
Summary

The slave's RsyncMeta request carries db_name, slot_id, and reader_index.
src/rsync_server.cc
The master handles the MetaRsync request: if the slot is currently running a bgsave, the handler simply returns (the slave will retry); otherwise it calls GetDumpMeta to collect the dump files' metadata into filenames, and sends everything back to the slave:
```cpp
void RsyncServerConn::HandleMetaRsyncRequest(void* arg) {
  std::unique_ptr<RsyncServerTaskArg> task_arg(static_cast<RsyncServerTaskArg*>(arg));
  const std::shared_ptr<RsyncService::RsyncRequest> req = task_arg->req;
  std::shared_ptr<net::PbConn> conn = task_arg->conn;
  std::string db_name = req->db_name();
  uint32_t slot_id = req->slot_id();
  std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(db_name, slot_id);
  if (!slot || slot->IsBgSaving()) {
    LOG(WARNING) << "waiting bgsave done...";
    return;
  }
  RsyncService::RsyncResponse response;
  response.set_reader_index(req->reader_index());
  response.set_code(RsyncService::kOk);
  response.set_type(RsyncService::kRsyncMeta);
  response.set_db_name(db_name);
  response.set_slot_id(slot_id);

  std::vector<std::string> filenames;
  std::string snapshot_uuid;
  g_pika_server->GetDumpMeta(db_name, slot_id, &filenames, &snapshot_uuid);
  response.set_snapshot_uuid(snapshot_uuid);
  ...
  RsyncService::MetaResponse* meta_resp = response.mutable_meta_resp();
  for (const auto& filename : filenames) {
    meta_resp->add_filenames(filename);
  }
  RsyncWriteResp(response, conn);
}
```
FAQ
In the MetaRsync exchange, the master's response to the slave includes db_name, slot_id, reader_index, snapshot_uuid, and the filenames.
RsyncFile

src/rsync_client.cc
Once the remote files' metadata is stored in file_set_, the work_threads_ start calling Copy to pull the files from the master; ThreadMain shards file_set_ round-robin across the workers:
```cpp
void* RsyncClient::ThreadMain() {
  ...
  std::vector<std::set<std::string>> file_vec(GetParallelNum());
  int index = 0;
  for (const auto& file : file_set_) {
    file_vec[index++ % GetParallelNum()].insert(file);
  }
  for (int i = 0; i < GetParallelNum(); i++) {
    work_threads_[i] = std::move(std::thread(&RsyncClient::Copy, this, file_vec[i], i));
  }
  ...
  while (state_.load() == RUNNING) {
    uint64_t elapse = pstd::NowMicros() - start_time;
    if (elapse < kFlushIntervalUs) {
      int wait_for_us = kFlushIntervalUs - elapse;
      std::unique_lock<std::mutex> lock(mu_);
      cond_.wait_for(lock, std::chrono::microseconds(wait_for_us));
    }
    if (state_.load() != RUNNING) {
      break;
    }
    start_time = pstd::NowMicros();
    std::map<std::string, std::string> files_map;
    {
      std::lock_guard<std::mutex> guard(mu_);
      files_map.swap(meta_table_);
    }
    for (const auto& file : files_map) {
      meta_rep.append(file.first + ":" + file.second);
      meta_rep.append("\n");
    }
    ...
    if (finished_work_cnt_.load() == GetParallelNum()) {
      break;
    }
  }
  for (int i = 0; i < GetParallelNum(); i++) {
    work_threads_[i].join();
  }
  finished_work_cnt_.store(0);
  state_.store(STOP);
  ...
}
```
src/rsync_client.cc
Copy calls CopyRemoteFile to fetch each file from the master:
```cpp
void RsyncClient::Copy(const std::set<std::string>& file_set, int index) {
  Status s = Status::OK();
  for (const auto& file : file_set) {
    while (state_.load() == RUNNING) {
      LOG(INFO) << "copy remote file, filename: " << file;
      s = CopyRemoteFile(file, index);
      if (!s.ok()) {
        LOG(WARNING) << "copy remote file failed, msg: " << s.ToString();
        continue;
      }
      break;
    }
    if (state_.load() != RUNNING) {
      break;
    }
  }
  LOG(INFO) << "work_thread index: " << index << " copy remote files done";
  finished_work_cnt_.fetch_add(1);
  cond_.notify_all();
}
```
src/rsync_client.cc
This builds an RsyncRequest carrying reader_index, db_name, slot_id, filename, offset, and count for the master, and points the RsyncWriter at the dbsync directory; when the master's response arrives, the writer appends the data to the file at that path:
```cpp
Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {
  const std::string filepath = dir_ + "/" + filename;
  std::unique_ptr<RsyncWriter> writer(new RsyncWriter(filepath));
  ...
  while (retries < max_retries_) {
    if (state_.load() != RUNNING) {
      break;
    }
    size_t copy_file_begin_time = pstd::NowMicros();
    size_t count = Throttle::GetInstance().ThrottledByThroughput(kBytesPerRequest);
    if (count == 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1000 / kThrottleCheckCycle));
      continue;
    }
    RsyncRequest request;
    request.set_reader_index(index);
    request.set_type(kRsyncFile);
    request.set_db_name(db_name_);
    request.set_slot_id(slot_id_);
    FileRequest* file_req = request.mutable_file_req();
    file_req->set_filename(filename);
    file_req->set_offset(offset);
    file_req->set_count(count);
    std::string to_send;
    request.SerializeToString(&to_send);
    WaitObject* wo = wo_mgr_->UpdateWaitObject(index, filename, kRsyncFile, offset);
    s = client_thread_->Write(master_ip_, master_port_, to_send);
    ...
    std::shared_ptr<RsyncResponse> resp = nullptr;
    s = wo->Wait(resp);
    ...
    size_t ret_count = resp->file_resp().count();
    size_t elaspe_time_us = pstd::NowMicros() - copy_file_begin_time;
    Throttle::GetInstance().ReturnUnusedThroughput(count, ret_count, elaspe_time_us);
    ...
    s = writer->Write((uint64_t)offset, ret_count, resp->file_resp().data().c_str());
    ...
    offset += resp->file_resp().count();
    if (resp->file_resp().eof()) {
      s = writer->Fsync();
      ...
      break;
    }
    retries = 0;
  }
  ...
}
```
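CopyRemoteFile paces itself through a process-wide Throttle: it requests up to kBytesPerRequest bytes of quota, backs off and retries when 0 is granted, and returns whatever part of the grant the master's reply shows was not used. Below is a simplified sketch of that contract (Pika's real Throttle also factors in the elapsed time passed to ReturnUnusedThroughput, which is omitted here):

```cpp
#include <algorithm>
#include <cstddef>
#include <mutex>

class ThrottleSketch {
 public:
  explicit ThrottleSketch(size_t bytes_per_cycle) : quota_(bytes_per_cycle) {}

  // Grant up to `want` bytes from this cycle's quota; 0 tells the caller
  // to sleep until the next throttle cycle, as CopyRemoteFile does.
  size_t ThrottledByThroughput(size_t want) {
    std::lock_guard<std::mutex> lk(mu_);
    size_t granted = std::min(want, quota_);
    quota_ -= granted;
    return granted;
  }

  // The master may answer with fewer bytes than requested; hand the
  // difference back so other workers can use it.
  void ReturnUnusedThroughput(size_t asked, size_t used) {
    std::lock_guard<std::mutex> lk(mu_);
    if (used < asked) quota_ += asked - used;
  }

  // Invoked once per throttle cycle to reset the budget.
  void Refill(size_t bytes_per_cycle) {
    std::lock_guard<std::mutex> lk(mu_);
    quota_ = bytes_per_cycle;
  }

 private:
  std::mutex mu_;
  size_t quota_;
};
```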
Summary

The slave's RsyncFile request carries db_name, slot_id, reader_index, filename, offset, and count.
src/rsync_server.cc
```cpp
void RsyncServerConn::HandleFileRsyncRequest(void* arg) {
  ...
  std::string snapshot_uuid;
  Status s = g_pika_server->GetDumpUUID(db_name, slot_id, &snapshot_uuid);
  response.set_snapshot_uuid(snapshot_uuid);
  if (!s.ok()) {
    LOG(WARNING) << "rsyncserver get snapshotUUID failed";
    response.set_code(RsyncService::kErr);
    RsyncWriteResp(response, conn);
    return;
  }
  std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(db_name, slot_id);
  if (!slot) {
    LOG(WARNING) << "cannot find slot for db_name: " << db_name << " slot_id: " << slot_id;
    response.set_code(RsyncService::kErr);
    RsyncWriteResp(response, conn);
  }
  const std::string filepath = slot->bgsave_info().path + "/" + filename;
  char* buffer = new char[req->file_req().count() + 1];
  size_t bytes_read{0};
  std::string checksum = "";
  bool is_eof = false;
  std::shared_ptr<RsyncReader> reader = conn->readers_[req->reader_index()];
  s = reader->Read(filepath, offset, count, buffer, &bytes_read, &checksum, &is_eof);
  if (!s.ok()) {
    response.set_code(RsyncService::kErr);
    RsyncWriteResp(response, conn);
    delete[] buffer;
    return;
  }
  RsyncService::FileResponse* file_resp = response.mutable_file_resp();
  file_resp->set_data(buffer, bytes_read);
  file_resp->set_eof(is_eof);
  file_resp->set_checksum(checksum);
  file_resp->set_filename(filename);
  file_resp->set_count(bytes_read);
  file_resp->set_offset(offset);
  RsyncWriteResp(response, conn);
  delete[] buffer;
}
```
FAQ
Why isn't the snapshot shipped through the Pb message in one piece, with the validation done on the slave after copying?

Where does `s = writer->Write((uint64_t)offset, ret_count, resp->file_resp().data().c_str());` write the data to?

From CopyRemoteFile we can see it writes to `dir_ + "/" + filename`, i.e. the file under the slot's dbsync directory on the slave.

What is checksum for?

Presumably it lets the slave verify the integrity of the transferred data against what the master read from the dump file.
RsyncFile summary

The master's FileRsync response to the slave includes db_name, slot_id, reader_index, data, eof, checksum, filename, count (bytes_read), and offset.
TrySync

src/pika_rm.cc
Once all the remote files have been fetched, the rsync thread stops and TryUpdateMasterOffset updates the offset:
```cpp
Status PikaReplicaManager::RunSyncSlaveSlotStateMachine() {
  std::shared_lock l(slots_rw_);
  for (const auto& item : sync_slave_slots_) {
    SlotInfo p_info = item.first;
    std::shared_ptr<SyncSlaveSlot> s_slot = item.second;
    if (s_slot->State() == ReplState::kTryConnect) {
      SendSlotTrySyncRequest(p_info.db_name_, p_info.slot_id_);
    } else if (s_slot->State() == ReplState::kTryDBSync) {
      SendSlotDBSyncRequest(p_info.db_name_, p_info.slot_id_);
    } else if (s_slot->State() == ReplState::kWaitReply) {
      continue;
    } else if (s_slot->State() == ReplState::kWaitDBSync) {
      s_slot->ActivateRsync();
      std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(p_info.db_name_, p_info.slot_id_);
      if (slot) {
        if (!s_slot->IsRsyncRunning()) {
          slot->TryUpdateMasterOffset();
        }
      } else {
        LOG(WARNING) << "Slot not found, DB Name: " << p_info.db_name_ << " Slot Id: " << p_info.slot_id_;
      }
    } else if (s_slot->State() == ReplState::kConnected || s_slot->State() == ReplState::kNoConnect ||
               s_slot->State() == ReplState::kDBNoConnect) {
      continue;
    }
  }
  return Status::OK();
}
```
src/pika_slot.cc
This updates the binlog offset: the slave reads the info file, applies the master_port, filenum, offset, term, and index recorded in it, and then deletes the info file. ChangeDb swaps the dbsync directory in as the new db directory and removes the old one, and the slot's state is set to kTryConnect. In other words, regardless of where the slave's own binlog had been written before, the slave now continues from the filenum and offset the master recorded when it produced the dump:
```cpp
bool Slot::TryUpdateMasterOffset() {
  std::string info_path = dbsync_path_ + kBgsaveInfoFile;
  ...
  std::ifstream is(info_path);
  ...
  std::string line;
  std::string master_ip;
  ...
  is.close();
  ...
  pstd::DeleteFile(info_path);
  if (!ChangeDb(dbsync_path_)) {
    LOG(WARNING) << "Slot: " << slot_name_ << ", Failed to change db";
    slave_slot->SetReplState(ReplState::kError);
    return false;
  }
  std::shared_ptr<SyncMasterSlot> master_slot =
      g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name_, slot_id_));
  if (!master_slot) {
    LOG(WARNING) << "Master Slot: " << slot_name_ << " not exist";
    return false;
  }
  master_slot->Logger()->SetProducerStatus(filenum, offset);
  slave_slot->SetReplState(ReplState::kTryConnect);
  return true;
}
```
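For illustration, here is a small sketch of reading the bgsave info file that TryUpdateMasterOffset consumes. The article names the fields (master_ip, master_port, filenum, offset, term, index) but not the exact file layout, so the whitespace-separated format assumed here is hypothetical:

```cpp
#include <cstdint>
#include <fstream>
#include <string>

struct DumpInfo {
  std::string master_ip;
  int master_port = 0;
  uint32_t filenum = 0;  // binlog file number at the moment of the dump
  uint64_t offset = 0;   // offset inside that binlog file
  uint32_t term = 0;
  uint64_t index = 0;
};

// Reads the fields the article says the info file carries; the
// whitespace-separated layout is an assumption, not Pika's actual format.
bool ReadDumpInfo(const std::string& info_path, DumpInfo* out) {
  std::ifstream is(info_path);
  if (!is) return false;
  return static_cast<bool>(is >> out->master_ip >> out->master_port >> out->filenum >>
                           out->offset >> out->term >> out->index);
}
```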
src/pika_rm.cc
Since the slot's state changed to kTryConnect, the loop now calls SendSlotTrySyncRequest to prepare the incremental-sync check:
```cpp
Status PikaReplicaManager::RunSyncSlaveSlotStateMachine() {
  std::shared_lock l(slots_rw_);
  for (const auto& item : sync_slave_slots_) {
    SlotInfo p_info = item.first;
    std::shared_ptr<SyncSlaveSlot> s_slot = item.second;
    if (s_slot->State() == ReplState::kTryConnect) {
      SendSlotTrySyncRequest(p_info.db_name_, p_info.slot_id_);
    } else if (s_slot->State() == ReplState::kTryDBSync) {
      SendSlotDBSyncRequest(p_info.db_name_, p_info.slot_id_);
    } else if (s_slot->State() == ReplState::kWaitReply) {
      continue;
    } else if (s_slot->State() == ReplState::kWaitDBSync) {
      s_slot->ActivateRsync();
      std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(p_info.db_name_, p_info.slot_id_);
      if (slot) {
        if (!s_slot->IsRsyncRunning()) {
          slot->TryUpdateMasterOffset();
        }
      } else {
        LOG(WARNING) << "Slot not found, DB Name: " << p_info.db_name_ << " Slot Id: " << p_info.slot_id_;
      }
    } else if (s_slot->State() == ReplState::kConnected || s_slot->State() == ReplState::kNoConnect ||
               s_slot->State() == ReplState::kDBNoConnect) {
      continue;
    }
  }
  return Status::OK();
}
```
src/pika_rm.cc
This calls SendSlotTrySync:
```cpp
Status PikaReplicaManager::SendSlotTrySyncRequest(const std::string& db_name, size_t slot_id) {
  BinlogOffset boffset;
  if (!g_pika_server->GetDBSlotBinlogOffset(db_name, slot_id, &boffset)) {
    LOG(WARNING) << "Slot: " << db_name << ":" << slot_id << ", Get Slot binlog offset failed";
    return Status::Corruption("Slot get binlog offset error");
  }
  std::shared_ptr<SyncSlaveSlot> slave_slot = GetSyncSlaveSlotByName(SlotInfo(db_name, slot_id));
  if (!slave_slot) {
    LOG(WARNING) << "Slave Slot: " << db_name << ":" << slot_id << ", NotFound";
    return Status::Corruption("Slave Slot not found");
  }
  Status status = pika_repl_client_->SendSlotTrySync(slave_slot->MasterIp(), slave_slot->MasterPort(),
                                                     db_name, slot_id, boffset, slave_slot->LocalIp());
  if (status.ok()) {
    slave_slot->SetReplState(ReplState::kWaitReply);
  } else {
    slave_slot->SetReplState(ReplState::kError);
    LOG(WARNING) << "SendSlotTrySyncRequest failed " << status.ToString();
  }
  return status;
}
```
src/pika_repl_client.cc
The TrySync request carries the binlog filenum and offset plus ip, port, db_name, and slot_id, and is sent to the master:
```cpp
Status PikaReplClient::SendSlotTrySync(const std::string& ip, uint32_t port, const std::string& db_name,
                                       uint32_t slot_id, const BinlogOffset& boffset, const std::string& local_ip) {
  InnerMessage::InnerRequest request;
  request.set_type(InnerMessage::kTrySync);
  InnerMessage::InnerRequest::TrySync* try_sync = request.mutable_try_sync();
  InnerMessage::Node* node = try_sync->mutable_node();
  node->set_ip(local_ip);
  node->set_port(g_pika_server->port());
  InnerMessage::Slot* slot = try_sync->mutable_slot();
  slot->set_db_name(db_name);
  slot->set_slot_id(slot_id);
  InnerMessage::BinlogOffset* binlog_offset = try_sync->mutable_binlog_offset();
  binlog_offset->set_filenum(boffset.filenum);
  binlog_offset->set_offset(boffset.offset);

  std::string to_send;
  if (!request.SerializeToString(&to_send)) {
    LOG(WARNING) << "Serialize Slot TrySync Request Failed, to Master (" << ip << ":" << port << ")";
    return Status::Corruption("Serialize Failed");
  }
  return client_thread_->Write(ip, static_cast<int32_t>(port + kPortShiftReplServer), to_send);
}
```
Summary

In TrySync, the slave sends the master ip, port, db_name, slot_id, filenum, and offset.
src/pika_repl_server_conn.cc
When the master receives the slave's TrySync request, it calls TrySyncOffsetCheck to decide whether a full sync is needed:
```cpp
void PikaReplServerConn::HandleTrySyncRequest(void* arg) {
  ...
  bool pre_success = true;
  ...
  } else if (pre_success) {
    pre_success = TrySyncOffsetCheck(slot, try_sync_request, try_sync_response);
  }
  if (pre_success) {
    pre_success = TrySyncUpdateSlaveNode(slot, try_sync_request, conn, try_sync_response);
  }
  std::string reply_str;
  if (!response.SerializeToString(&reply_str) || (conn->WriteResp(reply_str) != 0)) {
    LOG(WARNING) << "Handle Try Sync Failed";
    conn->NotifyClose();
    return;
  }
  conn->NotifyWrite();
}
```
src/pika_repl_server_conn.cc
Here the master compares its own filenum and offset against the slave's. If the slave's data is newer than the master's, the reply code is kSyncPointLarger. If the binlog the slave points at has already been purged on the master, the reply code is kSyncPointBePurged and a full sync is needed. If the slave's sync point cannot be matched in the master's binlog, syncing is impossible and the reply code is kError:
```cpp
bool PikaReplServerConn::TrySyncOffsetCheck(const std::shared_ptr<SyncMasterSlot>& slot,
                                            const InnerMessage::InnerRequest::TrySync& try_sync_request,
                                            InnerMessage::InnerResponse::TrySync* try_sync_response) {
  const InnerMessage::Node& node = try_sync_request.node();
  const InnerMessage::BinlogOffset& slave_boffset = try_sync_request.binlog_offset();
  std::string slot_name = slot->SlotName();
  BinlogOffset boffset;
  Status s = slot->Logger()->GetProducerStatus(&(boffset.filenum), &(boffset.offset));
  ...
  InnerMessage::BinlogOffset* master_slot_boffset = try_sync_response->mutable_binlog_offset();
  master_slot_boffset->set_filenum(boffset.filenum);
  master_slot_boffset->set_offset(boffset.offset);

  if (boffset.filenum < slave_boffset.filenum() ||
      (boffset.filenum == slave_boffset.filenum() && boffset.offset < slave_boffset.offset())) {
    try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kSyncPointLarger);
    ...
    return false;
  }
  std::string confile = NewFileName(slot->Logger()->filename(), slave_boffset.filenum());
  if (!pstd::FileExists(confile)) {
    LOG(INFO) << "Slot: " << slot_name << " binlog has been purged, may need full sync";
    try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kSyncPointBePurged);
    return false;
  }
  PikaBinlogReader reader;
  reader.Seek(slot->Logger(), slave_boffset.filenum(), slave_boffset.offset());
  BinlogOffset seeked_offset;
  reader.GetReaderStatus(&(seeked_offset.filenum), &(seeked_offset.offset));
  if (seeked_offset.filenum != slave_boffset.filenum() || seeked_offset.offset != slave_boffset.offset()) {
    try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
    ...
  }
  return true;
}
```
src/pika_repl_server_conn.cc
```cpp
bool PikaReplServerConn::TrySyncUpdateSlaveNode(const std::shared_ptr<SyncMasterSlot>& slot,
                                                const InnerMessage::InnerRequest::TrySync& try_sync_request,
                                                const std::shared_ptr<net::PbConn>& conn,
                                                InnerMessage::InnerResponse::TrySync* try_sync_response) {
  const InnerMessage::Node& node = try_sync_request.node();
  std::string slot_name = slot->SlotName();

  if (!slot->CheckSlaveNodeExist(node.ip(), node.port())) {
    int32_t session_id = slot->GenSessionId();
    if (session_id == -1) {
      try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
      LOG(WARNING) << "Slot: " << slot_name << ", Gen Session id Failed";
      return false;
    }
    try_sync_response->set_session_id(session_id);
    Status s = slot->AddSlaveNode(node.ip(), node.port(), session_id);
    if (!s.ok()) {
      try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
      LOG(WARNING) << "Slot: " << slot_name << " TrySync Failed, " << s.ToString();
      return false;
    }
    const std::string ip_port = pstd::IpPortString(node.ip(), node.port());
    g_pika_rm->ReplServerUpdateClientConnMap(ip_port, conn->fd());
    try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk);
    LOG(INFO) << "Slot: " << slot_name << " TrySync Success, Session: " << session_id;
  } else {
    int32_t session_id;
    Status s = slot->GetSlaveNodeSession(node.ip(), node.port(), &session_id);
    if (!s.ok()) {
      try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
      LOG(WARNING) << "Slot: " << slot_name << ", Get Session id Failed" << s.ToString();
      return false;
    }
    try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk);
    try_sync_response->set_session_id(session_id);
    LOG(INFO) << "Slot: " << slot_name << " TrySync Success, Session: " << session_id;
  }
  return true;
}
```
TrySync summary

In TrySync, the master's response to the slave includes session_id, db_name, and slot_id.
BinlogSync

src/pika_repl_client_conn.cc
On the TrySync response, the slave updates the slave_slot state according to the master's reply code. On success the state becomes kConnected and the slave sends SendSlotBinlogSyncAckRequest. On kSyncPointBePurged a new full replication is required and the state flows back to kTryDBSync. On kSyncPointLarger the slave's data is newer than the master's and replication cannot continue. At this point the full sync itself is complete:
```cpp
void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
  std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
  std::shared_ptr<net::PbConn> conn = task_arg->conn;
  std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;
  ...
  const InnerMessage::InnerResponse_TrySync& try_sync_response = response->try_sync();
  const InnerMessage::Slot& slot_response = try_sync_response.slot();
  ...
  LogicOffset logic_last_offset;
  ...
  std::string slot_name = slot->SlotName();
  if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kOk) {
    BinlogOffset boffset;
    int32_t session_id = try_sync_response.session_id();
    slot->Logger()->GetProducerStatus(&boffset.filenum, &boffset.offset);
    slave_slot->SetMasterSessionId(session_id);
    LogOffset offset(boffset, logic_last_offset);
    g_pika_rm->SendSlotBinlogSyncAckRequest(db_name, slot_id, offset, offset, true);
    slave_slot->SetReplState(ReplState::kConnected);
    slave_slot->SetLastRecvTime(pstd::NowMicros());
    LOG(INFO) << "Slot: " << slot_name << " TrySync Ok";
  } else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kSyncPointBePurged) {
    slave_slot->SetReplState(ReplState::kTryDBSync);
    LOG(INFO) << "Slot: " << slot_name << " Need To Try DBSync";
  } else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kSyncPointLarger) {
    slave_slot->SetReplState(ReplState::kError);
    LOG(WARNING) << "Slot: " << slot_name << " TrySync Error, Because the invalid filenum and offset";
  } else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kError) {
    slave_slot->SetReplState(ReplState::kError);
    LOG(WARNING) << "Slot: " << slot_name << " TrySync Error";
  }
}
```
src/pika_rm.cc
SendSlotBinlogSync is called to launch the incremental sync request:
```cpp
Status PikaReplicaManager::SendSlotBinlogSyncAckRequest(const std::string& db, uint32_t slot_id,
                                                        const LogOffset& ack_start, const LogOffset& ack_end,
                                                        bool is_first_send) {
  std::shared_ptr<SyncSlaveSlot> slave_slot = GetSyncSlaveSlotByName(SlotInfo(db, slot_id));
  if (!slave_slot) {
    LOG(WARNING) << "Slave Slot: " << db << ":" << slot_id << ", NotFound";
    return Status::Corruption("Slave Slot not found");
  }
  return pika_repl_client_->SendSlotBinlogSync(slave_slot->MasterIp(), slave_slot->MasterPort(), db, slot_id,
                                               ack_start, ack_end, slave_slot->LocalIp(), is_first_send);
}
```
src/pika_repl_client.cc
Because this is the slave's first BinlogSync request, ack_range_start and ack_range_end of the BinlogOffset are equal: after the full sync, both are the sync point recorded in the info file, i.e. where the master's binlog stood when it took the dump:
```cpp
Status PikaReplClient::SendSlotBinlogSync(const std::string& ip, uint32_t port, const std::string& db_name,
                                          uint32_t slot_id, const LogOffset& ack_start, const LogOffset& ack_end,
                                          const std::string& local_ip, bool is_first_send) {
  InnerMessage::InnerRequest request;
  request.set_type(InnerMessage::kBinlogSync);
  InnerMessage::InnerRequest::BinlogSync* binlog_sync = request.mutable_binlog_sync();
  InnerMessage::Node* node = binlog_sync->mutable_node();
  node->set_ip(local_ip);
  node->set_port(g_pika_server->port());
  binlog_sync->set_db_name(db_name);
  binlog_sync->set_slot_id(slot_id);
  binlog_sync->set_first_send(is_first_send);

  InnerMessage::BinlogOffset* ack_range_start = binlog_sync->mutable_ack_range_start();
  ack_range_start->set_filenum(ack_start.b_offset.filenum);
  ack_range_start->set_offset(ack_start.b_offset.offset);
  ack_range_start->set_term(ack_start.l_offset.term);
  ack_range_start->set_index(ack_start.l_offset.index);

  InnerMessage::BinlogOffset* ack_range_end = binlog_sync->mutable_ack_range_end();
  ack_range_end->set_filenum(ack_end.b_offset.filenum);
  ack_range_end->set_offset(ack_end.b_offset.offset);
  ack_range_end->set_term(ack_end.l_offset.term);
  ack_range_end->set_index(ack_end.l_offset.index);

  std::shared_ptr<SyncSlaveSlot> slave_slot = g_pika_rm->GetSyncSlaveSlotByName(SlotInfo(db_name, slot_id));
  if (!slave_slot) {
    LOG(WARNING) << "Slave Slot: " << db_name << "_" << slot_id << " not exist";
    return Status::NotFound("SyncSlaveSlot NotFound");
  }
  int32_t session_id = slave_slot->MasterSessionId();
  binlog_sync->set_session_id(session_id);

  std::string to_send;
  if (!request.SerializeToString(&to_send)) {
    LOG(WARNING) << "Serialize Slot BinlogSync Request Failed, to Master (" << ip << ":" << port << ")";
    return Status::Corruption("Serialize Failed");
  }
  return client_thread_->Write(ip, static_cast<int32_t>(port + kPortShiftReplServer), to_send);
}
```
Summary

In the kBinlogSync request, the slave passes the master ip, port, db_name, slot_id, is_first_send, session_id, and, for each ack range, filenum, offset, term, and index.
Conclusion

That is the entire full-sync flow: slaveof, MetaSync over port + 2000, DBSync, pulling the dump over rsync (RsyncMeta and RsyncFile on port + 10001), TryUpdateMasterOffset, TrySync, and the first BinlogSync ack. Incremental sync will be covered in the next article.