Background
This article introduces Pika's incremental sync, which relies on the Binlog mechanism. The walkthrough is split into the sections below.
Master receives the BinlogSyncRequest
Continuing from the previous article on full sync: once the slave receives the master's TrySync response, it immediately sends its first kBinlogSync request to the master. Incremental sync starts from here.
src/pika_repl_server_conn.cc
Because the slave set is_first_send, the ack_range_start and ack_range_end it sends here are identical, and the offset inside BinlogOffset is the master's offset that the slave recorded at dump time. ActivateSlaveBinlogSync is then called with a LogOffset argument, range_start.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void PikaReplServerConn::HandleBinlogSyncRequest (void * arg) { std::unique_ptr<ReplServerTaskArg> task_arg (static_cast <ReplServerTaskArg*>(arg)) ; const std::shared_ptr<InnerMessage::InnerRequest> req = task_arg->req; std::shared_ptr<net::PbConn> conn = task_arg->conn; if (!req->has_binlog_sync ()) { LOG (WARNING) << "Pb parse error" ; return ; } ... if (is_first_send) { if (range_start.b_offset != range_end.b_offset) { LOG (WARNING) << "first binlogsync request pb argument invalid" ; conn->NotifyClose (); return ; } Status s = master_slot->ActivateSlaveBinlogSync (node.ip (), node.port (), range_start); if (!s.ok ()) { LOG (WARNING) << "Activate Binlog Sync failed " << slave_node.ToString () << " " << s.ToString (); conn->NotifyClose (); return ; } return ; } ... }
src/pika_rm.cc
First we fetch the slave_ptr of the specified slave node and set its sent_offset and acked_offset to the offset argument. Then InitBinlogFileReader is called with that offset to initialize the BinlogReader, and SyncBinlogToWq is called to put the Binlog entries that need to be sent into the write_queue.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 Status SyncMasterSlot::ActivateSlaveBinlogSync (const std::string& ip, int port, const LogOffset& offset) { std::shared_ptr<SlaveNode> slave_ptr = GetSlaveNode (ip, port); if (!slave_ptr) { return Status::NotFound ("ip " + ip + " port " + std::to_string (port)); } { std::lock_guard l (slave_ptr->slave_mu) ; slave_ptr->slave_state = kSlaveBinlogSync; slave_ptr->sent_offset = offset; slave_ptr->acked_offset = offset; Status s = slave_ptr->InitBinlogFileReader (Logger (), offset.b_offset); if (!s.ok ()) { return Status::Corruption ("Init binlog file reader failed" + s.ToString ()); } g_pika_rm->DropItemInWriteQueue (ip, port); slave_ptr->sync_win.Reset (); slave_ptr->b_state = kReadFromFile; } Status s = SyncBinlogToWq (ip, port); if (!s.ok ()) { return s; } return Status::OK (); }
src/pika_slave_node.cc
InitBinlogFileReader constructs a binlog_reader and calls Seek, which updates the filenum and offset inside the binlog_reader.
Status SlaveNode::InitBinlogFileReader(const std::shared_ptr<Binlog>& binlog, const BinlogOffset& offset) {
  binlog_reader = std::make_shared<PikaBinlogReader>();
  int res = binlog_reader->Seek(binlog, offset.filenum, offset.offset);
  if (res != 0) {
    return Status::Corruption(ToString() + " binlog reader init failed");
  }
  return Status::OK();
}
src/pika_binlog_reader.cc
In Seek, NewFileName is used to open the specified Binlog file and the file is moved into queue_. The BinlogReader member offsets cur_offset_ and last_record_offset_ are initialized, and a while loop then advances cur_offset_, calling GetNext on each iteration.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 int PikaBinlogReader::Seek (const std::shared_ptr<Binlog>& logger, uint32_t filenum, uint64_t offset) { std::string confile = NewFileName (logger->filename (), filenum); ... std::unique_ptr<pstd::SequentialFile> readfile; if (!pstd::NewSequentialFile (confile, readfile).ok ()) { LOG (WARNING) << "New swquential " << confile << " failed" ; return -1 ; } ... queue_ = std::move (readfile); logger_ = logger; std::lock_guard l (rwlock_) ; cur_filenum_ = filenum; cur_offset_ = offset; last_record_offset_ = cur_filenum_ % kBlockSize; pstd::Status s; uint64_t start_block = (cur_offset_ / kBlockSize) * kBlockSize; s = queue_->Skip ((cur_offset_ / kBlockSize) * kBlockSize); uint64_t block_offset = cur_offset_ % kBlockSize; uint64_t ret = 0 ; uint64_t res = 0 ; bool is_error = false ; while (true ) { if (res >= block_offset) { cur_offset_ = start_block + res; break ; } ret = 0 ; is_error = GetNext (&ret); if (is_error) { return -1 ; } res += ret; } last_record_offset_ = cur_offset_ % kBlockSize; return 0 ; }
src/pika_binlog_reader.cc
The Binlog file is read again from the start of its block: each call to GetNext adds the record's size to res, and the loop stops once the accumulated offset reaches the slave's offset. The final position is recorded in last_record_offset_, which is the offset within a single block, whereas cur_offset_ is the offset within the whole file.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 bool PikaBinlogReader::GetNext (uint64_t * size) { uint64_t offset = 0 ; pstd::Status s; bool is_error = false ; while (true ) { buffer_.clear (); s = queue_->Read (kHeaderSize, &buffer_, backing_store_.get ()); if (!s.ok ()) { is_error = true ; return is_error; } const char * header = buffer_.data (); const uint32_t a = static_cast <uint32_t >(header[0 ]) & 0xff ; const uint32_t b = static_cast <uint32_t >(header[1 ]) & 0xff ; const uint32_t c = static_cast <uint32_t >(header[2 ]) & 0xff ; const unsigned int type = header[7 ]; const uint32_t length = a | (b << 8 ) | (c << 16 ); if (length > (kBlockSize - kHeaderSize)) { return true ; } if (type == kFullType) { s = queue_->Read (length, &buffer_, backing_store_.get ()); offset += kHeaderSize + length; break ; } else if (type == kFirstType) { s = queue_->Read (length, &buffer_, backing_store_.get ()); offset += kHeaderSize + length; } else if (type == kMiddleType) { s = queue_->Read (length, &buffer_, backing_store_.get ()); offset += kHeaderSize + length; } else if (type == kLastType) { s = queue_->Read (length, &buffer_, backing_store_.get ()); offset += kHeaderSize + length; break ; } else if (type == kBadRecord) { s = queue_->Read (length, &buffer_, backing_store_.get ()); offset += kHeaderSize + length; break ; } else { is_error = true ; break ; } } *size = offset; return is_error; }
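To make the relationship between cur_offset_ (position within the whole file) and last_record_offset_ (position within the current block) concrete, here is a small sketch with made-up numbers, assuming the 64 KB block size used by Pika:

#include <cstdint>
#include <cstdio>

// Assumed constant: Pika binlog blocks are 64 KB.
constexpr uint64_t kBlockSize = 64 * 1024;

int main() {
  // Hypothetical position inside a write2file: 70,000 bytes into the file.
  uint64_t cur_offset = 70000;                                     // offset within the whole file
  uint64_t start_block = (cur_offset / kBlockSize) * kBlockSize;   // 65536: start of block #1
  uint64_t last_record_offset = cur_offset % kBlockSize;           // 4464: offset inside that block
  std::printf("block starts at %llu, in-block offset %llu\n",
              (unsigned long long)start_block, (unsigned long long)last_record_offset);
  return 0;
}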
Summary
In the BinlogReader initialization step, we updated logger_, cur_filenum_, cur_offset_ and last_record_offset_ of the BinlogReader held by the SlaveNode on the master side, so that they match the position reported by the slave; the next read can then start from this updated position.
src/pika_rm.cc
After the offsets are updated, SyncBinlogToWq calls ReadBinlogFileToWq to read Binlog data into the write_queue.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Status SyncMasterSlot::SyncBinlogToWq (const std::string& ip, int port) { std::shared_ptr<SlaveNode> slave_ptr = GetSlaveNode (ip, port); if (!slave_ptr) { return Status::NotFound ("ip " + ip + " port " + std::to_string (port)); } Status s; slave_ptr->Lock (); s = ReadBinlogFileToWq (slave_ptr); slave_ptr->Unlock (); if (!s.ok ()) { return s; } return Status::OK (); }
src/pika_rm.cc
First we check how many more Binlog entries the sliding window sync_win can still take. Since this is the first write into the window and the configured window maximum is 9000, up to 9000 Binlog entries can be written into the window in one pass. Inside the for loop we first check whether the total byte size of the Binlogs already in the window exceeds roughly 1 GB (PIKA_MAX_CONN_RBUF_HB * 2); if it does, nothing more is written to the window this round and we wait for the next send. Otherwise the reader calls Get — Status s = reader->Get(&msg, &filenum, &offset); — which extracts the Binlog data into msg and updates filenum and offset. The data is wrapped into a SyncWinItem (one SyncWinItem represents one complete Binlog entry) and pushed into the sliding window, then assembled into a WriteTask and appended to tasks. After each task is pushed, slave_ptr updates sent_offset to the position the reader just read. A WriteTask wraps an RmNode and a BinlogChip, so everything in tasks is Binlog data destined for the same node.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 Status SyncMasterSlot::ReadBinlogFileToWq (const std::shared_ptr<SlaveNode>& slave_ptr) { int cnt = slave_ptr->sync_win.Remaining (); std::shared_ptr<PikaBinlogReader> reader = slave_ptr->binlog_reader; if (!reader) { return Status::OK (); } std::vector<WriteTask> tasks; for (int i = 0 ; i < cnt; ++i) { std::string msg; uint32_t filenum; uint64_t offset; if (slave_ptr->sync_win.GetTotalBinlogSize () > PIKA_MAX_CONN_RBUF_HB * 2 ) { LOG (INFO) << slave_ptr->ToString () << " total binlog size in sync window is :" << slave_ptr->sync_win.GetTotalBinlogSize (); break ; } Status s = reader->Get (&msg, &filenum, &offset); if (s.IsEndFile ()) { break ; } else if (s.IsCorruption () || s.IsIOError ()) { LOG (WARNING) << SyncSlotInfo ().ToString () << " Read Binlog error : " << s.ToString (); return s; } BinlogItem item; if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode (TypeFirst, msg, &item)) { LOG (WARNING) << "Binlog item decode failed" ; return Status::Corruption ("Binlog item decode failed" ); } BinlogOffset sent_b_offset = BinlogOffset (filenum, offset); LogicOffset sent_l_offset = LogicOffset (item.term_id (), item.logic_id ()); LogOffset sent_offset (sent_b_offset, sent_l_offset) ; slave_ptr->sync_win.Push (SyncWinItem (sent_offset, msg.size ())); slave_ptr->SetLastSendTime (pstd::NowMicros ()); RmNode rm_node (slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SlotId(), slave_ptr->SessionId()) ; WriteTask task (rm_node, BinlogChip(sent_offset, msg), slave_ptr->sent_offset) ; tasks.push_back (task); slave_ptr->sent_offset = sent_offset; } if (!tasks.empty ()) { g_pika_rm->ProduceWriteQueue (slave_ptr->Ip (), slave_ptr->Port (), slot_info_.slot_id_, tasks); } return Status::OK (); }
src/pika_binlog_reader.cc
In Get, Consume is called to extract one Binlog entry. If the current Binlog file has been read to the end, the reader switches to the next Binlog file: filenum is incremented by 1 and offset is reset to 0.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 Status PikaBinlogReader::Get (std::string* scratch, uint32_t * filenum, uint64_t * offset) { if (!logger_ || !queue_) { return Status::Corruption ("Not seek" ); } scratch->clear (); Status s = Status::OK (); do { if (ReadToTheEnd ()) { return Status::EndFile ("End of cur log file" ); } s = Consume (scratch, filenum, offset); if (s.IsEndFile ()) { std::string confile = NewFileName (logger_->filename (), cur_filenum_ + 1 ); usleep (10000 ); if (pstd::FileExists (confile)) { DLOG (INFO) << "BinlogSender roll to new binlog" << confile; queue_.reset (); queue_ = nullptr ; pstd::NewSequentialFile (confile, queue_); { std::lock_guard l (rwlock_) ; cur_filenum_++; cur_offset_ = 0 ; } last_record_offset_ = 0 ; } else { return Status::IOError ("File Does Not Exists" ); } } else { break ; } } while (s.IsEndFile ()); return Status::OK (); }
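A sketch of what the rollover amounts to, assuming NewFileName simply appends the file number to the write2file prefix (an assumption based on the naming described later in this article):

#include <cstdint>
#include <iostream>
#include <string>

// Assumed behavior of NewFileName: append the file number to the binlog prefix.
std::string NewFileNameSketch(const std::string& prefix, uint32_t filenum) {
  return prefix + std::to_string(filenum);
}

int main() {
  uint32_t cur_filenum = 5;
  uint64_t cur_offset = 87654321;
  // The current file is exhausted: roll to the next one and restart the offset.
  std::string next = NewFileNameSketch("/data/pika/log/write2file", cur_filenum + 1);
  cur_filenum += 1;
  cur_offset = 0;
  std::cout << "switch to " << next << " at offset " << cur_offset << "\n";
  return 0;
}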
src/pika_binlog_reader.cc
In Consume, if the current record is kFullType the data is written to scratch in one go; the other types either append data or report an error. This runs in a while loop and exits once one complete Binlog entry has been assembled (a single Binlog entry may consist of one or more records).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 Status PikaBinlogReader::Consume (std::string* scratch, uint32_t * filenum, uint64_t * offset) { Status s; pstd::Slice fragment; while (true ) { const unsigned int record_type = ReadPhysicalRecord (&fragment, filenum, offset); switch (record_type) { case kFullType: *scratch = std::string (fragment.data (), fragment.size ()); s = Status::OK (); break ; case kFirstType: scratch->assign (fragment.data (), fragment.size ()); s = Status::NotFound ("Middle Status" ); break ; case kMiddleType: scratch->append (fragment.data (), fragment.size ()); s = Status::NotFound ("Middle Status" ); break ; case kLastType: scratch->append (fragment.data (), fragment.size ()); s = Status::OK (); break ; case kEof: return Status::EndFile ("Eof" ); case kBadRecord: LOG (WARNING) << "Read BadRecord record, will decode failed, this record may dbsync padded record, not processed here" ; return Status::IOError ("Data Corruption" ); case kOldRecord: return Status::EndFile ("Eof" ); default : return Status::IOError ("Unknow reason" ); } if (s.ok ()) { break ; } } return Status::OK (); }
src/pika_binlog_reader.cc
Each slave's binlog_reader on the master keeps the file name and offset it last read, which is how the master knows what to send to each slave next. First we check whether the bytes remaining in the current block are fewer than kHeaderSize; if so, we skip the rest of this block and start from the next one. Read is then called to fetch the kHeaderSize-byte header into buffer_, the length field is decoded from it, the payload of that length is appended into buffer_ and exposed through result, last_record_offset_ is advanced, and control returns to the caller's loop.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 unsigned int PikaBinlogReader::ReadPhysicalRecord (pstd::Slice* result, uint32_t * filenum, uint64_t * offset) { pstd::Status s; if (kBlockSize - last_record_offset_ <= kHeaderSize) { queue_->Skip (kBlockSize - last_record_offset_); std::lock_guard l (rwlock_) ; cur_offset_ += (kBlockSize - last_record_offset_); last_record_offset_ = 0 ; } buffer_.clear (); s = queue_->Read (kHeaderSize, &buffer_, backing_store_.get ()); if (s.IsEndFile ()) { return kEof; } else if (!s.ok ()) { return kBadRecord; } const char * header = buffer_.data (); const uint32_t a = static_cast <uint32_t >(header[0 ]) & 0xff ; const uint32_t b = static_cast <uint32_t >(header[1 ]) & 0xff ; const uint32_t c = static_cast <uint32_t >(header[2 ]) & 0xff ; const unsigned int type = header[7 ]; const uint32_t length = a | (b << 8 ) | (c << 16 ); if (length > (kBlockSize - kHeaderSize)) { return kBadRecord; } if (type == kZeroType || length == 0 ) { buffer_.clear (); return kOldRecord; } buffer_.clear (); s = queue_->Read (length, &buffer_, backing_store_.get ()); *result = pstd::Slice (buffer_.data (), buffer_.size ()); last_record_offset_ += kHeaderSize + length; if (s.ok ()) { std::lock_guard l (rwlock_) ; *filenum = cur_filenum_; cur_offset_ += (kHeaderSize + length); *offset = cur_offset_; } return type; }
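The 3-byte length field decoded above is little-endian; here is a tiny round-trip check of that encoding (the encoding side appears later in EmitPhysicalRecord):

#include <cassert>
#include <cstdint>

int main() {
  uint32_t length = 0x012345;  // a hypothetical payload length
  char header[3];
  header[0] = static_cast<char>(length & 0xff);
  header[1] = static_cast<char>((length >> 8) & 0xff);
  header[2] = static_cast<char>((length >> 16) & 0xff);

  // Decoding, exactly as ReadPhysicalRecord/GetNext do it:
  const uint32_t a = static_cast<uint32_t>(header[0]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[1]) & 0xff;
  const uint32_t c = static_cast<uint32_t>(header[2]) & 0xff;
  assert((a | (b << 8) | (c << 16)) == length);
  return 0;
}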
src/pika_rm.cc
All the WriteTasks are appended to write_queues_, waiting to be sent. Each task carries one Binlog entry's offset and payload, and the tasks pushed here are the Binlogs destined for the same slot on the same peer.
void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, uint32_t slot_id,
                                           const std::vector<WriteTask>& tasks) {
  std::lock_guard l(write_queue_mu_);
  std::string index = ip + ":" + std::to_string(port);
  for (auto& task : tasks) {
    write_queues_[index][slot_id].push(task);
  }
}
src/pika_auxiliary_thread.cc
SendToPeer in the auxiliary thread sends the data accumulated in write_queues_ above to the slave.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void * PikaAuxiliaryThread::ThreadMain () { while (!should_stop ()) { if (g_pika_server->ShouldMetaSync ()) { g_pika_rm->SendMetaSyncRequest (); } else if (g_pika_server->MetaSyncDone ()) { g_pika_rm->RunSyncSlaveSlotStateMachine (); } pstd::Status s = g_pika_rm->CheckSyncTimeout (pstd::NowMicros ()); if (!s.ok ()) { LOG (WARNING) << s.ToString (); } g_pika_server->CheckLeaderProtectedMode (); s = g_pika_server->TriggerSendBinlogSync (); if (!s.ok ()) { LOG (WARNING) << s.ToString (); } int res = g_pika_server->SendToPeer (); if (res == 0 ) { std::unique_lock lock (mu_) ; cv_.wait_for (lock, 100 ms); } else { } } return nullptr ; }
src/pika_server.cc
This calls ConsumeWriteQueue.
int PikaServer::SendToPeer() { return g_pika_rm->ConsumeWriteQueue(); }
src/pika_rm.cc
Binlogs going to the same peer are grouped into to_send_map, and SendSlaveBinlogChips is then called to send BinlogSync responses. Note that at most 4000 Binlog entries are sent for one slot in one round, and SendSlaveBinlogChips is called inside a for loop, so the master may reply to the slave several times for a single BinlogSync request.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 int PikaReplicaManager::ConsumeWriteQueue () { std::unordered_map<std::string, std::vector<std::vector<WriteTask>>> to_send_map; int counter = 0 ; { std::lock_guard l (write_queue_mu_) ; for (auto & iter : write_queues_) { const std::string& ip_port = iter.first; std::unordered_map<uint32_t , std::queue<WriteTask>>& p_map = iter.second; for (auto & slot_queue : p_map) { std::queue<WriteTask>& queue = slot_queue.second; for (int i = 0 ; i < kBinlogSendPacketNum; ++i) { if (queue.empty ()) { break ; } size_t batch_index = queue.size () > kBinlogSendBatchNum ? kBinlogSendBatchNum : queue.size (); std::vector<WriteTask> to_send; size_t batch_size = 0 ; for (size_t i = 0 ; i < batch_index; ++i) { WriteTask& task = queue.front (); batch_size += task.binlog_chip_.binlog_.size (); ... to_send.push_back (task); queue.pop (); counter++; } if (!to_send.empty ()) { to_send_map[ip_port].push_back (std::move (to_send)); } } } } } std::vector<std::string> to_delete; for (auto & iter : to_send_map) { ... for (auto & to_send : iter.second) { Status s = pika_repl_server_->SendSlaveBinlogChips (ip, port, to_send); if (!s.ok ()) { LOG (WARNING) << "send binlog to " << ip << ":" << port << " failed, " << s.ToString (); to_delete.push_back (iter.first); continue ; } } } ... return counter; }
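A back-of-the-envelope check of the 4000 figure, assuming the two constants default to 40 packets per round and 100 tasks per packet (these values are assumptions; the authoritative definitions live in Pika's headers):

#include <cstddef>

// Assumed defaults for the two constants used in ConsumeWriteQueue above.
constexpr size_t kBinlogSendPacketNum = 40;   // packets drained per slot per round (assumed)
constexpr size_t kBinlogSendBatchNum  = 100;  // write tasks per packet (assumed)

// Upper bound on binlog entries sent for one slot in a single ConsumeWriteQueue call.
constexpr size_t kMaxBinlogPerRound = kBinlogSendPacketNum * kBinlogSendBatchNum;
static_assert(kMaxBinlogPerRound == 4000, "matches the 4000 mentioned in the text");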
FAQ
Why is the corresponding data removed from write_queues_ when sending the binlog fails?
pika_repl_server.cc
BuildBinlogSyncResp is called here to build the BinlogSync response. Note that if the serialized protobuf response exceeds the configured limit (max_conn_rbuf_size, 256 MB here), the data is split and sent as multiple protobuf messages, one per task.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 pstd::Status PikaReplServer::SendSlaveBinlogChips (const std::string& ip, int port, const std::vector<WriteTask>& tasks) { InnerMessage::InnerResponse response; BuildBinlogSyncResp (tasks, &response); std::string binlog_chip_pb; if (!response.SerializeToString (&binlog_chip_pb)) { return Status::Corruption ("Serialized Failed" ); } if (binlog_chip_pb.size () > static_cast <size_t >(g_pika_conf->max_conn_rbuf_size ())) { for (const auto & task : tasks) { InnerMessage::InnerResponse response; std::vector<WriteTask> tmp_tasks; tmp_tasks.push_back (task); BuildBinlogSyncResp (tmp_tasks, &response); if (!response.SerializeToString (&binlog_chip_pb)) { return Status::Corruption ("Serialized Failed" ); } pstd::Status s = Write (ip, port, binlog_chip_pb); if (!s.ok ()) { return s; } } return pstd::Status::OK (); } return Write (ip, port, binlog_chip_pb); }
src/pika_repl_server.cc
The master replies to the slave with db_name, slot_id, session_id, boffset, and binlog_.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void PikaReplServer::BuildBinlogSyncResp (const std::vector<WriteTask>& tasks, InnerMessage::InnerResponse* response) { response->set_code (InnerMessage::kOk); response->set_type (InnerMessage::Type::kBinlogSync); for (const auto & task : tasks) { InnerMessage::InnerResponse::BinlogSync* binlog_sync = response->add_binlog_sync (); binlog_sync->set_session_id (task.rm_node_.SessionId ()); InnerMessage::Slot* slot = binlog_sync->mutable_slot (); slot->set_db_name (task.rm_node_.DBName ()); slot->set_slot_id (task.rm_node_.SlotId ()); InnerMessage::BinlogOffset* boffset = binlog_sync->mutable_binlog_offset (); BuildBinlogOffset (task.binlog_chip_.offset_, boffset); binlog_sync->set_binlog (task.binlog_chip_.binlog_); } }
BinlogSync summary
The master's kBinlogSync reply to the slave contains session_id, db_name, slot_id, binlog_offset, and binlog_.
The slave produces and consumes Binlog
src/pika_repl_client_conn.cc
DispatchBinlogRes is where the slave handles the Binlog responses sent by the master; it calls ScheduleWriteBinlogTask to process the Binlog.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 void PikaReplClientConn::DispatchBinlogRes (const std::shared_ptr<InnerMessage::InnerResponse>& res) { std::unordered_map<SlotInfo, std::vector<int >*, hash_slot_info> par_binlog; for (int i = 0 ; i < res->binlog_sync_size (); ++i) { const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync (i); SlotInfo p_info (binlog_res.slot().db_name(), binlog_res.slot().slot_id()) ; if (par_binlog.find (p_info) == par_binlog.end ()) { par_binlog[p_info] = new std::vector <int >(); } par_binlog[p_info]->push_back (i); } std::shared_ptr<SyncSlaveSlot> slave_slot = nullptr ; for (auto & binlog_nums : par_binlog) { RmNode node (binlog_nums.first.db_name_, binlog_nums.first.slot_id_) ; slave_slot = g_pika_rm->GetSyncSlaveSlotByName ( SlotInfo (binlog_nums.first.db_name_, binlog_nums.first.slot_id_)); if (!slave_slot) { LOG (WARNING) << "Slave Slot: " << binlog_nums.first.db_name_ << "_" << binlog_nums.first.slot_id_ << " not exist" ; break ; } slave_slot->SetLastRecvTime (pstd::NowMicros ()); g_pika_rm->ScheduleWriteBinlogTask (binlog_nums.first.db_name_ + std::to_string (binlog_nums.first.slot_id_), res, std::dynamic_pointer_cast <PikaReplClientConn>(shared_from_this ()), reinterpret_cast <void *>(binlog_nums.second)); } }
src/pika_rm.cc
This calls ScheduleWriteBinlogTask.
void PikaReplicaManager::ScheduleWriteBinlogTask(const std::string& db_slot,
                                                 const std::shared_ptr<InnerMessage::InnerResponse>& res,
                                                 const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
  pika_repl_client_->ScheduleWriteBinlogTask(db_slot, res, conn, res_private_data);
}
src/pika_repl_client.cc
This calls Schedule so that a worker thread handles HandleBGWorkerWriteBinlog asynchronously.
void PikaReplClient::ScheduleWriteBinlogTask(const std::string& db_slot,
                                             const std::shared_ptr<InnerMessage::InnerResponse>& res,
                                             const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
  size_t index = GetHashIndex(db_slot, true);
  auto task_arg = new ReplClientWriteBinlogTaskArg(res, conn, res_private_data, bg_workers_[index].get());
  bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast<void*>(task_arg));
}
src/pika_repl_bgworker.cc
Here the binlog().data() payload inside binlog_res is parsed and fed into ProcessInputBuffer. Notice that at the end of HandleBGWorkerWriteBinlog, SendSlotBinlogSyncAckRequest is called: right after processing one Binlog response, the slave immediately sends the next BinlogSync request. Pay attention to its two parameters, ack_start and ack_end: ack_start is the offset carried by the first binlog in the response just processed (pb_begin), while ack_end is the local producer offset after this batch of Binlog has been written, so together they mark where the acknowledged batch begins and ends.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 void PikaReplBgWorker::HandleBGWorkerWriteBinlog (void * arg) { ... for (size_t i = 0 ; i < index->size (); ++i) { const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync ((*index)[i]); if (i == 0 ) { db_name = binlog_res.slot ().db_name (); slot_id = binlog_res.slot ().slot_id (); } if (!binlog_res.binlog ().empty ()) { ParseBinlogOffset (binlog_res.binlog_offset (), &pb_begin); break ; } } ... if (pb_begin == LogOffset ()) { only_keepalive = true ; } LogOffset ack_start; if (only_keepalive) { ack_start = LogOffset (); } else { ack_start = pb_begin; } ... for (int i : *index) { ... const char * redis_parser_start = binlog_res.binlog ().data () + BINLOG_ENCODE_LEN; int redis_parser_len = static_cast <int >(binlog_res.binlog ().size ()) - BINLOG_ENCODE_LEN; int processed_len = 0 ; net::RedisParserStatus ret = worker->redis_parser_.ProcessInputBuffer (redis_parser_start, redis_parser_len, &processed_len); if (ret != net::kRedisParserDone) { LOG (WARNING) << "Redis parser failed" ; slave_slot->SetReplState (ReplState::kTryConnect); return ; } } ... LogOffset ack_end; if (only_keepalive) { ack_end = LogOffset (); } else { LogOffset productor_status; std::shared_ptr<Binlog> logger = slot->Logger (); logger->GetProducerStatus (&productor_status.b_offset.filenum, &productor_status.b_offset.offset, &productor_status.l_offset.term, &productor_status.l_offset.index); ack_end = productor_status; ack_end.l_offset.term = pb_end.l_offset.term; } g_pika_rm->SendSlotBinlogSyncAckRequest (db_name, slot_id, ack_start, ack_end); }
src/redis_parser.cc
This calls ProcessRequestBuffer.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 RedisParserStatus RedisParser::ProcessInputBuffer (const char * input_buf, int length, int * parsed_len) { if (status_code_ == kRedisParserInitDone || status_code_ == kRedisParserHalf || status_code_ == kRedisParserDone) { std::string tmp_str (input_buf, length) ; input_str_ = half_argv_ + tmp_str; input_buf_ = input_str_.c_str (); length_ = static_cast <int32_t >(length + half_argv_.size ()); if (redis_parser_type_ == REDIS_PARSER_REQUEST) { ProcessRequestBuffer (); } else if (redis_parser_type_ == REDIS_PARSER_RESPONSE) { ProcessResponseBuffer (); } else { SetParserStatus (kRedisParserError, kRedisParserInitError); return status_code_; } *parsed_len = cur_pos_; ResetRedisParser (); return status_code_; } SetParserStatus (kRedisParserError, kRedisParserInitError); return status_code_; }
src/redis_parser.cc
The parsed command is placed into argv_, which then triggers DealMessage, i.e. HandleWriteBinlog.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 RedisParserStatus RedisParser::ProcessRequestBuffer () { RedisParserStatus ret; while (cur_pos_ <= length_ - 1 ) { if (redis_type_ == 0 ) { if (input_buf_[cur_pos_] == '*' ) { redis_type_ = REDIS_REQ_MULTIBULK; } else { redis_type_ = REDIS_REQ_INLINE; } } if (redis_type_ == REDIS_REQ_INLINE) { ret = ProcessInlineBuffer (); if (ret != kRedisParserDone) { return ret; } } else if (redis_type_ == REDIS_REQ_MULTIBULK) { ret = ProcessMultibulkBuffer (); if (ret != kRedisParserDone) { return ret; } } else { return kRedisParserError; } if (!argv_.empty ()) { argvs_.push_back (argv_); if (parser_settings_.DealMessage) { if (parser_settings_.DealMessage (this , argv_) != 0 ) { SetParserStatus (kRedisParserError, kRedisParserDealError); return status_code_; } } } argv_.clear (); ResetCommandStatus (); } if (parser_settings_.Complete) { if (parser_settings_.Complete (this , argvs_) != 0 ) { SetParserStatus (kRedisParserError, kRedisParserCompleteError); return status_code_; } } argvs_.clear (); SetParserStatus (kRedisParserDone); return status_code_; }
src/pika_repl_bgworker.cc
This calls ConsensusProcessLeaderLog to process the Binlog.
int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::RedisCmdArgsType& argv) {
  ...
  slot->ConsensusProcessLeaderLog(c_ptr, worker->binlog_item_);
  return 0;
}
src/pika_rm.cc
This calls ProcessLeaderLog.
Status SyncMasterSlot::ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
  return coordinator_.ProcessLeaderLog(cmd_ptr, attribute);
}
src/pika_consensus.cc
InternalAppendLog is called first to write the Binlog, then InternalApplyFollower consumes (applies) the Binlog asynchronously.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Status ConsensusCoordinator::ProcessLeaderLog (const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) { LogOffset last_index = mem_logger_->last_offset (); if (attribute.logic_id () < last_index.l_offset.index) { LOG (WARNING) << SlotInfo (db_name_, slot_id_).ToString () << "Drop log from leader logic_id " << attribute.logic_id () << " cur last index " << last_index.l_offset.index; return Status::OK (); } Status s = InternalAppendLog (attribute, cmd_ptr, nullptr , nullptr ); InternalApplyFollower (MemLog::LogItem (LogOffset (), cmd_ptr, nullptr , nullptr )); return Status::OK (); }
Writing the Binlog
src/pika_consensus.cc
This calls InternalAppendBinlog.
Status ConsensusCoordinator::InternalAppendLog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
                                               std::shared_ptr<PikaClientConn> conn_ptr,
                                               std::shared_ptr<std::string> resp_ptr) {
  LogOffset log_offset;
  Status s = InternalAppendBinlog(item, cmd_ptr, &log_offset);
  if (!s.ok()) {
    return s;
  }
  return Status::OK();
}
src/pika_consensus.cc
The command is serialized first, and then Put is called to write the content into the Binlog.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Status ConsensusCoordinator::InternalAppendBinlog (const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr, LogOffset* log_offset) { std::string content = cmd_ptr->ToRedisProtocol (); Status s = stable_logger_->Logger ()->Put (content); if (!s.ok ()) { std::string db_name = cmd_ptr->db_name ().empty () ? g_pika_conf->default_db () : cmd_ptr->db_name (); std::shared_ptr<DB> db = g_pika_server->GetDB (db_name); if (db) { db->SetBinlogIoError (); } return s; } uint32_t filenum; uint64_t offset; stable_logger_->Logger ()->GetProducerStatus (&filenum, &offset); *log_offset = LogOffset (BinlogOffset (filenum, offset), LogicOffset (item.term_id (), item.logic_id ())); return Status::OK (); }
Consuming the Binlog
src/pika_consensus.cc
This calls ScheduleWriteDBTask.
void ConsensusCoordinator::InternalApplyFollower(const MemLog::LogItem& log) {
  g_pika_rm->ScheduleWriteDBTask(log.cmd_ptr, log.offset, db_name_, slot_id_);
}
src/pika_rm.cc
This calls ScheduleWriteDBTask.
void PikaReplicaManager::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
                                             const std::string& db_name, uint32_t slot_id) {
  pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, offset, db_name, slot_id);
}
src/pika_repl_client.cc
This calls Schedule.
void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
                                         const std::string& db_name, uint32_t slot_id) {
  const PikaCmdArgsType& argv = cmd_ptr->argv();
  std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0];
  size_t index = GetHashIndex(dispatch_key, false);
  auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr, offset, db_name, slot_id);
  bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg));
}
pika_repl_bgworker.cc
This is the Binlog consumption step: it calls Do to run the command execution flow.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 void PikaReplBgWorker::HandleBGWorkerWriteDB (void * arg) { std::unique_ptr<ReplClientWriteDBTaskArg> task_arg (static_cast <ReplClientWriteDBTaskArg*>(arg)) ; const std::shared_ptr<Cmd> c_ptr = task_arg->cmd_ptr; const PikaCmdArgsType& argv = c_ptr->argv (); LogOffset offset = task_arg->offset; std::string db_name = task_arg->db_name; uint32_t slot_id = task_arg->slot_id; uint64_t start_us = 0 ; if (g_pika_conf->slowlog_slower_than () >= 0 ) { start_us = pstd::NowMicros (); } std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById (db_name, slot_id); if (!c_ptr->IsSuspend ()) { slot->DbRWLockReader (); } if (c_ptr->IsNeedCacheDo () && PIKA_CACHE_NONE != g_pika_conf->cache_model () && slot->cache ()->CacheStatus () == PIKA_CACHE_STATUS_OK) { if (c_ptr->is_write ()) { c_ptr->DoThroughDB (slot); if (c_ptr->IsNeedUpdateCache ()) { c_ptr->DoUpdateCache (slot); } } else { LOG (WARNING) << "This branch is not impossible reach" ; } } else { c_ptr->Do (slot); } if (!c_ptr->IsSuspend ()) { slot->DbRWUnLock (); } if (g_pika_conf->slowlog_slower_than () >= 0 ) { auto start_time = static_cast <int32_t >(start_us / 1000000 ); auto duration = static_cast <int64_t >(pstd::NowMicros () - start_us); if (duration > g_pika_conf->slowlog_slower_than ()) { g_pika_server->SlowlogPushEntry (argv, start_time, duration); if (g_pika_conf->slowlog_write_errorlog ()) { LOG (ERROR) << "command: " << argv[0 ] << ", start_time(s): " << start_time << ", duration(us): " << duration; } } } }
src/pika_rm.cc
After producing and consuming the Binlog, the slave issues another BinlogSync request; the steps are the same as before.
1 2 3 4 5 6 7 8 9 10 11 12 13 Status PikaReplicaManager::SendSlotBinlogSyncAckRequest (const std::string& db, uint32_t slot_id, const LogOffset& ack_start, const LogOffset& ack_end, bool is_first_send) { std::shared_ptr<SyncSlaveSlot> slave_slot = GetSyncSlaveSlotByName (SlotInfo (db, slot_id)); if (!slave_slot) { LOG (WARNING) << "Slave Slot: " << db << ":" << slot_id << ", NotFound" ; return Status::Corruption ("Slave Slot not found" ); } return pika_repl_client_->SendSlotBinlogSync (slave_slot->MasterIp (), slave_slot->MasterPort (), db, slot_id, ack_start, ack_end, slave_slot->LocalIp (), is_first_send); }
Master receives the slave's kBinlogSync request
src/pika_repl_server_conn.cc
UpdateSyncBinlogStatus is called to update the Binlog read position recorded on the master side, and the next batch of binlog data is written into the write_queue.
1 2 3 4 5 6 7 8 9 10 11 void PikaReplServerConn::HandleBinlogSyncRequest (void * arg) { ... s = g_pika_rm->UpdateSyncBinlogStatus (slave_node, range_start, range_end); if (!s.ok ()) { LOG (WARNING) << "Update binlog ack failed " << db_name << " " << slot_id << " " << s.ToString (); conn->NotifyClose (); return ; } g_pika_server->SignalAuxiliary (); }
src/pika_rm.cc
ConsensusUpdateSlave is called to update the position, and then, as before, SyncBinlogToWq is called to write new Binlog data into the write_queue.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 Status PikaReplicaManager::UpdateSyncBinlogStatus (const RmNode& slave, const LogOffset& offset_start, const LogOffset& offset_end) { std::shared_lock l (slots_rw_) ; if (sync_master_slots_.find (slave.NodeSlotInfo ()) == sync_master_slots_.end ()) { return Status::NotFound (slave.ToString () + " not found" ); } std::shared_ptr<SyncMasterSlot> slot = sync_master_slots_[slave.NodeSlotInfo ()]; Status s = slot->ConsensusUpdateSlave (slave.Ip (), slave.Port (), offset_start, offset_end); if (!s.ok ()) { return s; } s = slot->SyncBinlogToWq (slave.Ip (), slave.Port ()); if (!s.ok ()) { return s; } return Status::OK (); }
src/pika_rm.cc
This calls UpdateSlave.
Status SyncMasterSlot::ConsensusUpdateSlave(const std::string& ip, int port, const LogOffset& start,
                                            const LogOffset& end) {
  Status s = coordinator_.UpdateSlave(ip, port, start, end);
  if (!s.ok()) {
    LOG(WARNING) << SyncSlotInfo().ToString() << s.ToString();
    return s;
  }
  return Status::OK();
}
src/pika_consensus.cc
This calls Update.
Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const LogOffset& start,
                                         const LogOffset& end) {
  LogOffset committed_index;
  Status s = sync_pros_.Update(ip, port, start, end, &committed_index);
  if (!s.ok()) {
    return s;
  }
  return Status::OK();
}
src/pika_consensus.cc
This calls Update.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Status SyncProgress::Update (const std::string& ip, int port, const LogOffset& start, const LogOffset& end, LogOffset* committed_index) { ... LogOffset acked_offset; { std::lock_guard l (slave_ptr->slave_mu) ; Status s = slave_ptr->Update (start, end, &acked_offset); if (!s.ok ()) { return s; } ... } return Status::OK (); }
src/pika_slave_node.cc
Update here refreshes the sliding window sync_win and ultimately updates acked_offset. The new acked_offset is the sent_offset of the last Binlog sent in the previous batch, so acked_offset catches up with sent_offset, which guarantees no data is lost.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Status SlaveNode::Update (const LogOffset& start, const LogOffset& end, LogOffset* updated_offset) { if (slave_state != kSlaveBinlogSync) { return Status::Corruption (ToString () + "state not BinlogSync" ); } *updated_offset = LogOffset (); bool res = sync_win.Update (SyncWinItem (start), SyncWinItem (end), updated_offset); if (!res) { return Status::Corruption ("UpdateAckedInfo failed" ); } if (*updated_offset == LogOffset ()) { *updated_offset = acked_offset; return Status::OK (); } acked_offset = *updated_offset; return Status::OK (); }
src/pika_slave_node.cc
win_ is actually a double-ended queue of SyncWinItem entries. The start_item and end_item passed in cover the Binlog offsets that the slave has already consumed; those entries are popped from the front of win_, leaving only the offsets that have been sent but not yet acknowledged.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 bool SyncWindow::Update (const SyncWinItem& start_item, const SyncWinItem& end_item, LogOffset* acked_offset) { size_t start_pos = win_.size (); size_t end_pos = win_.size (); for (size_t i = 0 ; i < win_.size (); ++i) { if (win_[i] == start_item) { start_pos = i; } if (win_[i] == end_item) { end_pos = i; break ; } } if (start_pos == win_.size () || end_pos == win_.size ()) { LOG (WARNING) << "Ack offset Start: " << start_item.ToString () << "End: " << end_item.ToString () << " not found in binlog controller window." << std::endl << "window status " << std::endl << ToStringStatus (); return false ; } for (size_t i = start_pos; i <= end_pos; ++i) { win_[i].acked_ = true ; total_size_ -= win_[i].binlog_size_; } while (!win_.empty ()) { if (win_[0 ].acked_) { *acked_offset = win_[0 ].offset_; win_.pop_front (); } else { break ; } } return true ; }
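As a simplified illustration of the window behaviour described above (a toy model, not Pika's real SyncWindow/SyncWinItem classes):

#include <cstdint>
#include <deque>
#include <iostream>

// Toy model of the ack window: offset plus an acked flag.
struct Item {
  uint64_t offset;
  bool acked = false;
};

int main() {
  std::deque<Item> win;  // win_: binlogs that have been sent but not yet acked
  for (uint64_t off : {100, 200, 300}) {
    win.push_back({off});  // master sends three binlogs
  }

  // The slave acks the range [100, 200]: mark those entries as acked.
  for (auto& it : win) {
    if (it.offset >= 100 && it.offset <= 200) {
      it.acked = true;
    }
  }

  // Pop acked items off the front; the last popped offset becomes acked_offset.
  uint64_t acked_offset = 0;
  while (!win.empty() && win.front().acked) {
    acked_offset = win.front().offset;
    win.pop_front();
  }

  std::cout << "acked_offset=" << acked_offset              // 200
            << ", still in window=" << win.size() << "\n";  // 1 (offset 300 unacked)
  return 0;
}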
Summary
In incremental sync, the master first sends its Binlog offset together with the serialized record to the slave and records that offset as sent_offset. After the slave consumes the data, it replies with ack_start and ack_end, which the master uses to update acked_offset; only when sent_offset equals acked_offset will the master continue incremental sync with that slave.
Standalone Binlog production
src/pika_command.cc
After the command execution flow finishes, DoBinlog is called.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void Cmd::InternalProcessCommand (const std::shared_ptr<Slot>& slot, const std::shared_ptr<SyncMasterSlot>& sync_slot, const HintKeys& hint_keys) { pstd::lock::MultiRecordLock record_lock (slot->LockMgr()) ; if (is_write ()) { record_lock.Lock (current_key ()); } uint64_t start_us = 0 ; if (g_pika_conf->slowlog_slower_than () >= 0 ) { start_us = pstd::NowMicros (); } DoCommand (slot, hint_keys); if (g_pika_conf->slowlog_slower_than () >= 0 ) { do_duration_ += pstd::NowMicros () - start_us; } DoBinlog (sync_slot); if (is_write ()) { record_lock.Unlock (current_key ()); } }
src/pika_command.cc
DoBinlog first checks whether the current command is a write command, since only write commands need a Binlog record; it then fetches the current conn and response and calls SyncMasterSlot's ConsensusProposeLog.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 void Cmd::DoBinlog (const std::shared_ptr<SyncMasterSlot>& slot) { if (res ().ok () && is_write () && g_pika_conf->write_binlog ()) { std::shared_ptr<net::NetConn> conn_ptr = GetConn (); std::shared_ptr<std::string> resp_ptr = GetResp (); if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) { if (!conn_ptr) { LOG (WARNING) << slot->SyncSlotInfo ().ToString () << " conn empty." ; } if (!resp_ptr) { LOG (WARNING) << slot->SyncSlotInfo ().ToString () << " resp empty." ; } res ().SetRes (CmdRes::kErrOther); return ; } Status s = slot->ConsensusProposeLog (shared_from_this (), std::dynamic_pointer_cast <PikaClientConn>(conn_ptr), resp_ptr); if (!s.ok ()) { LOG (WARNING) << slot->SyncSlotInfo ().ToString () << " Writing binlog failed, maybe no space left on device " << s.ToString (); res ().SetRes (CmdRes::kErrOther, s.ToString ()); return ; } } }
src/pika_rm.cc
coordinator_ here is a ConsensusCoordinator, a private member of the SyncMasterSlot class.
Status SyncMasterSlot::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr,
                                           std::shared_ptr<PikaClientConn> conn_ptr,
                                           std::shared_ptr<std::string> resp_ptr) {
  return coordinator_.ProposeLog(cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
}
src/pika_consensus.cc
ProposeLog defines a LogOffset and a BinlogItem, calls InternalAppendLog to write the Binlog, and then calls SignalAuxiliary to wake up the auxiliary thread so the master pushes the new Binlog to its slaves.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 Status ConsensusCoordinator::ProposeLog (const std::shared_ptr<Cmd>& cmd_ptr, std::shared_ptr<PikaClientConn> conn_ptr, std::shared_ptr<std::string> resp_ptr) { std::vector<std::string> keys = cmd_ptr->current_key (); if (cmd_ptr->name () == kCmdNameSAdd && !keys.empty () && (keys[0 ].compare (0 , SlotKeyPrefix.length (), SlotKeyPrefix) == 0 || keys[0 ].compare (0 , SlotTagPrefix.length (), SlotTagPrefix) == 0 )) { return Status::OK (); } LogOffset log_offset; BinlogItem item; Status s = InternalAppendLog (item, cmd_ptr, std::move (conn_ptr), std::move (resp_ptr)); if (!s.ok ()) { return s; } g_pika_server->SignalAuxiliary (); return Status::OK (); } void PikaServer::SignalAuxiliary () { pika_auxiliary_thread_->cv_.notify_one (); }
The BinlogItem class contains fields such as exec_time, term_id, logic_id, filenum, offset, content, and extends.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class BinlogItem { public : BinlogItem () = default ; friend class PikaBinlogTransverter ; uint32_t exec_time () const ; uint32_t term_id () const ; uint64_t logic_id () const ; uint32_t filenum () const ; uint64_t offset () const ; std::string content () const ; std::string ToString () const ; private : uint32_t exec_time_ = 0 ; uint32_t term_id_ = 0 ; uint64_t logic_id_ = 0 ; uint32_t filenum_ = 0 ; uint64_t offset_ = 0 ; std::string content_; std::vector<std::string> extends_; };
src/pika_consensus.cc
InternalAppendBinlog is called here to write the Binlog, passing in cmd_ptr, the BinlogItem, and a LogOffset.
Status ConsensusCoordinator::InternalAppendLog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
                                               std::shared_ptr<PikaClientConn> conn_ptr,
                                               std::shared_ptr<std::string> resp_ptr) {
  LogOffset log_offset;
  Status s = InternalAppendBinlog(item, cmd_ptr, &log_offset);
  if (!s.ok()) {
    return s;
  }
  return Status::OK();
}
src/pika_consensus.cc
The command in cmd_ptr is first serialized into content following the Redis protocol. stable_logger_ is a StableLog object; Put writes content into the file, GetProducerStatus is then called, and the resulting LogOffset is used to update the offset.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Status ConsensusCoordinator::InternalAppendBinlog (const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr, LogOffset* log_offset) { std::string content = cmd_ptr->ToRedisProtocol (); Status s = stable_logger_->Logger ()->Put (content); if (!s.ok ()) { std::string db_name = cmd_ptr->db_name ().empty () ? g_pika_conf->default_db () : cmd_ptr->db_name (); std::shared_ptr<DB> db = g_pika_server->GetDB (db_name); if (db) { db->SetBinlogIoError (); } return s; } uint32_t filenum; uint64_t offset; stable_logger_->Logger ()->GetProducerStatus (&filenum, &offset); *log_offset = LogOffset (BinlogOffset (filenum, offset), LogicOffset (item.term_id (), item.logic_id ())); return Status::OK (); }
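For a sense of what the serialized content looks like, here is an illustrative RESP encoder (ToRedisProtocol's exact output may differ in details such as casing):

#include <iostream>
#include <string>
#include <vector>

// Illustrative RESP encoding of a command, similar in spirit to the binlog "content".
std::string ToRESP(const std::vector<std::string>& argv) {
  std::string out = "*" + std::to_string(argv.size()) + "\r\n";
  for (const auto& arg : argv) {
    out += "$" + std::to_string(arg.size()) + "\r\n" + arg + "\r\n";
  }
  return out;
}

int main() {
  // "set key value" -> "*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"
  std::cout << ToRESP({"set", "key", "value"});
  return 0;
}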
src/pika_binlog.cc
Binlog overloads the Put function. The first Put initializes the filenum, term, offset, and logic_id variables, calls GetProducerStatus to fetch the current latest values (filenum, pro_offset, term_id, logic_id), and then calls BinlogEncode to assemble the Binlog.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 Status Binlog::Put (const std::string& item) { if (!opened_.load ()) { return Status::Busy ("Binlog is not open yet" ); } uint32_t filenum = 0 ; uint32_t term = 0 ; uint64_t offset = 0 ; uint64_t logic_id = 0 ; Lock (); DEFER { Unlock (); }; Status s = GetProducerStatus (&filenum, &offset, &term, &logic_id); if (!s.ok ()) { return s; } logic_id++; std::string data = PikaBinlogTransverter::BinlogEncode (BinlogType::TypeFirst, time (nullptr ), term, logic_id, filenum, offset, item, {}); s = Put (data.c_str (), static_cast <int >(data.size ())); if (!s.ok ()) { binlog_io_error_.store (true ); } return s; }
src/pika_binlog_transverter.cc
BinlogEncode assembles the Binlog metadata and returns the encoded binlog.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 std::string PikaBinlogTransverter::BinlogEncode (BinlogType type, uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum, uint64_t offset, const std::string& content, const std::vector<std::string>& extends) { std::string binlog; pstd::PutFixed16 (&binlog, type); pstd::PutFixed32 (&binlog, exec_time); pstd::PutFixed32 (&binlog, term_id); pstd::PutFixed64 (&binlog, logic_id); pstd::PutFixed32 (&binlog, filenum); pstd::PutFixed64 (&binlog, offset); uint32_t content_length = content.size (); pstd::PutFixed32 (&binlog, content_length); binlog.append (content); return binlog; }
| type | exec_time | term_id | logic_id | filenum | offset | content_len | content |
| 2B   | 4B        | 4B      | 8B       | 4B      | 8B     | 4B          |         |
This is how the Binlog item is laid out; the fixed part is 34 bytes in total.
src/pika_binlog.cc
Now let's look at what the second Put does. If the current file is larger than the configured file size, NewWritableFile is called to create a new Binlog file, pro_offset is reset to 0, and pro_num (the file-name index) is incremented and persisted. Produce is then called to handle the Binlog record logic.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 Status Binlog::Put (const char * item, int len) { Status s; uint64_t filesize = queue_->Filesize (); if (filesize > file_size_) { std::unique_ptr<pstd::WritableFile> queue; std::string profile = NewFileName (filename_, pro_num_ + 1 ); s = pstd::NewWritableFile (profile, queue); if (!s.ok ()) { LOG (ERROR) << "Binlog: new " << filename_ << " " << s.ToString (); return s; } queue_.reset (); queue_ = std::move (queue); pro_num_++; { std::lock_guard l (version_->rwlock_) ; version_->pro_offset_ = 0 ; version_->pro_num_ = pro_num_; version_->StableSave (); } InitLogFile (); } int pro_offset; s = Produce (pstd::Slice (item, len), &pro_offset); if (s.ok ()) { std::lock_guard l (version_->rwlock_) ; version_->pro_offset_ = pro_offset; version_->logic_id_++; version_->StableSave (); } return s; }
src/pika_binlog.cc
In Produce, tmp_pro_offset is the offset within the current Binlog file.
left records how many bytes of the Binlog entry still need to be written, and leftover records how many bytes of the current block are still available. If the remaining space cannot even hold kHeaderSize (the record header), the rest of the block is padded with \x00, a new block is started, block_offset_ is reset to 0 as the new block's offset, and tmp_pro_offset is updated accordingly. avail is the number of bytes the block can still carry once kHeaderSize and block_offset_ (the part of the block already written) are subtracted. left is compared with avail to decide whether the record fits in the current block: if left fits within avail, the whole record fits and the type is kFullType; otherwise one of the other types is used.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 Status Binlog::Produce (const pstd::Slice& item, int * temp_pro_offset) { Status s; const char * ptr = item.data (); size_t left = item.size (); bool begin = true ; *temp_pro_offset = static_cast <int >(version_->pro_offset_); do { const int leftover = static_cast <int >(kBlockSize) - block_offset_; assert (leftover >= 0 ); if (static_cast <size_t >(leftover) < kHeaderSize) { if (leftover > 0 ) { s = queue_->Append (pstd::Slice ("\x00\x00\x00\x00\x00\x00\x00" , leftover)); if (!s.ok ()) { return s; } *temp_pro_offset += leftover; } block_offset_ = 0 ; } const size_t avail = kBlockSize - block_offset_ - kHeaderSize; const size_t fragment_length = (left < avail) ? left : avail; RecordType type; const bool end = (left == fragment_length); if (begin && end) { type = kFullType; } else if (begin) { type = kFirstType; } else if (end) { type = kLastType; } else { type = kMiddleType; } s = EmitPhysicalRecord (type, ptr, fragment_length, temp_pro_offset); ptr += fragment_length; left -= fragment_length; begin = false ; } while (s.ok () && left > 0 ); return s; }
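A rough sketch of the fragmentation decision described above, assuming 64 KB blocks and the 8-byte record header, with a made-up 100 KB item:

#include <cstdint>
#include <cstdio>

constexpr size_t kBlockSize = 64 * 1024;    // assumed block size
constexpr size_t kHeaderSize = 1 + 3 + 4;   // type + length + time, as in EmitPhysicalRecord

int main() {
  size_t left = 100 * 1024;  // hypothetical 100 KB item to write
  size_t block_offset = 0;   // assume we start at the beginning of a block
  bool begin = true;
  while (left > 0) {
    size_t avail = kBlockSize - block_offset - kHeaderSize;
    size_t fragment = left < avail ? left : avail;
    bool end = (left == fragment);
    const char* type = (begin && end) ? "kFullType"
                     : begin          ? "kFirstType"
                     : end            ? "kLastType"
                                      : "kMiddleType";
    std::printf("write %zu bytes as %s\n", fragment, type);
    left -= fragment;
    block_offset = 0;  // simplification: continue at the start of the next block
    begin = false;
  }
  // Output: 65528 bytes as kFirstType, then the remaining 36872 bytes as kLastType.
  return 0;
}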
src/pika_binlog.cc
This appends the data to queue_, and the newly added part is picked up the next time the Binlog file is opened. At this point, one record has been written out.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 Status Binlog::EmitPhysicalRecord (RecordType t, const char * ptr, size_t n, int * temp_pro_offset) { Status s; assert (n <= 0xffffff ); assert (block_offset_ + kHeaderSize + n <= kBlockSize); char buf[kHeaderSize]; uint64_t now; struct timeval tv; gettimeofday (&tv, nullptr ); now = tv.tv_sec; buf[0 ] = static_cast <char >(n & 0xff ); buf[1 ] = static_cast <char >((n & 0xff00 ) >> 8 ); buf[2 ] = static_cast <char >(n >> 16 ); buf[3 ] = static_cast <char >(now & 0xff ); buf[4 ] = static_cast <char >((now & 0xff00 ) >> 8 ); buf[5 ] = static_cast <char >((now & 0xff0000 ) >> 16 ); buf[6 ] = static_cast <char >((now & 0xff000000 ) >> 24 ); buf[7 ] = static_cast <char >(t); s = queue_->Append (pstd::Slice (buf, kHeaderSize)); if (s.ok ()) { s = queue_->Append (pstd::Slice (ptr, n)); if (s.ok ()) { s = queue_->Flush (); } } block_offset_ += static_cast <int32_t >(kHeaderSize + n); *temp_pro_offset += static_cast <int32_t >(kHeaderSize + n); return s; }
| length | time | Type | type | exec_time | term_id | logic_id | filenum | offset | content_len | content |
| 3B     | 4B   | 1B   | 2B   | 4B        | 4B      | 8B       | 4B      | 8B     | 4B          |         |
So this is the final on-disk layout of a record.
src/pika_binlog.cc
Looking at the code above, it may seem that the Binlog data never reaches disk and only lives in queue_ (memory). The AppendWritableFile used here persists data via mmap; it is called in the Binlog constructor, and the operating system periodically writes the mapped memory back to disk.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 Binlog::Binlog (std::string binlog_path, const int file_size) : opened_ (false ), binlog_path_ (std::move (binlog_path)), file_size_ (file_size), binlog_io_error_ (false ) { Status s; pstd::CreateDir (binlog_path_); filename_ = binlog_path_ + kBinlogPrefix; const std::string manifest = binlog_path_ + kManifest; std::string profile; if (!pstd::FileExists (manifest)) { LOG (INFO) << "Binlog: Manifest file not exist, we create a new one." ; profile = NewFileName (filename_, pro_num_); s = pstd::NewWritableFile (profile, queue_); if (!s.ok ()) { LOG (FATAL) << "Binlog: new " << filename_ << " " << s.ToString (); } std::unique_ptr<pstd::RWFile> tmp_file; s = pstd::NewRWFile (manifest, tmp_file); versionfile_.reset (tmp_file.release ()); if (!s.ok ()) { LOG (FATAL) << "Binlog: new versionfile error " << s.ToString (); } version_ = std::make_unique <Version>(versionfile_); version_->StableSave (); } else { LOG (INFO) << "Binlog: Find the exist file." ; std::unique_ptr<pstd::RWFile> tmp_file; s = pstd::NewRWFile (manifest, tmp_file); versionfile_.reset (tmp_file.release ()); if (s.ok ()) { version_ = std::make_unique <Version>(versionfile_); version_->Init (); pro_num_ = version_->pro_num_; } else { LOG (FATAL) << "Binlog: open versionfile error" ; } profile = NewFileName (filename_, pro_num_); DLOG (INFO) << "Binlog: open profile " << profile; s = pstd::AppendWritableFile (profile, queue_, version_->pro_offset_); if (!s.ok ()) { LOG (FATAL) << "Binlog: Open file " << profile << " error " << s.ToString (); } uint64_t filesize = queue_->Filesize (); DLOG (INFO) << "Binlog: filesize is " << filesize; } InitLogFile (); }
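As a generic illustration of the mmap-based persistence idea (plain POSIX, not pstd's actual AppendWritableFile implementation):

#include <cstring>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

int main() {
  const char* path = "/tmp/mmap_demo";          // hypothetical file
  int fd = open(path, O_RDWR | O_CREAT, 0644);
  if (fd < 0) return 1;
  if (ftruncate(fd, 4096) != 0) return 1;       // reserve space to map
  char* base = static_cast<char*>(mmap(nullptr, 4096, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
  if (base == MAP_FAILED) return 1;
  std::memcpy(base, "hello binlog", 12);        // "write" by touching mapped memory
  msync(base, 4096, MS_SYNC);                   // force dirty pages back to the file
  munmap(base, 4096);
  close(fd);
  return 0;
}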
Summary
The Binlog we keep referring to is simply Pika's write2file files. A write2file consists of multiple blocks, each fixed at 64 KB. A block stores records, where a record is a serialized Redis command (and one Binlog entry may span multiple records). Each write2file has a size limit; if the bytes left in the current block cannot even hold a record header, the remainder of the block is padded with \x00 and a new block is started for the record.
FAQ
The record header in the code, kHeaderSize, is 1 + 3 + 4 = 8 bytes, but the Binlog header described in pika_binlog_transverter.h is Type(2B) + exec_time(4B) + term_id(4B) + logic_id(8B) + filenum(4B) + offset(8B) + content_len(4B) = 34 bytes. Why are they different?
The real physical header is 1 (Type) + 3 (length) + 4 (time) = 8 bytes, followed by the 34-byte item header.
| length | time | Type | type | exec_time | term_id | logic_id | filenum | offset | content_len | content |
| 3B     | 4B   | 1B   | 2B   | 4B        | 4B      | 8B       | 4B      | 8B     | 4B          |         |
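Putting the two layouts together as a quick arithmetic check (field widths taken from the tables above):

// Physical record header written by EmitPhysicalRecord: length + time + Type.
constexpr int kRecordHeaderSize = 3 + 4 + 1;                      // = kHeaderSize = 8
// Fixed part of the encoded BinlogItem produced by BinlogEncode.
constexpr int kBinlogItemHeaderSize = 2 + 4 + 4 + 8 + 4 + 8 + 4;  // = 34
// So one kFullType record occupies 8 + 34 + content_len bytes on disk.
static_assert(kRecordHeaderSize == 8 && kBinlogItemHeaderSize == 34, "layout sizes from the tables above");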
If AppendWritableFile is what persists the Binlog data, why is it only called in the Binlog constructor?
The operating system periodically writes the mapped memory back to disk.
If the remaining space in the current block can hold a record's header plus part of its content, will the current block store the whole record instead of starting a new block?
A new block is started. Even if the file has already reached its configured maximum number of blocks, a new block is still added, so the size of a write2file file is not strictly fixed, but the block size always is.
How does the master signal the auxiliary thread when it writes a Binlog?
Via a condition variable: pika_auxiliary_thread_->cv_.notify_one();
Problems
A single Binlog record contains redundantly assembled fields:
| length | time | Type | type | exec_time | term_id | logic_id | filenum | offset | content_len | content |
| 3B     | 4B   | 1B   | 2B   | 4B        | 4B      | 8B       | 4B      | 8B     | 4B          |         |
and the type field always holds the default value TypeFirst.
The auxiliary thread drives BinlogSync
src/pika_auxiliary_thread.cc
The slave is the initiator of BinlogSync, but after a period with no data to sync it cannot sense new writes on the master and therefore cannot proactively start another BinlogSync round. This is where the auxiliary thread calls TriggerSendBinlogSync to drive the BinlogSync flow: its main job is to put newly written increments into write_queues_ and send them to the slaves.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void * PikaAuxiliaryThread::ThreadMain () { while (!should_stop ()) { if (g_pika_server->ShouldMetaSync ()) { g_pika_rm->SendMetaSyncRequest (); } else if (g_pika_server->MetaSyncDone ()) { g_pika_rm->RunSyncSlaveSlotStateMachine (); } pstd::Status s = g_pika_rm->CheckSyncTimeout (pstd::NowMicros ()); if (!s.ok ()) { LOG (WARNING) << s.ToString (); } g_pika_server->CheckLeaderProtectedMode (); s = g_pika_server->TriggerSendBinlogSync (); if (!s.ok ()) { LOG (WARNING) << s.ToString (); } int res = g_pika_server->SendToPeer (); if (res == 0 ) { std::unique_lock lock (mu_) ; cv_.wait_for (lock, 100 ms); } else { } } return nullptr ; }
src/pika_server.cc
This calls WakeUpBinlogSync.
Status PikaServer::TriggerSendBinlogSync() { return g_pika_rm->WakeUpBinlogSync(); }
src/pika_rm.cc
This calls WakeUpSlaveBinlogSync.
1 2 3 4 5 6 7 8 9 10 11 Status PikaReplicaManager::WakeUpBinlogSync () { std::shared_lock l (slots_rw_) ; for (auto & iter : sync_master_slots_) { std::shared_ptr<SyncMasterSlot> slot = iter.second; Status s = slot->WakeUpSlaveBinlogSync (); if (!s.ok ()) { return s; } } return Status::OK (); }
src/pika_rm.cc
This calls ReadBinlogFileToWq; the steps are similar to before, writing Binlog data into the write_queue. It first checks whether sent_offset equals acked_offset: if they are equal, the increments pushed in the previous active phase have all been acknowledged, so whenever ReadBinlogFileToWq is called here the earlier master-slave increments are guaranteed to be in sync already.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Status SyncMasterSlot::WakeUpSlaveBinlogSync () { std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves = GetAllSlaveNodes (); std::vector<std::shared_ptr<SlaveNode>> to_del; for (auto & slave_iter : slaves) { std::shared_ptr<SlaveNode> slave_ptr = slave_iter.second; std::lock_guard l (slave_ptr->slave_mu) ; if (slave_ptr->sent_offset == slave_ptr->acked_offset) { Status s = ReadBinlogFileToWq (slave_ptr); if (!s.ok ()) { to_del.push_back (slave_ptr); LOG (WARNING) << "WakeUpSlaveBinlogSync falied, Delete from RM, slave: " << slave_ptr->ToStringStatus () << " " << s.ToString (); } } } for (auto & to_del_slave : to_del) { RemoveSlaveNode (to_del_slave->Ip (), to_del_slave->Port ()); } return Status::OK (); }
Binlog expiration policy
src/pika_server.cc
Pika's timer tasks include AutoPurge, which periodically cleans up Binlogs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void PikaServer::DoTimingTask () { AutoCompactRange (); AutoPurge (); AutoDeleteExpiredDump (); ResetLastSecQuerynum (); AutoUpdateNetworkMetric (); ProcessCronTask (); UpdateCacheInfo (); PrintThreadPoolQueueStatus (); } void PikaServer::AutoPurge () { DoSameThingEverySlot (TaskType::kPurgeLog); }
src/pika_server.cc
PurgeStableLogs is called to delete Binlogs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 Status PikaServer::DoSameThingEverySlot (const TaskType& type) { std::shared_lock rwl (dbs_rw_) ; std::shared_ptr<SyncSlaveSlot> slave_slot = nullptr ; for (const auto & db_item : dbs_) { for (const auto & slot_item : db_item.second->slots_) { switch (type) { case TaskType::kResetReplState: { slave_slot = g_pika_rm->GetSyncSlaveSlotByName (SlotInfo (db_item.second->GetDBName (), slot_item.second->GetSlotID ())); if (!slave_slot) { LOG (WARNING) << "Slave Slot: " << db_item.second->GetDBName () << ":" << slot_item.second->GetSlotID () << " Not Found" ; } slave_slot->SetReplState (ReplState::kNoConnect); break ; } case TaskType::kPurgeLog: { std::shared_ptr<SyncMasterSlot> slot = g_pika_rm->GetSyncMasterSlotByName (SlotInfo (db_item.second->GetDBName (), slot_item.second->GetSlotID ())); if (!slot) { LOG (WARNING) << "Slot: " << db_item.second->GetDBName () << ":" << slot_item.second->GetSlotID () << " Not Found." ; break ; } slot->StableLogger ()->PurgeStableLogs (); break ; } case TaskType::kCompactAll: slot_item.second->Compact (storage::kAll); break ; default : break ; } } } return Status::OK (); }
src/pika_stable_log.cc
As you can see, the purge is scheduled onto a background thread, which calls DoPurgeStableLogs when it runs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 bool StableLog::PurgeStableLogs (uint32_t to, bool manual) { bool expect = false ; if (!purging_.compare_exchange_strong (expect, true )) { LOG (WARNING) << "purge process already exist" ; return false ; } auto arg = new PurgeStableLogArg (); arg->to = to; arg->manual = manual; arg->logger = shared_from_this (); g_pika_server->PurgelogsTaskSchedule (&DoPurgeStableLogs, static_cast <void *>(arg)); return true ; }
src/pika_stable_log.cc
This calls PurgeFiles.
void StableLog::DoPurgeStableLogs(void* arg) {
  std::unique_ptr<PurgeStableLogArg> purge_arg(static_cast<PurgeStableLogArg*>(arg));
  purge_arg->logger->PurgeFiles(purge_arg->to, purge_arg->manual);
  purge_arg->logger->ClearPurge();
}
src/pika_stable_log.cc
First a map, binlogs, collects all the Binlog file names in the current directory. The count is then compared with the limits from the configuration (expire_logs_nums, plus an age check that only applies while more than 10 files would remain), and deletion proceeds from the oldest Binlog file (the one with the smallest filenum) onward.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 bool StableLog::PurgeFiles (uint32_t to, bool manual) { std::map<uint32_t , std::string> binlogs; if (!GetBinlogFiles (&binlogs)) { LOG (WARNING) << log_path_ << " Could not get binlog files!" ; return false ; } int delete_num = 0 ; struct stat file_stat; auto remain_expire_num = static_cast <int32_t >(binlogs.size () - g_pika_conf->expire_logs_nums ()); std::shared_ptr<SyncMasterSlot> master_slot = nullptr ; std::map<uint32_t , std::string>::iterator it; for (it = binlogs.begin (); it != binlogs.end (); ++it) { if ((manual && it->first <= to) || (remain_expire_num > 0 ) || (binlogs.size () - delete_num > 10 && stat (((log_path_ + it->second)).c_str (), &file_stat) == 0 && file_stat.st_mtime < time (nullptr ) - g_pika_conf->expire_logs_days () * 24 * 3600 )) { master_slot = g_pika_rm->GetSyncMasterSlotByName (SlotInfo (db_name_, slot_id_)); ... if (!master_slot->BinlogCloudPurge (it->first)) { LOG (WARNING) << log_path_ << " Could not purge " << (it->first) << ", since it is already be used" ; return false ; } if (pstd::DeleteFile (log_path_ + it->second)) { ++delete_num; --remain_expire_num; } else { LOG (WARNING) << log_path_ << " Purge log file : " << (it->second) << " failed! error: delete file failed" ; } } else { break ; } } ... return true ; }
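Restating the deletion condition from PurgeFiles() above as a single predicate (the parameter names and exact semantics are my own reading of the code, so treat them as assumptions):

#include <cstdint>
#include <ctime>

// A binlog file is a purge candidate when any of these holds:
//   1. a manual purge was requested and its filenum is <= the requested "to";
//   2. there are more files than expire-logs-nums allows (remain_expire_num > 0);
//   3. more than 10 files would still remain AND the file is older than expire-logs-days.
bool IsPurgeCandidate(bool manual, uint32_t filenum, uint32_t to,
                      int remain_expire_num, size_t files_left_after_delete,
                      time_t mtime, int expire_logs_days) {
  bool too_old = files_left_after_delete > 10 &&
                 mtime < std::time(nullptr) - expire_logs_days * 24 * 3600;
  return (manual && filenum <= to) || remain_expire_num > 0 || too_old;
}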