背景 本篇介绍一下 Pika 的全量同步 ,在讲解代码之前我先介绍一下主从同步中涉及到的几种状态(状态机)
// Server roles, combined as bit flags in role_:
//   PIKA_ROLE_SINGLE = 0  (standalone node, the initial role)
//   PIKA_ROLE_SLAVE  = 1  (slave node)
//   PIKA_ROLE_MASTER = 2  (master node)
//
// Server-level replication state kept in repl_state_:
//   PIKA_REPL_NO_CONNECT       = 0  (not connected)
//   PIKA_REPL_SHOULD_META_SYNC = 1  (MetaSync state)
//   PIKA_REPL_META_SYNC_DONE   = 2  (MetaSync done)
//   PIKA_REPL_ERROR            = 3  (error)

// Replication state tracked for each sync slave slot.
enum ReplState {
  kNoConnect = 0,
  kTryConnect = 1,
  kTryDBSync = 2,
  kWaitDBSync = 3,
  kWaitReply = 4,
  kConnected = 5,
  kError = 6,
  kDBNoConnect = 7
};
1. 从命令开始 **src/pika_admin.cc**
我们调用 slaveof ip port
会进行主从同步操作,首先会调用 RemoveMaster
移除当前节点的主节点,然后调用 SetMaster
设置新的 Master
// Handles the `slaveof` command (excerpt; parts of the body are elided as "...").
// Detaches from the current master with RemoveMaster(), then tries to register
// the new one with SetMaster(). On success it replies kOk, calls
// ClearCacheDbAsync for the slot, stores "ip:port" as the slaveof setting,
// clears the stored master run id and marks the next MetaSync as the first one.
// On failure it replies kErrOther with an explanatory message.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void SlaveofCmd::Do (std::shared_ptr<Slot> slot) { ... g_pika_server->RemoveMaster (); ... bool sm_ret = g_pika_server->SetMaster (master_ip_, static_cast <int32_t >(master_port_)); if (sm_ret) { res_.SetRes (CmdRes::kOk); g_pika_server->ClearCacheDbAsync (slot); g_pika_conf->SetSlaveof (master_ip_ + ":" + std::to_string (master_port_)); g_pika_conf->SetMasterRunID ("" ); g_pika_server->SetFirstMetaSync (true ); } else { res_.SetRes (CmdRes::kErrOther, "Server is not in correct state for slaveof" ); } }
在这里将 repl_state
, master_ip
, master_port
都进行了初始化,如果之前这个节点有主节点的话调用 CloseReplClientConn
和 LostConnection
分别关闭 PikaReplClient
连接和 RsyncClient
连接,最后调用 DoSameThingEverySlot
将所有的 syncslaveslot
的状态置为 kNoConnect
// Detaches this server from its current master (excerpt; part is elided as "...").
// While holding state_protector_ it resets repl_state_ to PIKA_REPL_NO_CONNECT
// and clears the slave bit in role_. If a master is recorded, it closes the
// repl-client connection (master port + kPortShiftReplServer) and calls
// LostConnection for that master. Finally it clears master_ip_/master_port_ and
// runs the kResetReplState task on every slot.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void PikaServer::RemoveMaster () { { std::lock_guard l (state_protector_) ; repl_state_ = PIKA_REPL_NO_CONNECT; role_ &= ~PIKA_ROLE_SLAVE; if (!master_ip_.empty () && master_port_ != -1 ) { g_pika_rm->CloseReplClientConn (master_ip_, master_port_ + kPortShiftReplServer); g_pika_rm->LostConnection (master_ip_, master_port_); ... } master_ip_ = "" ; master_port_ = -1 ; DoSameThingEverySlot (TaskType::kResetReplState); } }
关闭 ReplClientConn
1 2 3 Status PikaReplicaManager::CloseReplClientConn (const std::string& ip, int32_t port) { return pika_repl_client_->Close (ip, port); }
这里将 sync_master_slots
中的 slaves_
移除,将 sync_slave_slots
中的 rsync_cli_
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Status PikaReplicaManager::LostConnection (const std::string& ip, int port) { std::shared_lock l (slots_rw_) ; for (auto & iter : sync_master_slots_) { std::shared_ptr<SyncMasterSlot> slot = iter.second; Status s = slot->RemoveSlaveNode (ip, port); if (!s.ok () && !s.IsNotFound ()) { LOG (WARNING) << "Lost Connection failed " << s.ToString (); } } for (auto & iter : sync_slave_slots_) { std::shared_ptr<SyncSlaveSlot> slot = iter.second; if (slot->MasterIp () == ip && slot->MasterPort () == port) { slot->Deactivate (); } } return Status::OK (); }
这里将 repl_state_
1 2 3 4 5 6 7 8 9 10 11 12 13 14 bool PikaServer::SetMaster (std::string& master_ip, int master_port) { if (master_ip == "" ) { master_ip = host_; } std::lock_guard l (state_protector_) ; if (((role_ ^ PIKA_ROLE_SLAVE) != 0 ) && repl_state_ == PIKA_REPL_NO_CONNECT) { master_ip_ = master_ip; master_port_ = master_port; role_ |= PIKA_ROLE_SLAVE; repl_state_ = PIKA_REPL_SHOULD_META_SYNC; return true ; } return false ; }
连接和 RsyncClient
是端口号 + 2000 偏移量,RsyncClient
是端口号 + 1000 偏移量, CloseReplClientConn
关闭了 ReplClient
关闭了 Rsync
为什么要用 ((role_ ^ PIKA_ROLE_SLAVE) != 0)
不能直接用 role_ == PIKA_ROLE_SINGLE
为什么不能用 role = PIKA_ROLE_SLAVE
小结 在 slaveof
命令中将 repl_state_
,每个 syncslaveslot
状态置为了 kNoConnect
, role_
, first_meta_sync_
设置成了 true
在刚刚的 slaveof
了,所以这里经过了 ShouldMetaSync
判断后会调用 SendMetaSyncRequest
发起 MetaSync
// Excerpt of the auxiliary thread loop (truncated: the remainder of the body is
// elided as "..."). On each iteration it sends a MetaSync request while
// ShouldMetaSync() holds; otherwise, once MetaSyncDone() holds, it runs the
// sync-slave-slot state machine.
1 2 3 4 5 6 7 8 9 void * PikaAuxiliaryThread::ThreadMain () { while (!should_stop ()) { if (g_pika_server->ShouldMetaSync ()) { g_pika_rm->SendMetaSyncRequest (); } else if (g_pika_server->MetaSyncDone ()) { g_pika_rm->RunSyncSlaveSlotStateMachine (); } ... }
判断 repl_state_
1 2 3 4 bool PikaServer::ShouldMetaSync () { std::shared_lock l (state_protector_) ; return repl_state_ == PIKA_REPL_SHOULD_META_SYNC; }
这里调用 SendMetaSync
,同时在请求发送成功后,更新 MetaSync
的 Timestamp
以及将 FirstMetaSync
置为 false
1 2 3 4 5 6 7 8 9 10 11 12 Status PikaReplicaManager::SendMetaSyncRequest () { Status s; if (time (nullptr ) - g_pika_server->GetMetaSyncTimestamp () >= PIKA_META_SYNC_MAX_WAIT_TIME || g_pika_server->IsFirstMetaSync ()) { s = pika_repl_client_->SendMetaSync (); if (s.ok ()) { g_pika_server->UpdateMetaSyncTimestamp (); g_pika_server->SetFirstMetaSync (false ); } } return s; }
time(nullptr) - g_pika_server->GetMetaSyncTimestamp() >= PIKA_META_SYNC_MAX_WAIT_TIME
这里可以看到主从全量复制的时候,都是从发起的连接,而主复用这条连接给从返回,这里先新建一个连接尝试连远端的 master-ip
看是否能连接成功,并获取到 local-ip
, 使用 InnerMessage
将 type
置为 kMetaSync
然后传入 ip
, port
随后发送给 Master
,发送给 Master
目标端口 + 2000
// Builds and sends the MetaSync request to the master (excerpt; part is elided as "...").
// 1. Opens a probe connection to the master's own port (set_connect_timeout(1500))
//    only to learn the local IP via getsockname(), then closes it. If the connect
//    fails it logs a warning, sleeps 3 seconds, resets the MetaSync status and
//    returns Status::Corruption.
// 2. Fills an InnerRequest of type kMetaSync with this node's local IP and port.
// 3. Serializes the request and writes it to master_port + kPortShiftReplServer;
//    a serialization failure is logged and returned as Status::Corruption.
// NOTE(review): the return value of getsockname() is not checked before laddr is
// used -- confirm that is acceptable here.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 Status PikaReplClient::SendMetaSync () { std::string local_ip; std::unique_ptr<net::NetCli> cli (net::NewRedisCli()) ; cli->set_connect_timeout (1500 ); if ((cli->Connect (g_pika_server->master_ip (), g_pika_server->master_port (), "" )).ok ()) { struct sockaddr_in laddr; socklen_t llen = sizeof (laddr); getsockname (cli->fd (), reinterpret_cast <struct sockaddr*>(&laddr), &llen); std::string tmp_local_ip (inet_ntoa(laddr.sin_addr)) ; local_ip = tmp_local_ip; cli->Close (); } else { LOG (WARNING) << "Failed to connect master, Master (" << g_pika_server->master_ip () << ":" << g_pika_server->master_port () << "), try reconnect" ; sleep (3 ); g_pika_server->ResetMetaSyncStatus (); return Status::Corruption ("Connect master error" ); } InnerMessage::InnerRequest request; request.set_type (InnerMessage::kMetaSync); InnerMessage::InnerRequest::MetaSync* meta_sync = request.mutable_meta_sync (); InnerMessage::Node* node = meta_sync->mutable_node (); node->set_ip (local_ip); node->set_port (g_pika_server->port ()); ... std::string to_send; std::string master_ip = g_pika_server->master_ip (); int master_port = g_pika_server->master_port (); if (!request.SerializeToString (&to_send)) { LOG (WARNING) << "Serialize Meta Sync Request Failed, to Master (" << master_ip << ":" << master_port << ")" ; return Status::Corruption ("Serialize Failed" ); } LOG (INFO) << "Try Send Meta Sync Request to Master (" << master_ip << ":" << master_port << ")" ; return client_thread_->Write (master_ip, master_port + kPortShiftReplServer, to_send); }
请求中从节点给主节点发送了自己的 ip
, port
, masterauth
, 发送的端口号是主节点端口 + 2000
Master 节点拿到 InnerMessage
后,对比一下 masterauth
,然后调用 TryAddSlave
在 Pika_server
中的 slaves_
中添加 slave
节点信息,然后调用 ReplServerUpdateClientConnMap
往 Pika_server
中的 client_conn_map
中添加节点信息,调用 BecomeMaster
,将返回码状态置为 kOK
,这时候主节点产生 ReplicationID
, 然后在 InnerMessage
中记录了 classic_mode
, run_id
, replication_id
, 以及 db_struct
// Master-side handler for a MetaSync request (excerpt; parts are elided as "...").
// Tries to register the sender as a slave with TryAddSlave and, regardless of
// that result, calls ReplServerUpdateClientConnMap with the sender's "ip:port"
// and the connection fd.
// - TryAddSlave failed: replies kOther with "Slave AlreadyExist".
// - TryAddSlave succeeded: calls BecomeMaster, generates and persists a
//   replication id when the config has none, and fills the response with
//   classic_mode, run_id, replication_id and each DB's name and slot_num.
// If serialization or WriteResp fails, the connection is asked to close;
// otherwise NotifyWrite is called so the reply is sent.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 void PikaReplServerConn::HandleMetaSyncRequest (void * arg) { ... LOG (INFO) << "Receive MetaSync, Slave ip: " << node.ip () << ", Slave port:" << node.port (); std::vector<DBStruct> db_structs = g_pika_conf->db_structs (); bool success = g_pika_server->TryAddSlave (node.ip (), node.port (), conn->fd (), db_structs); const std::string ip_port = pstd::IpPortString (node.ip (), node.port ()); g_pika_rm->ReplServerUpdateClientConnMap (ip_port, conn->fd ()); if (!success) { response.set_code (InnerMessage::kOther); response.set_reply ("Slave AlreadyExist" ); } else { g_pika_server->BecomeMaster (); response.set_code (InnerMessage::kOk); InnerMessage::InnerResponse_MetaSync* meta_sync = response.mutable_meta_sync (); if (g_pika_conf->replication_id () == "" ) { std::string replication_id = pstd::getRandomHexChars (configReplicationIDSize); g_pika_conf->SetReplicationID (replication_id); g_pika_conf->ConfigRewriteReplicationID (); } meta_sync->set_classic_mode (g_pika_conf->classic_mode ()); meta_sync->set_run_id (g_pika_conf->run_id ()); meta_sync->set_replication_id (g_pika_conf->replication_id ()); for (const auto & db_struct : db_structs) { InnerMessage::InnerResponse_MetaSync_DBInfo* db_info = meta_sync->add_dbs_info (); db_info->set_db_name (db_struct.db_name); db_info->set_slot_num (static_cast <int32_t >(db_struct.slot_num)); } } ... std::string reply_str; if (!response.SerializeToString (&reply_str) || (conn->WriteResp (reply_str) != 0 )) { LOG (WARNING) << "Process MetaSync request serialization failed" ; conn->NotifyClose (); return ; } conn->NotifyWrite (); }
在 MetaSync
请求中,主节点给从节点回了 classic_mode
, run_id
, replication_id
, slot_num
这些信息,用于从节点去对比自己的 DB 结构以及设置 replication_id
从节点拿到 Master 的 MetaSyncResponse
,先解析状态码,如果是 kOther
则重新进行 MetaSync
,如果不是 kOK
则报错,将 repl_state_
, 如果顺利的话就从 Master 节点返回的 DBStruct 去对比自己的 DBStruct (这里对比的是 DBName, slot_num, slot_id 这些值),如果不一致则将 repl_state_
代表不能做主从同步, 如果和 Master 的 DB 结构一样的话,去查看 Master 返回的 ReplicationID
, 如果传过来为空则等待下次请求,如果不为空,判断从节点自己原来有没有 ReplicationID
,如果没有,则将主节点的设置成从节点自己的 ReplicationID
,同时将 force_full_sync_
置为 true
代表进行强制全量同步 ,然后调用 PrepareSlotTrySync
, 以及调用 FinishMetaSync
将 repl_state_
代表 MetaSync
// Slave-side handler for the master's MetaSync response (excerpt; parts are
// elided as "..."). Takes ownership of the task argument via unique_ptr.
// - Rebuilds the master's DB layout from dbs_info and compares it with the
//   local db_structs; on mismatch it calls SyncError and closes the connection.
// - If the master's replication id is empty, it logs a warning and returns
//   without changing state.
// - If the local replication id is non-empty and differs from the master's,
//   it calls SyncError and closes the connection.
// - If the ids still differ (the local id is empty at this point), it sets
//   force_full_sync_, adopts the master's replication id and rewrites the config.
// Finally it enables binlog writing, calls PrepareSlotTrySync and FinishMetaSync.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 void PikaReplClientConn::HandleMetaSyncResponse (void * arg) { std::unique_ptr<ReplClientTaskArg> task_arg (static_cast <ReplClientTaskArg*>(arg)) ; std::shared_ptr<net::PbConn> conn = task_arg->conn; std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res; ... const InnerMessage::InnerResponse_MetaSync meta_sync = response->meta_sync (); std::vector<DBStruct> master_db_structs; for (int idx = 0 ; idx < meta_sync.dbs_info_size (); ++idx) { const InnerMessage::InnerResponse_MetaSync_DBInfo& db_info = meta_sync.dbs_info (idx); master_db_structs.push_back ({db_info.db_name (), static_cast <uint32_t >(db_info.slot_num ()), {0 }}); } std::vector<DBStruct> self_db_structs = g_pika_conf->db_structs (); if (!PikaReplClientConn::IsDBStructConsistent (self_db_structs, master_db_structs)) { ... g_pika_server->SyncError (); conn->NotifyClose (); return ; } if (meta_sync.replication_id () == "" ) { LOG (WARNING) << "Meta Sync Failed: the relicationid obtained from the server is null, keep sending MetaSync msg" ; return ; } if (g_pika_conf->replication_id () != meta_sync.replication_id () && g_pika_conf->replication_id () != "" ) { LOG (WARNING) << "Meta Sync Failed: replicationid on both sides of the connection are inconsistent" ; g_pika_server->SyncError (); conn->NotifyClose (); return ; } if (g_pika_conf->replication_id () != meta_sync.replication_id ()) { ... g_pika_server->force_full_sync_ = true ; g_pika_conf->SetReplicationID (meta_sync.replication_id ()); g_pika_conf->ConfigRewriteReplicationID (); } g_pika_conf->SetWriteBinlog ("yes" ); g_pika_server->PrepareSlotTrySync (); g_pika_server->FinishMetaSync (); LOG (INFO) << "Finish to handle meta sync response" ; }
在 PrepareSlotTrySync
中判断 force_full_sync_
是不是要进行强制全量同步, 如果是的话将 state
置为 kTryDBSync
否则置为 kTryConnect
, 遍历整个 DB 结构每一个 slot
去调用 ActivateSyncSlaveSlot
去和 Master 节点做连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void PikaServer::PrepareSlotTrySync () { std::shared_lock rwl (dbs_rw_) ; ReplState state = force_full_sync_ ? ReplState::kTryDBSync : ReplState::kTryConnect; for (const auto & db_item : dbs_) { for (const auto & slot_item : db_item.second->slots_) { Status s = g_pika_rm->ActivateSyncSlaveSlot ( RmNode (g_pika_server->master_ip (), g_pika_server->master_port (), db_item.second->GetDBName (), slot_item.second->GetSlotID ()), state); if (!s.ok ()) { LOG (WARNING) << s.ToString (); } } } force_full_sync_ = false ; LOG (INFO) << "Mark try connect finish" ; }
让每个 slot
和远端的 Master 都尝试建立一条连接,然后将每个节点的 state
置为 kTryDBSync
,这里的 sync_slave_slots
保存了每个 slot
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Status PikaReplicaManager::ActivateSyncSlaveSlot (const RmNode& node, const ReplState& repl_state) { std::shared_lock l (slots_rw_) ; const SlotInfo& p_info = node.NodeSlotInfo (); if (sync_slave_slots_.find (p_info) == sync_slave_slots_.end ()) { return Status::NotFound ("Sync Slave Slot " + node.ToString () + " not found" ); } ReplState ssp_state = sync_slave_slots_[p_info]->State (); if (ssp_state != ReplState::kNoConnect && ssp_state != ReplState::kDBNoConnect) { return Status::Corruption ("Sync Slave Slot in " + ReplStateMsg[ssp_state]); } std::string local_ip; Status s = SelectLocalIp (node.Ip (), node.Port (), &local_ip); if (s.ok ()) { sync_slave_slots_[p_info]->SetLocalIp (local_ip); sync_slave_slots_[p_info]->Activate (node, repl_state); } return s; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class SyncSlaveSlot { std::unique_ptr<rsync::RsyncClient> rsync_cli_; pstd::Mutex slot_mu_; RmNode m_info_; ReplState repl_state_{kNoConnect}; std::string local_ip_; } class SyncMasterSlot { int32_t session_id_ = 0 ; ConsensusCoordinator coordinator_; } class ConsensusCoordinator { LogOffset committed_index_; std::shared_ptr<Context> context_; std::shared_mutex term_rwlock_; uint32_t term_ = 0 ; std::string db_name_; uint32_t slot_id_ = 0 ; SyncProgress sync_pros_; std::shared_ptr<StableLog> stable_logger_; std::shared_ptr<MemLog> mem_logger_; }
DBSync src/pika_auxiliary_thread.cc
重新回到辅助线程,由于刚刚把 repl_state_
,并且每个 syncslaveslot
,所以这里我们经过了 MetaSyncDone
判断之后去执行 RunSyncSlaveSlotStateMachine
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void * PikaAuxiliaryThread::ThreadMain () { while (!should_stop ()) { if (g_pika_server->ShouldMetaSync ()) { g_pika_rm->SendMetaSyncRequest (); } else if (g_pika_server->MetaSyncDone ()) { g_pika_rm->RunSyncSlaveSlotStateMachine (); } pstd::Status s = g_pika_rm->CheckSyncTimeout (pstd::NowMicros ()); if (!s.ok ()) { LOG (WARNING) << s.ToString (); } g_pika_server->CheckLeaderProtectedMode (); s = g_pika_server->TriggerSendBinlogSync (); if (!s.ok ()) { LOG (WARNING) << s.ToString (); } int res = g_pika_server->SendToPeer (); if (res == 0 ) { std::unique_lock lock (mu_) ; cv_.wait_for (lock, 100 ms); } else { } } return nullptr ; }
这个用了一个 for
循环去遍历 sync_slave_slots
中每个 slot
的状态,在刚刚的 MetaSync
处理请求结果中,我们将每个 slot
的状态设置成了 kTryDBSync
了,所以这里我们调用的是 SendSlotDBSyncRequest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 Status PikaReplicaManager::RunSyncSlaveSlotStateMachine () { std::shared_lock l (slots_rw_) ; for (const auto & item : sync_slave_slots_) { SlotInfo p_info = item.first; std::shared_ptr<SyncSlaveSlot> s_slot = item.second; if (s_slot->State () == ReplState::kTryConnect) { SendSlotTrySyncRequest (p_info.db_name_, p_info.slot_id_); } else if (s_slot->State () == ReplState::kTryDBSync) { SendSlotDBSyncRequest (p_info.db_name_, p_info.slot_id_); } else if (s_slot->State () == ReplState::kWaitReply) { continue ; } else if (s_slot->State () == ReplState::kWaitDBSync) { s_slot->ActivateRsync (); std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById (p_info.db_name_, p_info.slot_id_); if (slot) { if (!s_slot->IsRsyncRunning ()) { slot->TryUpdateMasterOffset (); } } else { LOG (WARNING) << "Slot not found, DB Name: " << p_info.db_name_ << " Slot Id: " << p_info.slot_id_; } } else if (s_slot->State () == ReplState::kConnected || s_slot->State () == ReplState::kNoConnect || s_slot->State () == ReplState::kDBNoConnect) { continue ; } } return Status::OK (); }
这里调用 GetDBSlotBinlogOffset
将从节点自身的 Binlog
的 filenum
和 offset
记录在 boffset
中,调用 PrepareRsync
,在 dbsync_path_
下先删除了之前的文件夹然后重新建立五种数据类型的文件夹. 然后调用了 SendSlotDBSync
,如果成功的话将 repl_state_
的状态设置为 kWaitReply
否则置为 kError
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 Status PikaReplicaManager::SendSlotDBSyncRequest (const std::string& db_name, size_t slot_id) { BinlogOffset boffset; if (!g_pika_server->GetDBSlotBinlogOffset (db_name, slot_id, &boffset)) { LOG (WARNING) << "Slot: " << db_name << ":" << slot_id << ", Get slot binlog offset failed" ; return Status::Corruption ("Slot get binlog offset error" ); } std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById (db_name, slot_id); if (!slot) { LOG (WARNING) << "Slot: " << db_name << ":" << slot_id << ", NotFound" ; return Status::Corruption ("Slot not found" ); } slot->PrepareRsync (); std::shared_ptr<SyncSlaveSlot> slave_slot = GetSyncSlaveSlotByName (SlotInfo (db_name, slot_id)); if (!slave_slot) { LOG (WARNING) << "Slave Slot: " << db_name << ":" << slot_id << ", NotFound" ; return Status::Corruption ("Slave Slot not found" ); } Status status = pika_repl_client_->SendSlotDBSync (slave_slot->MasterIp (), slave_slot->MasterPort (), db_name, slot_id, boffset, slave_slot->LocalIp ()); if (status.ok ()) { slave_slot->SetReplState (ReplState::kWaitReply); } else { slave_slot->SetReplState (ReplState::kError); LOG (WARNING) << "SendSlotDBSync failed " << status.ToString (); } return status; }
这里将 InnerMessage
类型设置为 kDBSync
然后将 ip-port
, db_name
, slot_id
传进去,发送给 Master 节点.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Status PikaReplClient::SendSlotDBSync (const std::string& ip, uint32_t port, const std::string& db_name, uint32_t slot_id, const BinlogOffset& boffset, const std::string& local_ip) { InnerMessage::InnerRequest request; request.set_type (InnerMessage::kDBSync); InnerMessage::InnerRequest::DBSync* db_sync = request.mutable_db_sync (); InnerMessage::Node* node = db_sync->mutable_node (); node->set_ip (local_ip); node->set_port (g_pika_server->port ()); InnerMessage::Slot* slot = db_sync->mutable_slot (); slot->set_db_name (db_name); slot->set_slot_id (slot_id); InnerMessage::BinlogOffset* binlog_offset = db_sync->mutable_binlog_offset (); binlog_offset->set_filenum (boffset.filenum); binlog_offset->set_offset (boffset.offset); std::string to_send; if (!request.SerializeToString (&to_send)) { LOG (WARNING) << "Serialize Slot DBSync Request Failed, to Master (" << ip << ":" << port << ")" ; return Status::Corruption ("Serialize Failed" ); } return client_thread_->Write (ip, static_cast <int32_t >(port) + kPortShiftReplServer, to_send); }
从节点发送的 DBSync
请求中,发送了 ip
, port
, db_name
, slot_id
, filenum
, offset
这里主节点将从节点信息写到 syncmasterslot
中的 slave_
中以及 Pika_repl_server 中的 client_conn_map
中,然后调用 DoBgSaveSlot
执行 Bgsave
操作,注意这里调用的端口是目标端口 + 1000
,最后调用 ActivateSlaveDbSync
// Offset added to a slave's port when the master passes the slave's address to
// DoBgSaveSlot while handling a DBSync request.
1 const int kPortShiftRSync = 1000 ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 void PikaReplServerConn::HandleDBSyncRequest (void * arg) { ... bool prior_success = true ; std::shared_ptr<SyncMasterSlot> master_slot = g_pika_rm->GetSyncMasterSlotByName (SlotInfo (db_name, slot_id)); if (!master_slot) { ... prior_success = false ; } if (prior_success) { if (!master_slot->CheckSlaveNodeExist (node.ip (), node.port ())) { int32_t session_id = master_slot->GenSessionId (); if (session_id == -1 ) { response.set_code (InnerMessage::kError); ... prior_success = false ; } if (prior_success) { db_sync_response->set_session_id (session_id); Status s = master_slot->AddSlaveNode (node.ip (), node.port (), session_id); if (s.ok ()) { const std::string ip_port = pstd::IpPortString (node.ip (), node.port ()); g_pika_rm->ReplServerUpdateClientConnMap (ip_port, conn->fd ()); ... } else { db_sync_response->set_session_id (-1 ); } } else { int32_t session_id; Status s = master_slot->GetSlaveNodeSession (node.ip (), node.port (), &session_id); if (!s.ok ()) { response.set_code (InnerMessage::kError); ... db_sync_response->set_session_id (-1 ); ... } } g_pika_server->DoBgSaveSlot (node.ip (), node.port () + kPortShiftRSync, db_name, slot_id, static_cast <int32_t >(slave_boffset.filenum ())); ... master_slot->ActivateSlaveDbSync (node.ip (), node.port ()); if (!response.SerializeToString (&reply_str) || (conn->WriteResp (reply_str) != 0 )) { ... conn->NotifyClose (); return ; } conn->NotifyWrite (); }
根据 bgsave_info
的信息判断是否需要重新进行 bgsave
产生 dump
文件, 当 dump
文件夹不存在或者上次进行 bgsave
时的 binlog
已经大于自己且超过阈值(50),那么需要重新进行 bgsave
,否则使用原来的 dump
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void PikaServer::DoBgSaveSlot (const std::string& ip, int port, const std::string& db_name, uint32_t slot_id, int32_t top) { std::shared_ptr<Slot> slot = GetDBSlotById (db_name, slot_id); if (!slot) { LOG (WARNING) << "can not find Slot whose id is " << slot_id << " in db " << db_name << ", DoBgSaveSlot Failed" ; return ; } std::shared_ptr<SyncMasterSlot> sync_slot = g_pika_rm->GetSyncMasterSlotByName (SlotInfo (db_name, slot_id)); if (!sync_slot) { LOG (WARNING) << "can not find Slot whose id is " << slot_id << " in db " << db_name << ", DoBgSaveSlot Failed" ; return ; } BgSaveInfo bgsave_info = slot->bgsave_info (); std::string logger_filename = sync_slot->Logger ()->filename (); if (pstd::IsDir (bgsave_info.path) != 0 || !pstd::FileExists (NewFileName (logger_filename, bgsave_info.offset.b_offset.filenum)) || top - bgsave_info.offset.b_offset.filenum > kDBSyncMaxGap) { slot->BgSaveSlot (); } }
在 ActivateSlaveDbSync
中将 Slave
节点的状态置为 kSlaveDbSync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Status SyncMasterSlot::ActivateSlaveDbSync (const std::string& ip, int port) { std::shared_ptr<SlaveNode> slave_ptr = GetSlaveNode (ip, port); if (!slave_ptr) { return Status::NotFound ("ip " + ip + " port " + std::to_string (port)); } slave_ptr->Lock (); slave_ptr->slave_state = kSlaveDbSync; slave_ptr->Unlock (); return Status::OK (); } enum SlaveState { kSlaveNotSync = 0 , kSlaveDbSync = 1 , kSlaveBinlogSync = 2 , };
DBSync总结 在 DBSync
中主节点给从节点的回包信息中写了 session_id
, db_name
, slot_id
,同时主节点根据判断做出是不是要进行 bgsave
从节点收到 Master 节点的 DBSync
回包后,设置一下自己的 session_id
, 然后停止 Rsync
线程,将状态改为 kWaitDBSync
// Slave-side handler for the master's DBSync response (excerpt; the beginning
// is elided as "..."). A response code other than kOk puts the slave slot into
// kError and logs the reply text, if any. Otherwise the master's session id is
// stored, the slot's rsync is stopped and the slot moves to kWaitDBSync.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void PikaReplClientConn::HandleDBSyncResponse (void * arg) { ... if (response->code () != InnerMessage::kOk) { slave_slot->SetReplState (ReplState::kError); std::string reply = response->has_reply () ? response->reply () : "" ; LOG (WARNING) << "DBSync Failed: " << reply; return ; } slave_slot->SetMasterSessionId (session_id); std::string slot_name = slave_slot->SlotName (); slave_slot->StopRsync (); slave_slot->SetReplState (ReplState::kWaitDBSync); LOG (INFO) << "Slot: " << slot_name << " Need Wait To Sync" ; }
再次回到这个 for 循环,由于我们现在是 kWaitDBSync
状态,所以我们调用 ActivateRsync
启动了 Rsync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 Status PikaReplicaManager::RunSyncSlaveSlotStateMachine () { std::shared_lock l (slots_rw_) ; for (const auto & item : sync_slave_slots_) { SlotInfo p_info = item.first; std::shared_ptr<SyncSlaveSlot> s_slot = item.second; if (s_slot->State () == ReplState::kTryConnect) { SendSlotTrySyncRequest (p_info.db_name_, p_info.slot_id_); } else if (s_slot->State () == ReplState::kTryDBSync) { SendSlotDBSyncRequest (p_info.db_name_, p_info.slot_id_); } else if (s_slot->State () == ReplState::kWaitReply) { continue ; } else if (s_slot->State () == ReplState::kWaitDBSync) { s_slot->ActivateRsync (); std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById (p_info.db_name_, p_info.slot_id_); if (slot) { if (!s_slot->IsRsyncRunning ()) { slot->TryUpdateMasterOffset (); } } else { LOG (WARNING) << "Slot not found, DB Name: " << p_info.db_name_ << " Slot Id: " << p_info.slot_id_; } } else if (s_slot->State () == ReplState::kConnected || s_slot->State () == ReplState::kNoConnect || s_slot->State () == ReplState::kDBNoConnect) { continue ; } } return Status::OK (); }
在 ActivateRsync
中先调用 Init()
1 2 3 4 5 6 7 8 9 void SyncSlaveSlot::ActivateRsync () { if (!rsync_cli_->IsIdle ()) { return ; } LOG (WARNING) << "ActivateRsync ..." ; if (rsync_cli_->Init ()) { rsync_cli_->Start (); } }
这里使用 port
+ 偏移量(10001) 作为 master_port_
这个 Rsync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 bool RsyncClient::Init () { if (state_ != IDLE) { LOG (WARNING) << "State should be IDLE when Init" ; return false ; } master_ip_ = g_pika_server->master_ip (); master_port_ = g_pika_server->master_port () + kPortShiftRsync2; file_set_.clear (); client_thread_->StartThread (); bool ret = Recover (); if (!ret) { LOG (WARNING) << "RsyncClient recover failed" ; client_thread_->StopThread (); return false ; } finished_work_cnt_.store (0 ); LOG (INFO) << "RsyncClient recover success" ; return true ; }
这里的 client_thread
在 Recover
中,我们先调用了 CopyRemoteMeta
函数用于拉取远端 dump
// Reconciles the local dump metadata with the master's before any file is
// copied (excerpt; error handling and other parts are elided as "...").
// Fetches the remote snapshot uuid and file set, then loads the local ones.
// - Different snapshot uuids: every remote file is scheduled for copying
//   (file_set_) and every local file is treated as expired.
// - Same snapshot uuid: only files present remotely but missing locally are
//   added to file_set_, and local files absent remotely are treated as expired.
// Expired files are then cleaned up, the local meta is updated and state_
// becomes RUNNING.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 bool RsyncClient::Recover () { std::string local_snapshot_uuid; std::string remote_snapshot_uuid; std::set<std::string> local_file_set; std::set<std::string> remote_file_set; std::map<std::string, std::string> local_file_map; Status s = CopyRemoteMeta (&remote_snapshot_uuid, &remote_file_set); ... s = LoadLocalMeta (&local_snapshot_uuid, &local_file_map); ... for (auto const & file : local_file_map) { local_file_set.insert (file.first); } std::set<std::string> expired_files; if (remote_snapshot_uuid != local_snapshot_uuid) { snapshot_uuid_ = remote_snapshot_uuid; file_set_ = remote_file_set; expired_files = local_file_set; } else { std::set<std::string> newly_files; set_difference (remote_file_set.begin (), remote_file_set.end (), local_file_set.begin (), local_file_set.end (), inserter (newly_files, newly_files.begin ())); set_difference (local_file_set.begin (), local_file_set.end (), remote_file_set.begin (), remote_file_set.end (), inserter (expired_files, expired_files.begin ())); file_set_.insert (newly_files.begin (), newly_files.end ()); } s = CleanUpExpiredFiles (local_snapshot_uuid != remote_snapshot_uuid, expired_files); ... s = UpdateLocalMeta (snapshot_uuid_, expired_files, &local_file_map); ... state_ = RUNNING; ... return true ; }
这里封装了一个 RsyncRequest
里面有 db_name
, slot_id
, kRsyncMeta
, reader_index
然后发送给 Master,收到 resp
后将 Master 发给自己的 Dump
文件的元信息记录到 file_set
中,同时更新 snapshot_uuid
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 Status RsyncClient::CopyRemoteMeta (std::string* snapshot_uuid, std::set<std::string>* file_set) { Status s; int retries = 0 ; RsyncRequest request; request.set_reader_index (0 ); request.set_db_name (db_name_); request.set_slot_id (slot_id_); request.set_type (kRsyncMeta); std::string to_send; request.SerializeToString (&to_send); while (retries < max_retries_) { WaitObject* wo = wo_mgr_->UpdateWaitObject (0 , "" , kRsyncMeta, kInvalidOffset); s = client_thread_->Write (master_ip_, master_port_, to_send); if (!s.ok ()) { retries++; } std::shared_ptr<RsyncResponse> resp; s = wo->Wait (resp); if (s.IsTimeout () || resp.get () == nullptr ) { LOG (WARNING) << "rsync CopyRemoteMeta request timeout, " << "retry times: " << retries; retries++; continue ; } if (resp->code () != RsyncService::kOk) { continue ; } LOG (INFO) << "receive rsync meta infos, snapshot_uuid: " << resp->snapshot_uuid () << "files count: " << resp->meta_resp ().filenames_size (); for (std::string item : resp->meta_resp ().filenames ()) { file_set->insert (item); } *snapshot_uuid = resp->snapshot_uuid (); break ; } return s; }
从节点发送的 RsyncMeta
请求中包含了 db_name
, slot_id
, reader_index
Master 端处理 MetaRsync
请求,如果当前的 slot
正在做 bgsave
的话会直接返回,如果没有则调用 GetDumpMeta
将 Dump
文件中的元信息记录到 filenames
中,然后一并返回给 Slave
// Master-side handler for an rsync meta request (excerpt; part is elided as
// "..."). Takes ownership of the task argument via unique_ptr.
// If the slot cannot be found or is currently doing a bgsave, it logs a
// warning and returns without writing any response. Otherwise it replies kOk,
// echoing reader_index, db_name and slot_id, and attaches the dump's
// snapshot uuid and file names obtained from GetDumpMeta.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void RsyncServerConn::HandleMetaRsyncRequest (void * arg) { std::unique_ptr<RsyncServerTaskArg> task_arg (static_cast <RsyncServerTaskArg*>(arg)) ; const std::shared_ptr<RsyncService::RsyncRequest> req = task_arg->req; std::shared_ptr<net::PbConn> conn = task_arg->conn; std::string db_name = req->db_name (); uint32_t slot_id = req->slot_id (); std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById (db_name, slot_id); if (!slot || slot->IsBgSaving ()) { LOG (WARNING) << "waiting bgsave done..." ; return ; } RsyncService::RsyncResponse response; response.set_reader_index (req->reader_index ()); response.set_code (RsyncService::kOk); response.set_type (RsyncService::kRsyncMeta); response.set_db_name (db_name); response.set_slot_id (slot_id); std::vector<std::string> filenames; std::string snapshot_uuid; g_pika_server->GetDumpMeta (db_name, slot_id, &filenames, &snapshot_uuid); response.set_snapshot_uuid (snapshot_uuid); ... RsyncService::MetaResponse* meta_resp = response.mutable_meta_resp (); for (const auto & filename : filenames) { meta_resp->add_filenames (filename); } RsyncWriteResp (response, conn); }
在 MetaRsync
请求中,主节点给从节点的回包包括 db_name
, slot_id
, reader_index
, snapshot_uuid
, filename
RsyncFile src/rsync_client.cc
在 file_set_
就开始调用 Copy
// Body of the rsync client thread (excerpt; parts are elided as "...").
// Distributes file_set_ round-robin over GetParallelNum() buckets and starts
// one worker thread per bucket running RsyncClient::Copy.
// While state_ is RUNNING it waits out the remainder of kFlushIntervalUs on
// cond_, swaps out meta_table_ under mu_ and appends each "key:value" pair plus
// a newline to meta_rep, and leaves the loop once every worker has reported
// completion. Finally it joins the workers, resets finished_work_cnt_ and sets
// state_ to STOP.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 void * RsyncClient::ThreadMain () { ... std::vector<std::set<std::string> > file_vec (GetParallelNum ()); int index = 0 ; for (const auto & file : file_set_) { file_vec[index++ % GetParallelNum ()].insert (file); } for (int i = 0 ; i < GetParallelNum (); i++) { work_threads_[i] = std::move (std::thread (&RsyncClient::Copy, this , file_vec[i], i)); } ... while (state_.load () == RUNNING) { uint64_t elapse = pstd::NowMicros () - start_time; if (elapse < kFlushIntervalUs) { int wait_for_us = kFlushIntervalUs - elapse; std::unique_lock<std::mutex> lock (mu_) ; cond_.wait_for (lock, std::chrono::microseconds (wait_for_us)); } if (state_.load () != RUNNING) { break ; } start_time = pstd::NowMicros (); std::map<std::string, std::string> files_map; { std::lock_guard<std::mutex> guard (mu_) ; files_map.swap (meta_table_); } for (const auto & file : files_map) { meta_rep.append (file.first + ":" + file.second); meta_rep.append ("\n" ); } ... if (finished_work_cnt_.load () == GetParallelNum ()) { break ; } } for (int i = 0 ; i < GetParallelNum (); i++) { work_threads_[i].join (); } finished_work_cnt_.store (0 ); state_.store (STOP); ... }
调用 CopyRemoteFile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void RsyncClient::Copy (const std::set<std::string>& file_set, int index) { Status s = Status::OK (); for (const auto & file : file_set) { while (state_.load () == RUNNING) { LOG (INFO) << "copy remote file, filename: " << file; s = CopyRemoteFile (file, index); if (!s.ok ()) { LOG (WARNING) << "copy remote file failed, msg: " << s.ToString (); continue ; } break ; } if (state_.load () != RUNNING) { break ; } } LOG (INFO) << "work_thread index: " << index << " copy remote files done" ; finished_work_cnt_.fetch_add (1 ); cond_.notify_all (); }
这里封装了请求 RsyncRequest
,发给 Master 端 reader_index
, db_name
, slot_id
, filename
, offset
, count
, 同时指定了写文件的路径 dbsync
, 当 Master 端回包的时候,调用 Writer
// Copies one remote dump file into dir_/filename chunk by chunk (excerpt;
// declarations, error handling and the return are elided as "...").
// Each round, while the client is RUNNING and retries remain:
//   - asks the throttle how many bytes may be requested; when the answer is 0
//     it sleeps for 1000 / kThrottleCheckCycle milliseconds and tries again;
//   - sends a kRsyncFile request for `count` bytes at `offset` and waits for
//     the matching response;
//   - returns unused throughput to the throttle, writes the received data at
//     `offset` and advances `offset` by the number of bytes received;
//   - on eof, fsyncs the file and stops; otherwise resets `retries` to 0.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 Status RsyncClient::CopyRemoteFile (const std::string& filename, int index) { const std::string filepath = dir_ + "/" + filename; std::unique_ptr<RsyncWriter> writer (new RsyncWriter(filepath)) ; ... while (retries < max_retries_) { if (state_.load () != RUNNING) { break ; } size_t copy_file_begin_time = pstd::NowMicros (); size_t count = Throttle::GetInstance ().ThrottledByThroughput (kBytesPerRequest); if (count == 0 ) { std::this_thread::sleep_for (std::chrono::milliseconds (1000 / kThrottleCheckCycle)); continue ; } RsyncRequest request; request.set_reader_index (index); request.set_type (kRsyncFile); request.set_db_name (db_name_); request.set_slot_id (slot_id_); FileRequest* file_req = request.mutable_file_req (); file_req->set_filename (filename); file_req->set_offset (offset); file_req->set_count (count); std::string to_send; request.SerializeToString (&to_send); WaitObject* wo = wo_mgr_->UpdateWaitObject (index, filename, kRsyncFile, offset); s = client_thread_->Write (master_ip_, master_port_, to_send); ... std::shared_ptr<RsyncResponse> resp = nullptr ; s = wo->Wait (resp); ... size_t ret_count = resp->file_resp ().count (); size_t elaspe_time_us = pstd::NowMicros () - copy_file_begin_time; Throttle::GetInstance ().ReturnUnusedThroughput (count, ret_count, elaspe_time_us); ... s = writer->Write ((uint64_t )offset, ret_count, resp->file_resp ().data ().c_str ()); ... offset += resp->file_resp ().count (); if (resp->file_resp ().eof ()) { s = writer->Fsync (); ... break ; } retries = 0 ; } ... }
这里从节点发送的 RsyncFile
请求中包括 db_name
, slot_id
, reader_index
, filename
, offset
, count
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 void RsyncServerConn::HandleFileRsyncRequest (void * arg) { ... std::string snapshot_uuid; Status s = g_pika_server->GetDumpUUID (db_name, slot_id, &snapshot_uuid); response.set_snapshot_uuid (snapshot_uuid); if (!s.ok ()) { LOG (WARNING) << "rsyncserver get snapshotUUID failed" ; response.set_code (RsyncService::kErr); RsyncWriteResp (response, conn); return ; } std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById (db_name, slot_id); if (!slot) { LOG (WARNING) << "cannot find slot for db_name: " << db_name << " slot_id: " << slot_id; response.set_code (RsyncService::kErr); RsyncWriteResp (response, conn); } const std::string filepath = slot->bgsave_info ().path + "/" + filename; char * buffer = new char [req->file_req ().count () + 1 ]; size_t bytes_read{0 }; std::string checksum = "" ; bool is_eof = false ; std::shared_ptr<RsyncReader> reader = conn->readers_[req->reader_index ()]; s = reader->Read (filepath, offset, count, buffer, &bytes_read, &checksum, &is_eof); if (!s.ok ()) { response.set_code (RsyncService::kErr); RsyncWriteResp (response, conn); delete []buffer; return ; } RsyncService::FileResponse* file_resp = response.mutable_file_resp (); file_resp->set_data (buffer, bytes_read); file_resp->set_eof (is_eof); file_resp->set_checksum (checksum); file_resp->set_filename (filename); file_resp->set_count (bytes_read); file_resp->set_offset (offset); RsyncWriteResp (response, conn); delete []buffer; }
疑问:这里为什么 snapshot 不能通过 Pb 传过去,而且要在复制完之后到从节点这边再去判断?
s = writer->Write((uint64_t)offset, ret_count, resp->file_resp().data().c_str());
RsyncFile 总结 这里 Master 的 FileRsync
回包给从节点的值包括 db_name
, slot_id
, reader_index
, data
, eof
, checksum
, filename
, bytes_read
, offset
TrySync src/pika_rm.cc
在获取到所有的远端文件之后,Rsync 线程关闭,调用 TryUpdateMasterOffset
更新 offset
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 Status PikaReplicaManager::RunSyncSlaveSlotStateMachine () { std::shared_lock l (slots_rw_) ; for (const auto & item : sync_slave_slots_) { SlotInfo p_info = item.first; std::shared_ptr<SyncSlaveSlot> s_slot = item.second; if (s_slot->State () == ReplState::kTryConnect) { SendSlotTrySyncRequest (p_info.db_name_, p_info.slot_id_); } else if (s_slot->State () == ReplState::kTryDBSync) { SendSlotDBSyncRequest (p_info.db_name_, p_info.slot_id_); } else if (s_slot->State () == ReplState::kWaitReply) { continue ; } else if (s_slot->State () == ReplState::kWaitDBSync) { s_slot->ActivateRsync (); std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById (p_info.db_name_, p_info.slot_id_); if (slot) { if (!s_slot->IsRsyncRunning ()) { slot->TryUpdateMasterOffset (); } } else { LOG (WARNING) << "Slot not found, DB Name: " << p_info.db_name_ << " Slot Id: " << p_info.slot_id_; } } else if (s_slot->State () == ReplState::kConnected || s_slot->State () == ReplState::kNoConnect || s_slot->State () == ReplState::kDBNoConnect) { continue ; } } return Status::OK (); }
这里会更新 binlog
的偏移量,我们会去读取 info
文件,然后把 info
文件中的 master_port
, filenum
, offset
, term
, index
更新到从节点上,然后对 info
文件进行删除,这里调用 ChangeDb
将 dbsync
文件替换 db
文件夹,然后删除旧的 db
文件夹,然后将 slot
状态设置成 kTryConnect
. 也就是说不管之前从节点的 binlog 点位是多少,都会被 info 文件中的 filenum 和 offset 覆盖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 bool Slot::TryUpdateMasterOffset () { std::string info_path = dbsync_path_ + kBgsaveInfoFile; ... std::ifstream is (info_path) ; ... std::string line; std::string master_ip; ... is.close (); ... pstd::DeleteFile (info_path); if (!ChangeDb (dbsync_path_)) { LOG (WARNING) << "Slot: " << slot_name_ << ", Failed to change db" ; slave_slot->SetReplState (ReplState::kError); return false ; } std::shared_ptr<SyncMasterSlot> master_slot = g_pika_rm->GetSyncMasterSlotByName (SlotInfo (db_name_, slot_id_)); if (!master_slot) { LOG (WARNING) << "Master Slot: " << slot_name_ << " not exist" ; return false ; } master_slot->Logger ()->SetProducerStatus (filenum, offset); slave_slot->SetReplState (ReplState::kTryConnect); return true ; }
由于 slot
状态变更成了 kTryConnect
, 这里调用 SendSlotTrySyncRequest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 Status PikaReplicaManager::RunSyncSlaveSlotStateMachine () { std::shared_lock l (slots_rw_) ; for (const auto & item : sync_slave_slots_) { SlotInfo p_info = item.first; std::shared_ptr<SyncSlaveSlot> s_slot = item.second; if (s_slot->State () == ReplState::kTryConnect) { SendSlotTrySyncRequest (p_info.db_name_, p_info.slot_id_); } else if (s_slot->State () == ReplState::kTryDBSync) { SendSlotDBSyncRequest (p_info.db_name_, p_info.slot_id_); } else if (s_slot->State () == ReplState::kWaitReply) { continue ; } else if (s_slot->State () == ReplState::kWaitDBSync) { s_slot->ActivateRsync (); std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById (p_info.db_name_, p_info.slot_id_); if (slot) { if (!s_slot->IsRsyncRunning ()) { slot->TryUpdateMasterOffset (); } } else { LOG (WARNING) << "Slot not found, DB Name: " << p_info.db_name_ << " Slot Id: " << p_info.slot_id_; } } else if (s_slot->State () == ReplState::kConnected || s_slot->State () == ReplState::kNoConnect || s_slot->State () == ReplState::kDBNoConnect) { continue ; } } return Status::OK (); }
这里调用 SendSlotTrySync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 Status PikaReplicaManager::SendSlotTrySyncRequest (const std::string& db_name, size_t slot_id) { BinlogOffset boffset; if (!g_pika_server->GetDBSlotBinlogOffset (db_name, slot_id, &boffset)) { LOG (WARNING) << "Slot: " << db_name << ":" << slot_id << ", Get Slot binlog offset failed" ; return Status::Corruption ("Slot get binlog offset error" ); } std::shared_ptr<SyncSlaveSlot> slave_slot = GetSyncSlaveSlotByName (SlotInfo (db_name, slot_id)); if (!slave_slot) { LOG (WARNING) << "Slave Slot: " << db_name << ":" << slot_id << ", NotFound" ; return Status::Corruption ("Slave Slot not found" ); } Status status = pika_repl_client_->SendSlotTrySync (slave_slot->MasterIp (), slave_slot->MasterPort (), db_name, slot_id, boffset, slave_slot->LocalIp ()); if (status.ok ()) { slave_slot->SetReplState (ReplState::kWaitReply); } else { slave_slot->SetReplState (ReplState::kError); LOG (WARNING) << "SendSlotTrySyncRequest failed " << status.ToString (); } return status; }
在 TrySync
请求中设置了 Binlog_offset
, Binlog_filenum
, ip
, port
, db_name
, slot_id
发送给 Master
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Status PikaReplClient::SendSlotTrySync (const std::string& ip, uint32_t port, const std::string& db_name, uint32_t slot_id, const BinlogOffset& boffset, const std::string& local_ip) { InnerMessage::InnerRequest request; request.set_type (InnerMessage::kTrySync); InnerMessage::InnerRequest::TrySync* try_sync = request.mutable_try_sync (); InnerMessage::Node* node = try_sync->mutable_node (); node->set_ip (local_ip); node->set_port (g_pika_server->port ()); InnerMessage::Slot* slot = try_sync->mutable_slot (); slot->set_db_name (db_name); slot->set_slot_id (slot_id); InnerMessage::BinlogOffset* binlog_offset = try_sync->mutable_binlog_offset (); binlog_offset->set_filenum (boffset.filenum); binlog_offset->set_offset (boffset.offset); std::string to_send; if (!request.SerializeToString (&to_send)) { LOG (WARNING) << "Serialize Slot TrySync Request Failed, to Master (" << ip << ":" << port << ")" ; return Status::Corruption ("Serialize Failed" ); } return client_thread_->Write (ip, static_cast <int32_t >(port + kPortShiftReplServer), to_send); }
在 TrySync
中,从节点给主节点发送的信息有 ip
, port
, db_name
, slot_id
, filenum
, offset
主节点收到从节点发来的 TrySync
请求后,调用 TrySyncOffsetCheck
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void PikaReplServerConn::HandleTrySyncRequest (void * arg) { ... bool pre_success = true ; ... } else if (pre_success) { pre_success = TrySyncOffsetCheck (slot, try_sync_request, try_sync_response); } if (pre_success) { pre_success = TrySyncUpdateSlaveNode (slot, try_sync_request, conn, try_sync_response); } std::string reply_str; if (!response.SerializeToString (&reply_str) || (conn->WriteResp (reply_str) != 0 )) { LOG (WARNING) << "Handle Try Sync Failed" ; conn->NotifyClose (); return ; } conn->NotifyWrite (); }
在这里主节点将自己的 filenum
和 offset
与从节点的 filenum
和 offset
进行比较,如果从节点数据比主节点新,返回值为 kSyncPointLarger
, 如果从节点传过来的 binlog
在主节点这里已经被删除的情况,返回值为 kSyncPointBePurged
,需要做全量同步,如果从节点的点位和主节点的点位对应不上,则不能做同步,返回值为 kError
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 bool PikaReplServerConn::TrySyncOffsetCheck (const std::shared_ptr<SyncMasterSlot>& slot, const InnerMessage::InnerRequest::TrySync& try_sync_request, InnerMessage::InnerResponse::TrySync* try_sync_response) { const InnerMessage::Node& node = try_sync_request.node (); const InnerMessage::BinlogOffset& slave_boffset = try_sync_request.binlog_offset (); std::string slot_name = slot->SlotName (); BinlogOffset boffset; Status s = slot->Logger ()->GetProducerStatus (&(boffset.filenum), &(boffset.offset)); ... InnerMessage::BinlogOffset* master_slot_boffset = try_sync_response->mutable_binlog_offset (); master_slot_boffset->set_filenum (boffset.filenum); master_slot_boffset->set_offset (boffset.offset); if (boffset.filenum < slave_boffset.filenum () || (boffset.filenum == slave_boffset.filenum () && boffset.offset < slave_boffset.offset ())) { try_sync_response->set_reply_code (InnerMessage::InnerResponse::TrySync::kSyncPointLarger); ... return false ; } std::string confile = NewFileName (slot->Logger ()->filename (), slave_boffset.filenum ()); if (!pstd::FileExists (confile)) { LOG (INFO) << "Slot: " << slot_name << " binlog has been purged, may need full sync" ; try_sync_response->set_reply_code (InnerMessage::InnerResponse::TrySync::kSyncPointBePurged); return false ; } PikaBinlogReader reader; reader.Seek (slot->Logger (), slave_boffset.filenum (), slave_boffset.offset ()); BinlogOffset seeked_offset; reader.GetReaderStatus (&(seeked_offset.filenum), &(seeked_offset.offset)); if (seeked_offset.filenum != slave_boffset.filenum () || seeked_offset.offset != slave_boffset.offset ()) { try_sync_response->set_reply_code (InnerMessage::InnerResponse::TrySync::kError); ... } return true ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 bool PikaReplServerConn::TrySyncUpdateSlaveNode (const std::shared_ptr<SyncMasterSlot>& slot, const InnerMessage::InnerRequest::TrySync& try_sync_request, const std::shared_ptr<net::PbConn>& conn, InnerMessage::InnerResponse::TrySync* try_sync_response) { const InnerMessage::Node& node = try_sync_request.node (); std::string slot_name = slot->SlotName (); if (!slot->CheckSlaveNodeExist (node.ip (), node.port ())) { int32_t session_id = slot->GenSessionId (); if (session_id == -1 ) { try_sync_response->set_reply_code (InnerMessage::InnerResponse::TrySync::kError); LOG (WARNING) << "Slot: " << slot_name << ", Gen Session id Failed" ; return false ; } try_sync_response->set_session_id (session_id); Status s = slot->AddSlaveNode (node.ip (), node.port (), session_id); if (!s.ok ()) { try_sync_response->set_reply_code (InnerMessage::InnerResponse::TrySync::kError); LOG (WARNING) << "Slot: " << slot_name << " TrySync Failed, " << s.ToString (); return false ; } const std::string ip_port = pstd::IpPortString (node.ip (), node.port ()); g_pika_rm->ReplServerUpdateClientConnMap (ip_port, conn->fd ()); try_sync_response->set_reply_code (InnerMessage::InnerResponse::TrySync::kOk); LOG (INFO) << "Slot: " << slot_name << " TrySync Success, Session: " << session_id; } else { int32_t session_id; Status s = slot->GetSlaveNodeSession (node.ip (), node.port (), &session_id); if (!s.ok ()) { try_sync_response->set_reply_code (InnerMessage::InnerResponse::TrySync::kError); LOG (WARNING) << "Slot: " << slot_name << ", Get Session id Failed" << s.ToString (); return false ; } try_sync_response->set_reply_code (InnerMessage::InnerResponse::TrySync::kOk); try_sync_response->set_session_id (session_id); LOG (INFO) << "Slot: " << slot_name << " TrySync Success, Session: " << session_id; } return true ; }
TrySync总结 TrySync
中主节点给从节点回包包括 session_id
, db_name
, slot_id
BinlogSync src/pika_repl_client_conn.cc
在 TrySync
回包中根据 Master 的返回值去更新 slave_slot
的状态,如果成功置为 kConnected
,然后发送 SendSlotBinlogSyncAckRequest
,如果返回状态是 kSyncPointBePurged
那就需要重新进行全量复制操作,状态流转到 kTryDBSync
,如果返回状态是 kSyncPointLarger
则说明从节点数据比主节点要新,不能继续进行主从复制, 至此全量同步已经完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 void PikaReplClientConn::HandleTrySyncResponse (void * arg) { std::unique_ptr<ReplClientTaskArg> task_arg (static_cast <ReplClientTaskArg*>(arg)) ; std::shared_ptr<net::PbConn> conn = task_arg->conn; std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res; ... const InnerMessage::InnerResponse_TrySync& try_sync_response = response->try_sync (); const InnerMessage::Slot& slot_response = try_sync_response.slot (); ... LogicOffset logic_last_offset; ... std::string slot_name = slot->SlotName (); if (try_sync_response.reply_code () == InnerMessage::InnerResponse::TrySync::kOk) { BinlogOffset boffset; int32_t session_id = try_sync_response.session_id (); slot->Logger ()->GetProducerStatus (&boffset.filenum, &boffset.offset); slave_slot->SetMasterSessionId (session_id); LogOffset offset (boffset, logic_last_offset) ; g_pika_rm->SendSlotBinlogSyncAckRequest (db_name, slot_id, offset, offset, true ); slave_slot->SetReplState (ReplState::kConnected); slave_slot->SetLastRecvTime (pstd::NowMicros ()); LOG (INFO) << "Slot: " << slot_name << " TrySync Ok" ; } else if (try_sync_response.reply_code () == InnerMessage::InnerResponse::TrySync::kSyncPointBePurged) { slave_slot->SetReplState (ReplState::kTryDBSync); LOG (INFO) << "Slot: " << slot_name << " Need To Try DBSync" ; } else if (try_sync_response.reply_code () == InnerMessage::InnerResponse::TrySync::kSyncPointLarger) { slave_slot->SetReplState (ReplState::kError); LOG (WARNING) << "Slot: " << slot_name << " TrySync Error, Because the invalid filenum and offset" ; } else if (try_sync_response.reply_code () == InnerMessage::InnerResponse::TrySync::kError) { slave_slot->SetReplState (ReplState::kError); LOG (WARNING) << "Slot: " << slot_name << " TrySync Error" ; } }
调用 SendSlotBinlogSync
1 2 3 4 5 6 7 8 9 10 11 12 Status PikaReplicaManager::SendSlotBinlogSyncAckRequest (const std::string& db, uint32_t slot_id, const LogOffset& ack_start, const LogOffset& ack_end, bool is_first_send) { std::shared_ptr<SyncSlaveSlot> slave_slot = GetSyncSlaveSlotByName (SlotInfo (db, slot_id)); if (!slave_slot) { LOG (WARNING) << "Slave Slot: " << db << ":" << slot_id << ", NotFound" ; return Status::Corruption ("Slave Slot not found" ); } return pika_repl_client_->SendSlotBinlogSync (slave_slot->MasterIp (), slave_slot->MasterPort (), db, slot_id, ack_start, ack_end, slave_slot->LocalIp (), is_first_send); }
这里由于从节点是第一次发 BinlogSync
请求所以 BinlogOffset
的 ack_range_start
和 ack_range_end
都是 info 文件里的 Master
执行 dump 操作的时候的同步点位
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 Status PikaReplClient::SendSlotBinlogSync (const std::string& ip, uint32_t port, const std::string& db_name, uint32_t slot_id, const LogOffset& ack_start, const LogOffset& ack_end, const std::string& local_ip, bool is_first_send) { InnerMessage::InnerRequest request; request.set_type (InnerMessage::kBinlogSync); InnerMessage::InnerRequest::BinlogSync* binlog_sync = request.mutable_binlog_sync (); InnerMessage::Node* node = binlog_sync->mutable_node (); node->set_ip (local_ip); node->set_port (g_pika_server->port ()); binlog_sync->set_db_name (db_name); binlog_sync->set_slot_id (slot_id); binlog_sync->set_first_send (is_first_send); InnerMessage::BinlogOffset* ack_range_start = binlog_sync->mutable_ack_range_start (); ack_range_start->set_filenum (ack_start.b_offset.filenum); ack_range_start->set_offset (ack_start.b_offset.offset); ack_range_start->set_term (ack_start.l_offset.term); ack_range_start->set_index (ack_start.l_offset.index); InnerMessage::BinlogOffset* ack_range_end = binlog_sync->mutable_ack_range_end (); ack_range_end->set_filenum (ack_end.b_offset.filenum); ack_range_end->set_offset (ack_end.b_offset.offset); ack_range_end->set_term (ack_end.l_offset.term); ack_range_end->set_index (ack_end.l_offset.index); std::shared_ptr<SyncSlaveSlot> slave_slot = g_pika_rm->GetSyncSlaveSlotByName (SlotInfo (db_name, slot_id)); if (!slave_slot) { LOG (WARNING) << "Slave Slot: " << db_name << "_" << slot_id << " not exist" ; return Status::NotFound ("SyncSlaveSlot NotFound" ); } int32_t session_id = slave_slot->MasterSessionId (); binlog_sync->set_session_id (session_id); std::string to_send; if (!request.SerializeToString (&to_send)) { LOG (WARNING) << "Serialize Slot BinlogSync Request Failed, to Master (" << ip << ":" << port << ")" ; return Status::Corruption ("Serialize Failed" ); } return client_thread_->Write (ip, static_cast <int32_t >(port + 
kPortShiftReplServer), to_send); }
这里的 kBinlogSync
请求中,从节点向主节点传递 ip
, port
, slot_id
, is_first_send
, filenum
, offset
, term
, index
, session_id
, db_name
总结 以上就是全量同步的全部过程,增量同步将放到下篇文章进行讲解~