Background

This post walks through Pika's incremental synchronization, which is built on the Binlog mechanism, in the parts below.

Master Receives BinlogSyncRequest

Continuing from the previous post on full synchronization: there we saw that, immediately after receiving the master's `TrySync` response, the slave sends its first `kBinlogSync` request to the master. Incremental synchronization starts from this point.
src/pika_repl_server_conn.cc
Because the slave set `is_first_send`, the `ack_range_start` and `ack_range_end` it sends here are identical, and the `offset` inside the `BinlogOffset` is the master's offset that the slave saved at `dump` time. `ActivateSlaveBinlogSync` is then called with a single `LogOffset` argument, `range_start`.
```cpp
void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) {
  std::unique_ptr<ReplServerTaskArg> task_arg(static_cast<ReplServerTaskArg*>(arg));
  const std::shared_ptr<InnerMessage::InnerRequest> req = task_arg->req;
  std::shared_ptr<net::PbConn> conn = task_arg->conn;
  if (!req->has_binlog_sync()) {
    LOG(WARNING) << "Pb parse error";
    return;
  }
  ...
  if (is_first_send) {
    if (range_start.b_offset != range_end.b_offset) {
      LOG(WARNING) << "first binlogsync request pb argument invalid";
      conn->NotifyClose();
      return;
    }

    Status s = master_slot->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
    if (!s.ok()) {
      LOG(WARNING) << "Activate Binlog Sync failed " << slave_node.ToString() << " " << s.ToString();
      conn->NotifyClose();
      return;
    }
    return;
  }
  ...
}
```
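For reference, the offset types passed around in this flow look roughly like the following — a paraphrased sketch of the definitions in include/pika_define.h, not a verbatim copy:

```cpp
// Sketch (paraphrased) of the offset types used throughout this post.
struct BinlogOffset {
  uint32_t filenum = 0;  // which write2file the offset points into
  uint64_t offset = 0;   // byte offset inside that file
};

struct LogicOffset {
  uint32_t term = 0;   // consensus term id
  uint64_t index = 0;  // logical log index
};

struct LogOffset {
  BinlogOffset b_offset;  // physical position
  LogicOffset l_offset;   // logical position
};
```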
src/pika_rm.cc
We first obtain the specified slave's `slave_ptr` and set its `sent_offset` and `acked_offset` to the value of the `offset` argument; then `InitBinlogFileReader` is called with `offset` to initialize the `BinlogReader`, and finally `SyncBinlogToWq` puts the Binlog entries that need to be sent into the `write_queue`.
```cpp
Status SyncMasterSlot::ActivateSlaveBinlogSync(const std::string& ip, int port, const LogOffset& offset) {
  std::shared_ptr<SlaveNode> slave_ptr = GetSlaveNode(ip, port);
  if (!slave_ptr) {
    return Status::NotFound("ip " + ip + " port " + std::to_string(port));
  }

  {
    std::lock_guard l(slave_ptr->slave_mu);
    slave_ptr->slave_state = kSlaveBinlogSync;
    slave_ptr->sent_offset = offset;
    slave_ptr->acked_offset = offset;
    Status s = slave_ptr->InitBinlogFileReader(Logger(), offset.b_offset);
    if (!s.ok()) {
      return Status::Corruption("Init binlog file reader failed" + s.ToString());
    }
    g_pika_rm->DropItemInWriteQueue(ip, port);
    slave_ptr->sync_win.Reset();
    slave_ptr->b_state = kReadFromFile;
  }

  Status s = SyncBinlogToWq(ip, port);
  if (!s.ok()) {
    return s;
  }
  return Status::OK();
}
```
src/pika_slave_node.cc
`InitBinlogFileReader` constructs a `binlog_reader` and calls `Seek` to update the `binlog_reader`'s `filenum` and `offset`.
```cpp
Status SlaveNode::InitBinlogFileReader(const std::shared_ptr<Binlog>& binlog, const BinlogOffset& offset) {
  binlog_reader = std::make_shared<PikaBinlogReader>();
  int res = binlog_reader->Seek(binlog, offset.filenum, offset.offset);
  if (res != 0) {
    return Status::Corruption(ToString() + " binlog reader init failed");
  }
  return Status::OK();
}
```
src/pika_binlog_reader.cc
In `Seek`, `NewFileName` builds the name of the specified Binlog file, which is then opened and moved into `queue_`. The `BinlogReader` member offsets `cur_offset_` and `last_record_offset_` are initialized, and a `while` loop — which calls `GetNext` — advances `cur_offset_`.
```cpp
int PikaBinlogReader::Seek(const std::shared_ptr<Binlog>& logger, uint32_t filenum, uint64_t offset) {
  std::string confile = NewFileName(logger->filename(), filenum);
  ...
  std::unique_ptr<pstd::SequentialFile> readfile;
  if (!pstd::NewSequentialFile(confile, readfile).ok()) {
    LOG(WARNING) << "New swquential " << confile << " failed";
    return -1;
  }
  ...
  queue_ = std::move(readfile);
  logger_ = logger;

  std::lock_guard l(rwlock_);
  cur_filenum_ = filenum;
  cur_offset_ = offset;
  last_record_offset_ = cur_filenum_ % kBlockSize;

  pstd::Status s;
  uint64_t start_block = (cur_offset_ / kBlockSize) * kBlockSize;
  s = queue_->Skip((cur_offset_ / kBlockSize) * kBlockSize);
  uint64_t block_offset = cur_offset_ % kBlockSize;
  uint64_t ret = 0;
  uint64_t res = 0;
  bool is_error = false;

  while (true) {
    if (res >= block_offset) {
      cur_offset_ = start_block + res;
      break;
    }
    ret = 0;
    is_error = GetNext(&ret);
    if (is_error) {
      return -1;
    }
    res += ret;
  }
  last_record_offset_ = cur_offset_ % kBlockSize;
  return 0;
}
```
src/pika_binlog_reader.cc
Here the Binlog file is read again from the start of the block. After each read, `res` accumulates the record sizes, and the loop stops once the accumulated value is greater than or equal to the slave's offset; the result is recorded in `last_record_offset_`. `last_record_offset_` records the offset within a single block, while `cur_offset_` records the offset within the whole file.
```cpp
bool PikaBinlogReader::GetNext(uint64_t* size) {
  uint64_t offset = 0;
  pstd::Status s;
  bool is_error = false;

  while (true) {
    buffer_.clear();
    s = queue_->Read(kHeaderSize, &buffer_, backing_store_.get());
    if (!s.ok()) {
      is_error = true;
      return is_error;
    }

    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[0]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[1]) & 0xff;
    const uint32_t c = static_cast<uint32_t>(header[2]) & 0xff;
    const unsigned int type = header[7];
    const uint32_t length = a | (b << 8) | (c << 16);

    if (length > (kBlockSize - kHeaderSize)) {
      return true;
    }

    if (type == kFullType) {
      s = queue_->Read(length, &buffer_, backing_store_.get());
      offset += kHeaderSize + length;
      break;
    } else if (type == kFirstType) {
      s = queue_->Read(length, &buffer_, backing_store_.get());
      offset += kHeaderSize + length;
    } else if (type == kMiddleType) {
      s = queue_->Read(length, &buffer_, backing_store_.get());
      offset += kHeaderSize + length;
    } else if (type == kLastType) {
      s = queue_->Read(length, &buffer_, backing_store_.get());
      offset += kHeaderSize + length;
      break;
    } else if (type == kBadRecord) {
      s = queue_->Read(length, &buffer_, backing_store_.get());
      offset += kHeaderSize + length;
      break;
    } else {
      is_error = true;
      break;
    }
  }
  *size = offset;
  return is_error;
}
```
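To make `Seek`'s bookkeeping concrete, here is a worked example with made-up numbers (`kBlockSize` in Pika is 64 KB):

```cpp
#include <cstdint>
#include <cstdio>

// Hypothetical walk-through of Seek()'s offset arithmetic for a slave
// offset of 70000 -- illustrative numbers only, not Pika code.
int main() {
  const uint64_t kBlockSize = 64 * 1024;
  uint64_t target = 70000;                                    // slave's saved offset
  uint64_t start_block = (target / kBlockSize) * kBlockSize;  // 65536, Skip()ped over
  uint64_t block_offset = target % kBlockSize;                // 4464, scan target
  // GetNext() is then called repeatedly from the block start, accumulating
  // record sizes in res until res >= block_offset, so cur_offset_ lands on a
  // record boundary (start_block + res) at or past the requested offset.
  std::printf("skip %llu bytes, then scan records until >= %llu in the block\n",
              static_cast<unsigned long long>(start_block),
              static_cast<unsigned long long>(block_offset));
  return 0;
}
```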
Summary

In the `BinlogReader` initialization step we updated `logger_`, `cur_filenum_`, `cur_offset_`, and `last_record_offset_` in the `BinlogReader` kept inside the master-side `SlaveNode`, bringing them in line with the offsets sent over by the slave, so that the next read starts from the updated position.
src/pika_rm.cc
After the offsets have been updated, `ReadBinlogFileToWq` is called to read Binlog entries into the `write_queue`.
```cpp
Status SyncMasterSlot::SyncBinlogToWq(const std::string& ip, int port) {
  std::shared_ptr<SlaveNode> slave_ptr = GetSlaveNode(ip, port);
  if (!slave_ptr) {
    return Status::NotFound("ip " + ip + " port " + std::to_string(port));
  }
  Status s;
  slave_ptr->Lock();
  s = ReadBinlogFileToWq(slave_ptr);
  slave_ptr->Unlock();
  if (!s.ok()) {
    return s;
  }
  return Status::OK();
}
```
src/pika_rm.cc
We first check how many more Binlog entries the sliding window `sync_win` can still accept. Since this is the first write into the window and the window's maximum size in the config file is 9000, up to 9000 Binlog entries can be written into it in one pass. Inside the `for` loop we first check whether the total byte size of all Binlogs currently in the window exceeds 1 GB (`PIKA_MAX_CONN_RBUF_HB * 2` in the code); if it does, no more data is written to the window this round and we wait for the next send. Otherwise the reader calls `Get` — `Status s = reader->Get(&msg, &filenum, &offset);` — which extracts one Binlog entry into `msg` and updates `filenum` and `offset`. The entry is then wrapped into a `SyncWinItem` (note that one `SyncWinItem` represents one complete Binlog entry) and pushed into the sliding window, and finally assembled into a `WriteTask` and appended to `tasks`. After each task is pushed, `slave_ptr`'s `sent_offset` is updated to the reader's current read position. A `WriteTask` wraps an `RmNode` and a `BinlogChip`, so everything loaded into `tasks` is Binlog data bound for the same node.
```cpp
Status SyncMasterSlot::ReadBinlogFileToWq(const std::shared_ptr<SlaveNode>& slave_ptr) {
  int cnt = slave_ptr->sync_win.Remaining();
  std::shared_ptr<PikaBinlogReader> reader = slave_ptr->binlog_reader;
  if (!reader) {
    return Status::OK();
  }
  std::vector<WriteTask> tasks;
  for (int i = 0; i < cnt; ++i) {
    std::string msg;
    uint32_t filenum;
    uint64_t offset;
    if (slave_ptr->sync_win.GetTotalBinlogSize() > PIKA_MAX_CONN_RBUF_HB * 2) {
      LOG(INFO) << slave_ptr->ToString()
                << " total binlog size in sync window is :" << slave_ptr->sync_win.GetTotalBinlogSize();
      break;
    }
    Status s = reader->Get(&msg, &filenum, &offset);
    if (s.IsEndFile()) {
      break;
    } else if (s.IsCorruption() || s.IsIOError()) {
      LOG(WARNING) << SyncSlotInfo().ToString() << " Read Binlog error : " << s.ToString();
      return s;
    }
    BinlogItem item;
    if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, msg, &item)) {
      LOG(WARNING) << "Binlog item decode failed";
      return Status::Corruption("Binlog item decode failed");
    }
    BinlogOffset sent_b_offset = BinlogOffset(filenum, offset);
    LogicOffset sent_l_offset = LogicOffset(item.term_id(), item.logic_id());
    LogOffset sent_offset(sent_b_offset, sent_l_offset);

    slave_ptr->sync_win.Push(SyncWinItem(sent_offset, msg.size()));
    slave_ptr->SetLastSendTime(pstd::NowMicros());
    RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SlotId(),
                   slave_ptr->SessionId());
    WriteTask task(rm_node, BinlogChip(sent_offset, msg), slave_ptr->sent_offset);
    tasks.push_back(task);
    slave_ptr->sent_offset = sent_offset;
  }

  if (!tasks.empty()) {
    g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), slot_info_.slot_id_, tasks);
  }
  return Status::OK();
}
```
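For context, the capacity check at the top boils down to window size minus in-flight entries. A minimal sketch of the idea, assuming the configured size of 9000 mentioned above (pika.conf's sync-window-size) — paraphrased, not the exact Pika implementation:

```cpp
#include <cstddef>
#include <deque>

// Minimal model of the sync window's capacity query.
struct SyncWinItemLite { std::size_t binlog_size = 0; };

struct SyncWindowLite {
  std::deque<SyncWinItemLite> win;
  std::size_t window_size = 9000;  // assumed: pika.conf sync-window-size

  // How many more binlog entries may still be pushed this round.
  std::size_t Remaining() const {
    return win.size() >= window_size ? 0 : window_size - win.size();
  }
};
```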
src/pika_binlog_reader.cc
In the `Get` function, `Consume` is called to extract a Binlog entry; if the current Binlog file has been read to the end, we roll over to the next Binlog file, incrementing `filenum` by 1 and resetting `offset` to 0.
```cpp
Status PikaBinlogReader::Get(std::string* scratch, uint32_t* filenum, uint64_t* offset) {
  if (!logger_ || !queue_) {
    return Status::Corruption("Not seek");
  }
  scratch->clear();
  Status s = Status::OK();

  do {
    if (ReadToTheEnd()) {
      return Status::EndFile("End of cur log file");
    }
    s = Consume(scratch, filenum, offset);
    if (s.IsEndFile()) {
      std::string confile = NewFileName(logger_->filename(), cur_filenum_ + 1);
      usleep(10000);
      if (pstd::FileExists(confile)) {
        DLOG(INFO) << "BinlogSender roll to new binlog" << confile;
        queue_.reset();
        queue_ = nullptr;
        pstd::NewSequentialFile(confile, queue_);
        {
          std::lock_guard l(rwlock_);
          cur_filenum_++;
          cur_offset_ = 0;
        }
        last_record_offset_ = 0;
      } else {
        return Status::IOError("File Does Not Exists");
      }
    } else {
      break;
    }
  } while (s.IsEndFile());
  return Status::OK();
}
```
src/pika_binlog_reader.cc
In `Consume`, if the current record is `kFullType` the data just read is written into `scratch` in one go; for the other types the data is appended, or an error is reported. This runs inside a `while` loop that exits once a complete Binlog entry has been obtained (one Binlog entry may consist of one or more records).
```cpp
Status PikaBinlogReader::Consume(std::string* scratch, uint32_t* filenum, uint64_t* offset) {
  Status s;
  pstd::Slice fragment;
  while (true) {
    const unsigned int record_type = ReadPhysicalRecord(&fragment, filenum, offset);

    switch (record_type) {
      case kFullType:
        *scratch = std::string(fragment.data(), fragment.size());
        s = Status::OK();
        break;
      case kFirstType:
        scratch->assign(fragment.data(), fragment.size());
        s = Status::NotFound("Middle Status");
        break;
      case kMiddleType:
        scratch->append(fragment.data(), fragment.size());
        s = Status::NotFound("Middle Status");
        break;
      case kLastType:
        scratch->append(fragment.data(), fragment.size());
        s = Status::OK();
        break;
      case kEof:
        return Status::EndFile("Eof");
      case kBadRecord:
        LOG(WARNING)
            << "Read BadRecord record, will decode failed, this record may dbsync padded record, not processed here";
        return Status::IOError("Data Corruption");
      case kOldRecord:
        return Status::EndFile("Eof");
      default:
        return Status::IOError("Unknow reason");
    }
    if (s.ok()) {
      break;
    }
  }
  return Status::OK();
}
```
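As an illustration of the reassembly: an entry larger than one block arrives as a `kFirstType` fragment, zero or more `kMiddleType` fragments, and a `kLastType` fragment, which `Consume` concatenates into `scratch`. A toy version of that switch, using hypothetical types of my own:

```cpp
#include <string>
#include <vector>

// Toy reassembly mirroring Consume(): fragments of one logical binlog entry
// are concatenated until a full or last fragment closes the entry.
enum ToyType { kToyFull, kToyFirst, kToyMiddle, kToyLast };
struct ToyFragment { ToyType type; std::string data; };

std::string Reassemble(const std::vector<ToyFragment>& frags) {
  std::string scratch;
  for (const auto& f : frags) {
    switch (f.type) {
      case kToyFull:   return f.data;                      // whole entry in one record
      case kToyFirst:  scratch = f.data; break;            // start a new entry
      case kToyMiddle: scratch += f.data; break;           // keep appending
      case kToyLast:   scratch += f.data; return scratch;  // entry complete
    }
  }
  return scratch;  // incomplete entry; the real code keeps looping or reports EOF
}
```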
src/pika_binlog_reader.cc
As we can see, the `binlog_reader` kept for each slave node stores the file name and offset it last read, and this design answers the question of which data the master needs to send each slave next. We first check whether the bytes remaining in the current block are fewer than `kHeaderSize`; if so, we skip the rest of this block and start from the next one. Then `Read` first fetches the `kHeaderSize`-byte header into `buffer_`, from which we obtain `length`; the next `length` bytes of payload are then read into `buffer_`, the result is stored in `result`, the `last_record_offset_` offset is updated, and control returns to the loop one level up.
```cpp
unsigned int PikaBinlogReader::ReadPhysicalRecord(pstd::Slice* result, uint32_t* filenum, uint64_t* offset) {
  pstd::Status s;
  if (kBlockSize - last_record_offset_ <= kHeaderSize) {
    queue_->Skip(kBlockSize - last_record_offset_);
    std::lock_guard l(rwlock_);
    cur_offset_ += (kBlockSize - last_record_offset_);
    last_record_offset_ = 0;
  }
  buffer_.clear();
  s = queue_->Read(kHeaderSize, &buffer_, backing_store_.get());
  if (s.IsEndFile()) {
    return kEof;
  } else if (!s.ok()) {
    return kBadRecord;
  }

  const char* header = buffer_.data();
  const uint32_t a = static_cast<uint32_t>(header[0]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[1]) & 0xff;
  const uint32_t c = static_cast<uint32_t>(header[2]) & 0xff;
  const unsigned int type = header[7];
  const uint32_t length = a | (b << 8) | (c << 16);

  if (length > (kBlockSize - kHeaderSize)) {
    return kBadRecord;
  }

  if (type == kZeroType || length == 0) {
    buffer_.clear();
    return kOldRecord;
  }

  buffer_.clear();
  s = queue_->Read(length, &buffer_, backing_store_.get());
  *result = pstd::Slice(buffer_.data(), buffer_.size());
  last_record_offset_ += kHeaderSize + length;
  if (s.ok()) {
    std::lock_guard l(rwlock_);
    *filenum = cur_filenum_;
    cur_offset_ += (kHeaderSize + length);
    *offset = cur_offset_;
  }
  return type;
}
```
src/pika_rm.cc
All the `WriteTask`s are written into `write_queues_` to await sending. Each `task` here carries a Binlog entry's offset and payload, and the `tasks` are Binlogs bound for the same slot.
```cpp
void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, uint32_t slot_id,
                                           const std::vector<WriteTask>& tasks) {
  std::lock_guard l(write_queue_mu_);
  std::string index = ip + ":" + std::to_string(port);
  for (auto& task : tasks) {
    write_queues_[index][slot_id].push(task);
  }
}
```
src/pika_auxiliary_thread.cc
`SendToPeer` in the auxiliary thread sends the data accumulated in `write_queues_` above to the slave nodes.
```cpp
void* PikaAuxiliaryThread::ThreadMain() {
  while (!should_stop()) {
    if (g_pika_server->ShouldMetaSync()) {
      g_pika_rm->SendMetaSyncRequest();
    } else if (g_pika_server->MetaSyncDone()) {
      g_pika_rm->RunSyncSlaveSlotStateMachine();
    }

    pstd::Status s = g_pika_rm->CheckSyncTimeout(pstd::NowMicros());
    if (!s.ok()) {
      LOG(WARNING) << s.ToString();
    }

    g_pika_server->CheckLeaderProtectedMode();

    s = g_pika_server->TriggerSendBinlogSync();
    if (!s.ok()) {
      LOG(WARNING) << s.ToString();
    }
    int res = g_pika_server->SendToPeer();
    if (res == 0) {
      std::unique_lock lock(mu_);
      cv_.wait_for(lock, 100ms);
    } else {
    }
  }
  return nullptr;
}
```
src/pika_server.cc
Calls `ConsumeWriteQueue`.
```cpp
int PikaServer::SendToPeer() { return g_pika_rm->ConsumeWriteQueue(); }
```
src/pika_rm.cc
Here the Binlogs bound for the same peer are grouped into `to_send_map`, and `SendSlaveBinlogChips` is called to send the `BinlogSync` responses. Note that at most 4000 Binlog entries are sent to one slot per round (`kBinlogSendPacketNum` packets of `kBinlogSendBatchNum` entries each), and since `SendSlaveBinlogChips` sits inside a `for` loop, the master may reply to the slave multiple times within a single `BinlogSync`.
```cpp
int PikaReplicaManager::ConsumeWriteQueue() {
  std::unordered_map<std::string, std::vector<std::vector<WriteTask>>> to_send_map;
  int counter = 0;
  {
    std::lock_guard l(write_queue_mu_);
    for (auto& iter : write_queues_) {
      const std::string& ip_port = iter.first;
      std::unordered_map<uint32_t, std::queue<WriteTask>>& p_map = iter.second;
      for (auto& slot_queue : p_map) {
        std::queue<WriteTask>& queue = slot_queue.second;
        for (int i = 0; i < kBinlogSendPacketNum; ++i) {
          if (queue.empty()) {
            break;
          }
          size_t batch_index = queue.size() > kBinlogSendBatchNum ? kBinlogSendBatchNum : queue.size();
          std::vector<WriteTask> to_send;
          size_t batch_size = 0;
          for (size_t i = 0; i < batch_index; ++i) {
            WriteTask& task = queue.front();
            batch_size += task.binlog_chip_.binlog_.size();
            ...
            to_send.push_back(task);
            queue.pop();
            counter++;
          }
          if (!to_send.empty()) {
            to_send_map[ip_port].push_back(std::move(to_send));
          }
        }
      }
    }
  }

  std::vector<std::string> to_delete;
  for (auto& iter : to_send_map) {
    ...
    for (auto& to_send : iter.second) {
      Status s = pika_repl_server_->SendSlaveBinlogChips(ip, port, to_send);
      if (!s.ok()) {
        LOG(WARNING) << "send binlog to " << ip << ":" << port << " failed, " << s.ToString();
        to_delete.push_back(iter.first);
        continue;
      }
    }
  }
  ...
  return counter;
}
```
FAQ

Why, when sending binlog data fails, is the corresponding data in `write_queues` dropped?
src/pika_repl_server.cc

Here `BuildBinlogSyncResp` is called to construct the `BinlogSync` response. Note that if a single proto response would exceed 256 MB, its data is split across multiple protos and sent separately.
```cpp
pstd::Status PikaReplServer::SendSlaveBinlogChips(const std::string& ip, int port,
                                                  const std::vector<WriteTask>& tasks) {
  InnerMessage::InnerResponse response;
  BuildBinlogSyncResp(tasks, &response);

  std::string binlog_chip_pb;
  if (!response.SerializeToString(&binlog_chip_pb)) {
    return Status::Corruption("Serialized Failed");
  }

  if (binlog_chip_pb.size() > static_cast<size_t>(g_pika_conf->max_conn_rbuf_size())) {
    for (const auto& task : tasks) {
      InnerMessage::InnerResponse response;
      std::vector<WriteTask> tmp_tasks;
      tmp_tasks.push_back(task);
      BuildBinlogSyncResp(tmp_tasks, &response);
      if (!response.SerializeToString(&binlog_chip_pb)) {
        return Status::Corruption("Serialized Failed");
      }
      pstd::Status s = Write(ip, port, binlog_chip_pb);
      if (!s.ok()) {
        return s;
      }
    }
    return pstd::Status::OK();
  }
  return Write(ip, port, binlog_chip_pb);
}
```
src/pika_repl_server.cc
The master replies to the slave with `db_name`, `slot_id`, `session_id`, `boffset`, and `binlog_`.
```cpp
void PikaReplServer::BuildBinlogSyncResp(const std::vector<WriteTask>& tasks,
                                         InnerMessage::InnerResponse* response) {
  response->set_code(InnerMessage::kOk);
  response->set_type(InnerMessage::Type::kBinlogSync);
  for (const auto& task : tasks) {
    InnerMessage::InnerResponse::BinlogSync* binlog_sync = response->add_binlog_sync();
    binlog_sync->set_session_id(task.rm_node_.SessionId());
    InnerMessage::Slot* slot = binlog_sync->mutable_slot();
    slot->set_db_name(task.rm_node_.DBName());
    slot->set_slot_id(task.rm_node_.SlotId());
    InnerMessage::BinlogOffset* boffset = binlog_sync->mutable_binlog_offset();
    BuildBinlogOffset(task.binlog_chip_.offset_, boffset);
    binlog_sync->set_binlog(task.binlog_chip_.binlog_);
  }
}
```
BinlogSync Summary

The master's `kBinlogSync` reply to the slave includes `session_id`, `db_name`, `slot_id`, `binlog_offset`, and `binlog_`.
Binlog Production and Consumption on the Slave

src/pika_repl_client_conn.cc

`DispatchBinlogRes` is where the slave handles the Binlog response sent over by the master; it calls `ScheduleWriteBinlogTask` to process the Binlog.
```cpp
void PikaReplClientConn::DispatchBinlogRes(const std::shared_ptr<InnerMessage::InnerResponse>& res) {
  std::unordered_map<SlotInfo, std::vector<int>*, hash_slot_info> par_binlog;
  for (int i = 0; i < res->binlog_sync_size(); ++i) {
    const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync(i);
    SlotInfo p_info(binlog_res.slot().db_name(), binlog_res.slot().slot_id());
    if (par_binlog.find(p_info) == par_binlog.end()) {
      par_binlog[p_info] = new std::vector<int>();
    }
    par_binlog[p_info]->push_back(i);
  }

  std::shared_ptr<SyncSlaveSlot> slave_slot = nullptr;
  for (auto& binlog_nums : par_binlog) {
    RmNode node(binlog_nums.first.db_name_, binlog_nums.first.slot_id_);
    slave_slot = g_pika_rm->GetSyncSlaveSlotByName(
        SlotInfo(binlog_nums.first.db_name_, binlog_nums.first.slot_id_));
    if (!slave_slot) {
      LOG(WARNING) << "Slave Slot: " << binlog_nums.first.db_name_ << "_" << binlog_nums.first.slot_id_
                   << " not exist";
      break;
    }
    slave_slot->SetLastRecvTime(pstd::NowMicros());

    g_pika_rm->ScheduleWriteBinlogTask(binlog_nums.first.db_name_ + std::to_string(binlog_nums.first.slot_id_),
                                       res, std::dynamic_pointer_cast<PikaReplClientConn>(shared_from_this()),
                                       reinterpret_cast<void*>(binlog_nums.second));
  }
}
```
src/pika_rm.cc
Here `ScheduleWriteBinlogTask` is called.
```cpp
void PikaReplicaManager::ScheduleWriteBinlogTask(const std::string& db_slot,
                                                 const std::shared_ptr<InnerMessage::InnerResponse>& res,
                                                 const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
  pika_repl_client_->ScheduleWriteBinlogTask(db_slot, res, conn, res_private_data);
}
```
src/pika_repl_client.cc

Here `Schedule` is called so that a `worker` thread asynchronously runs `HandleBGWorkerWriteBinlog`.
```cpp
void PikaReplClient::ScheduleWriteBinlogTask(const std::string& db_slot,
                                             const std::shared_ptr<InnerMessage::InnerResponse>& res,
                                             const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
  size_t index = GetHashIndex(db_slot, true);
  auto task_arg = new ReplClientWriteBinlogTaskArg(res, conn, res_private_data, bg_workers_[index].get());
  bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast<void*>(task_arg));
}
```
src/pika_repl_bgworker.cc
Here the `binlog().data()` inside `binlog_res` is decoded and handed to `ProcessInputBuffer` for processing. Notice that at the end, `HandleBGWorkerWriteBinlog` calls `SendSlotBinlogSyncAckRequest` — in other words, as soon as one Binlog response has been handled, a new `BinlogSync` request is sent right away. Pay attention to that request's two parameters, `ack_start` and `ack_end`: the `ack_start` sent next time is the offset carried in this Binlog response, while the `ack_end` sent next time is the offset after this batch of Binlog has been written locally — so `ack_start` always matches what the master sent.
```cpp
void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
  ...
  for (size_t i = 0; i < index->size(); ++i) {
    const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync((*index)[i]);
    if (i == 0) {
      db_name = binlog_res.slot().db_name();
      slot_id = binlog_res.slot().slot_id();
    }
    if (!binlog_res.binlog().empty()) {
      ParseBinlogOffset(binlog_res.binlog_offset(), &pb_begin);
      break;
    }
  }
  ...
  if (pb_begin == LogOffset()) {
    only_keepalive = true;
  }

  LogOffset ack_start;
  if (only_keepalive) {
    ack_start = LogOffset();
  } else {
    ack_start = pb_begin;
  }
  ...
  for (int i : *index) {
    ...
    const char* redis_parser_start = binlog_res.binlog().data() + BINLOG_ENCODE_LEN;
    int redis_parser_len = static_cast<int>(binlog_res.binlog().size()) - BINLOG_ENCODE_LEN;
    int processed_len = 0;
    net::RedisParserStatus ret =
        worker->redis_parser_.ProcessInputBuffer(redis_parser_start, redis_parser_len, &processed_len);
    if (ret != net::kRedisParserDone) {
      LOG(WARNING) << "Redis parser failed";
      slave_slot->SetReplState(ReplState::kTryConnect);
      return;
    }
  }
  ...
  LogOffset ack_end;
  if (only_keepalive) {
    ack_end = LogOffset();
  } else {
    LogOffset productor_status;
    std::shared_ptr<Binlog> logger = slot->Logger();
    logger->GetProducerStatus(&productor_status.b_offset.filenum, &productor_status.b_offset.offset,
                              &productor_status.l_offset.term, &productor_status.l_offset.index);
    ack_end = productor_status;
    ack_end.l_offset.term = pb_end.l_offset.term;
  }

  g_pika_rm->SendSlotBinlogSyncAckRequest(db_name, slot_id, ack_start, ack_end);
}
```
src/redis_parser.cc
Calls `ProcessRequestBuffer`.
```cpp
RedisParserStatus RedisParser::ProcessInputBuffer(const char* input_buf, int length, int* parsed_len) {
  if (status_code_ == kRedisParserInitDone || status_code_ == kRedisParserHalf || status_code_ == kRedisParserDone) {
    std::string tmp_str(input_buf, length);
    input_str_ = half_argv_ + tmp_str;
    input_buf_ = input_str_.c_str();
    length_ = static_cast<int32_t>(length + half_argv_.size());
    if (redis_parser_type_ == REDIS_PARSER_REQUEST) {
      ProcessRequestBuffer();
    } else if (redis_parser_type_ == REDIS_PARSER_RESPONSE) {
      ProcessResponseBuffer();
    } else {
      SetParserStatus(kRedisParserError, kRedisParserInitError);
      return status_code_;
    }
    *parsed_len = cur_pos_;
    ResetRedisParser();
    return status_code_;
  }
  SetParserStatus(kRedisParserError, kRedisParserInitError);
  return status_code_;
}
```
src/redis_parser.cc
Here the parsed `cmd` is placed into `argv_`, which triggers the `DealMessage` callback, which in turn calls `HandleWriteBinlog`.
```cpp
RedisParserStatus RedisParser::ProcessRequestBuffer() {
  RedisParserStatus ret;
  while (cur_pos_ <= length_ - 1) {
    if (redis_type_ == 0) {
      if (input_buf_[cur_pos_] == '*') {
        redis_type_ = REDIS_REQ_MULTIBULK;
      } else {
        redis_type_ = REDIS_REQ_INLINE;
      }
    }

    if (redis_type_ == REDIS_REQ_INLINE) {
      ret = ProcessInlineBuffer();
      if (ret != kRedisParserDone) {
        return ret;
      }
    } else if (redis_type_ == REDIS_REQ_MULTIBULK) {
      ret = ProcessMultibulkBuffer();
      if (ret != kRedisParserDone) {
        return ret;
      }
    } else {
      return kRedisParserError;
    }

    if (!argv_.empty()) {
      argvs_.push_back(argv_);
      if (parser_settings_.DealMessage) {
        if (parser_settings_.DealMessage(this, argv_) != 0) {
          SetParserStatus(kRedisParserError, kRedisParserDealError);
          return status_code_;
        }
      }
    }
    argv_.clear();
    ResetCommandStatus();
  }
  if (parser_settings_.Complete) {
    if (parser_settings_.Complete(this, argvs_) != 0) {
      SetParserStatus(kRedisParserError, kRedisParserCompleteError);
      return status_code_;
    }
  }
  argvs_.clear();
  SetParserStatus(kRedisParserDone);
  return status_code_;
}
```
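How does the parser end up in `HandleWriteBinlog`? The repl bg worker registers it as the `DealMessage` callback when initializing its `RedisParser` — roughly like the following, paraphrased from `PikaReplBgWorker`'s constructor (not a verbatim copy):

```cpp
// Paraphrased sketch: the worker's parser runs in request mode and invokes
// HandleWriteBinlog once per fully parsed command.
net::RedisParserSettings settings;
settings.DealMessage = &(PikaReplBgWorker::HandleWriteBinlog);
redis_parser_.RedisParserInit(REDIS_PARSER_REQUEST, settings);
redis_parser_.data = this;  // lets the static callback recover the worker
```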
src/pika_repl_bgworker.cc
Here `ConsensusProcessLeaderLog` is called to process the Binlog.
```cpp
int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::RedisCmdArgsType& argv) {
  ...
  slot->ConsensusProcessLeaderLog(c_ptr, worker->binlog_item_);
  return 0;
}
```
src/pika_rm.cc
Calls `ProcessLeaderLog`.
```cpp
Status SyncMasterSlot::ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
  return coordinator_.ProcessLeaderLog(cmd_ptr, attribute);
}
```
src/pika_consensus.cc
`InternalAppendLog` is called to write the Binlog first, and then `InternalApplyFollower` consumes the Binlog asynchronously.
```cpp
Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
  LogOffset last_index = mem_logger_->last_offset();
  if (attribute.logic_id() < last_index.l_offset.index) {
    LOG(WARNING) << SlotInfo(db_name_, slot_id_).ToString() << "Drop log from leader logic_id "
                 << attribute.logic_id() << " cur last index " << last_index.l_offset.index;
    return Status::OK();
  }

  Status s = InternalAppendLog(attribute, cmd_ptr, nullptr, nullptr);

  InternalApplyFollower(MemLog::LogItem(LogOffset(), cmd_ptr, nullptr, nullptr));
  return Status::OK();
}
```
Writing the Binlog

src/pika_consensus.cc

Calls `InternalAppendBinlog`.
```cpp
Status ConsensusCoordinator::InternalAppendLog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
                                               std::shared_ptr<PikaClientConn> conn_ptr,
                                               std::shared_ptr<std::string> resp_ptr) {
  LogOffset log_offset;
  Status s = InternalAppendBinlog(item, cmd_ptr, &log_offset);
  if (!s.ok()) {
    return s;
  }
  return Status::OK();
}
```
src/pika_consensus.cc
Here the command is first serialized, and then the `Put` function writes `content` into the Binlog.
```cpp
Status ConsensusCoordinator::InternalAppendBinlog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
                                                  LogOffset* log_offset) {
  std::string content = cmd_ptr->ToRedisProtocol();
  Status s = stable_logger_->Logger()->Put(content);
  if (!s.ok()) {
    std::string db_name = cmd_ptr->db_name().empty() ? g_pika_conf->default_db() : cmd_ptr->db_name();
    std::shared_ptr<DB> db = g_pika_server->GetDB(db_name);
    if (db) {
      db->SetBinlogIoError();
    }
    return s;
  }
  uint32_t filenum;
  uint64_t offset;
  stable_logger_->Logger()->GetProducerStatus(&filenum, &offset);
  *log_offset = LogOffset(BinlogOffset(filenum, offset), LogicOffset(item.term_id(), item.logic_id()));
  return Status::OK();
}
```
Consuming the Binlog

src/pika_consensus.cc

Calls `ScheduleWriteDBTask`.
```cpp
void ConsensusCoordinator::InternalApplyFollower(const MemLog::LogItem& log) {
  g_pika_rm->ScheduleWriteDBTask(log.cmd_ptr, log.offset, db_name_, slot_id_);
}
```
src/pika_rm.cc
Calls `ScheduleWriteDBTask`.
```cpp
void PikaReplicaManager::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
                                             const std::string& db_name, uint32_t slot_id) {
  pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, offset, db_name, slot_id);
}
```
src/pika_repl_client.cc
Calls `Schedule`.
```cpp
void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
                                         const std::string& db_name, uint32_t slot_id) {
  const PikaCmdArgsType& argv = cmd_ptr->argv();
  std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0];
  size_t index = GetHashIndex(dispatch_key, false);
  auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr, offset, db_name, slot_id);
  bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg));
}
```
src/pika_repl_bgworker.cc

This is the Binlog-consumption step — the usual flow of calling `Do` to execute the command.
```cpp
void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
  std::unique_ptr<ReplClientWriteDBTaskArg> task_arg(static_cast<ReplClientWriteDBTaskArg*>(arg));
  const std::shared_ptr<Cmd> c_ptr = task_arg->cmd_ptr;
  const PikaCmdArgsType& argv = c_ptr->argv();
  LogOffset offset = task_arg->offset;
  std::string db_name = task_arg->db_name;
  uint32_t slot_id = task_arg->slot_id;

  uint64_t start_us = 0;
  if (g_pika_conf->slowlog_slower_than() >= 0) {
    start_us = pstd::NowMicros();
  }
  std::shared_ptr<Slot> slot = g_pika_server->GetDBSlotById(db_name, slot_id);
  if (!c_ptr->IsSuspend()) {
    slot->DbRWLockReader();
  }
  if (c_ptr->IsNeedCacheDo() && PIKA_CACHE_NONE != g_pika_conf->cache_model() &&
      slot->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
    if (c_ptr->is_write()) {
      c_ptr->DoThroughDB(slot);
      if (c_ptr->IsNeedUpdateCache()) {
        c_ptr->DoUpdateCache(slot);
      }
    } else {
      LOG(WARNING) << "This branch is not impossible reach";
    }
  } else {
    c_ptr->Do(slot);
  }
  if (!c_ptr->IsSuspend()) {
    slot->DbRWUnLock();
  }

  if (g_pika_conf->slowlog_slower_than() >= 0) {
    auto start_time = static_cast<int32_t>(start_us / 1000000);
    auto duration = static_cast<int64_t>(pstd::NowMicros() - start_us);
    if (duration > g_pika_conf->slowlog_slower_than()) {
      g_pika_server->SlowlogPushEntry(argv, start_time, duration);
      if (g_pika_conf->slowlog_write_errorlog()) {
        LOG(ERROR) << "command: " << argv[0] << ", start_time(s): " << start_time
                   << ", duration(us): " << duration;
      }
    }
  }
}
```
src/pika_rm.cc
After producing and consuming the Binlog, the slave initiates the next `BinlogSync` request, and the steps are the same as before.
```cpp
Status PikaReplicaManager::SendSlotBinlogSyncAckRequest(const std::string& db, uint32_t slot_id,
                                                        const LogOffset& ack_start, const LogOffset& ack_end,
                                                        bool is_first_send) {
  std::shared_ptr<SyncSlaveSlot> slave_slot = GetSyncSlaveSlotByName(SlotInfo(db, slot_id));
  if (!slave_slot) {
    LOG(WARNING) << "Slave Slot: " << db << ":" << slot_id << ", NotFound";
    return Status::Corruption("Slave Slot not found");
  }
  return pika_repl_client_->SendSlotBinlogSync(slave_slot->MasterIp(), slave_slot->MasterPort(), db, slot_id,
                                               ack_start, ack_end, slave_slot->LocalIp(), is_first_send);
}
```
Master Receives the Slave's kBinlogSync Request

src/pika_repl_server_conn.cc

Here `UpdateSyncBinlogStatus` is called to update the Binlog read position recorded on the master, and the next batch of binlog data is written into the `write_queue`.
```cpp
void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) {
  ...
  s = g_pika_rm->UpdateSyncBinlogStatus(slave_node, range_start, range_end);
  if (!s.ok()) {
    LOG(WARNING) << "Update binlog ack failed " << db_name << " " << slot_id << " " << s.ToString();
    conn->NotifyClose();
    return;
  }

  g_pika_server->SignalAuxiliary();
}
```
src/pika_rm.cc
`ConsensusUpdateSlave` is called to update the offsets, and then, as before, `SyncBinlogToWq` writes new Binlog entries into the `write_queue`.
```cpp
Status PikaReplicaManager::UpdateSyncBinlogStatus(const RmNode& slave, const LogOffset& offset_start,
                                                  const LogOffset& offset_end) {
  std::shared_lock l(slots_rw_);
  if (sync_master_slots_.find(slave.NodeSlotInfo()) == sync_master_slots_.end()) {
    return Status::NotFound(slave.ToString() + " not found");
  }
  std::shared_ptr<SyncMasterSlot> slot = sync_master_slots_[slave.NodeSlotInfo()];
  Status s = slot->ConsensusUpdateSlave(slave.Ip(), slave.Port(), offset_start, offset_end);
  if (!s.ok()) {
    return s;
  }
  s = slot->SyncBinlogToWq(slave.Ip(), slave.Port());
  if (!s.ok()) {
    return s;
  }
  return Status::OK();
}
```
src/pika_rm.cc
Calls `UpdateSlave`.
```cpp
Status SyncMasterSlot::ConsensusUpdateSlave(const std::string& ip, int port, const LogOffset& start,
                                            const LogOffset& end) {
  Status s = coordinator_.UpdateSlave(ip, port, start, end);
  if (!s.ok()) {
    LOG(WARNING) << SyncSlotInfo().ToString() << s.ToString();
    return s;
  }
  return Status::OK();
}
```
src/pika_consensus.cc
Calls `Update`.
```cpp
Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const LogOffset& start,
                                         const LogOffset& end) {
  LogOffset committed_index;
  Status s = sync_pros_.Update(ip, port, start, end, &committed_index);
  if (!s.ok()) {
    return s;
  }
  return Status::OK();
}
```
src/pika_consensus.cc
Calls `Update`.
```cpp
Status SyncProgress::Update(const std::string& ip, int port, const LogOffset& start, const LogOffset& end,
                            LogOffset* committed_index) {
  ...
  LogOffset acked_offset;
  {
    std::lock_guard l(slave_ptr->slave_mu);
    Status s = slave_ptr->Update(start, end, &acked_offset);
    if (!s.ok()) {
      return s;
    }
    ...
  }
  return Status::OK();
}
```
src/pika_slave_node.cc
Here `Update` is called to refresh the sliding window `sync_win`, and we can see that it ultimately updates the value of `acked_offset`. The updated `acked_offset` is the `sent_offset` of the last entry sent in the previous batch of Binlog requests; keeping `acked_offset` and `sent_offset` equal is what guarantees no data is lost.
```cpp
Status SlaveNode::Update(const LogOffset& start, const LogOffset& end, LogOffset* updated_offset) {
  if (slave_state != kSlaveBinlogSync) {
    return Status::Corruption(ToString() + "state not BinlogSync");
  }
  *updated_offset = LogOffset();
  bool res = sync_win.Update(SyncWinItem(start), SyncWinItem(end), updated_offset);
  if (!res) {
    return Status::Corruption("UpdateAckedInfo failed");
  }
  if (*updated_offset == LogOffset()) {
    *updated_offset = acked_offset;
    return Status::OK();
  }
  acked_offset = *updated_offset;
  return Status::OK();
}
```
src/pika_slave_node.cc
`win_` here is actually a double-ended queue holding `SyncWinItem` entries. The `start_item` and `end_item` passed in delimit the Binlog offsets the slave has already consumed; those entries are `pop_front`ed off `win_`, leaving in `win_` the Binlog offsets still to be delivered next time.
```cpp
bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_item, LogOffset* acked_offset) {
  size_t start_pos = win_.size();
  size_t end_pos = win_.size();
  for (size_t i = 0; i < win_.size(); ++i) {
    if (win_[i] == start_item) {
      start_pos = i;
    }
    if (win_[i] == end_item) {
      end_pos = i;
      break;
    }
  }
  if (start_pos == win_.size() || end_pos == win_.size()) {
    LOG(WARNING) << "Ack offset Start: " << start_item.ToString() << "End: " << end_item.ToString()
                 << " not found in binlog controller window." << std::endl
                 << "window status " << std::endl
                 << ToStringStatus();
    return false;
  }
  for (size_t i = start_pos; i <= end_pos; ++i) {
    win_[i].acked_ = true;
    total_size_ -= win_[i].binlog_size_;
  }
  while (!win_.empty()) {
    if (win_[0].acked_) {
      *acked_offset = win_[0].offset_;
      win_.pop_front();
    } else {
      break;
    }
  }
  return true;
}
```
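A quick illustration with made-up integer offsets: if entries 100, 101, 102 are in flight and the slave acks the range [100, 101], the first two are marked and popped and `acked_offset` becomes 101, while 102 stays pending:

```cpp
#include <cstdint>
#include <deque>

// Toy model of SyncWindow::Update with hypothetical integer offsets.
struct ToyItem { uint64_t offset; bool acked = false; };

int main() {
  std::deque<ToyItem> win{{100}, {101}, {102}};
  uint64_t ack_start = 100, ack_end = 101, acked_offset = 0;
  for (auto& it : win) {
    if (it.offset >= ack_start && it.offset <= ack_end) it.acked = true;
  }
  while (!win.empty() && win.front().acked) {
    acked_offset = win.front().offset;  // ends at 101
    win.pop_front();
  }
  return 0;  // win now holds only {102}; acked_offset == 101
}
```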
Summary

In incremental sync, the master first sends the slave its Binlog offset together with the serialized record payload, recording that offset as `sent_offset`. Once the slave has consumed it, it replies to the master with an `ack_start` and `ack_end`, which update the master-side `acked_offset`. Only when `sent_offset` and `acked_offset` are equal can the master continue incremental sync with that slave.
Standalone Binlog Production

src/pika_command.cc

After the command-execution flow finishes, the `DoBinlog` function runs.
```cpp
void Cmd::InternalProcessCommand(const std::shared_ptr<Slot>& slot, const std::shared_ptr<SyncMasterSlot>& sync_slot,
                                 const HintKeys& hint_keys) {
  pstd::lock::MultiRecordLock record_lock(slot->LockMgr());
  if (is_write()) {
    record_lock.Lock(current_key());
  }

  uint64_t start_us = 0;
  if (g_pika_conf->slowlog_slower_than() >= 0) {
    start_us = pstd::NowMicros();
  }
  DoCommand(slot, hint_keys);
  if (g_pika_conf->slowlog_slower_than() >= 0) {
    do_duration_ += pstd::NowMicros() - start_us;
  }

  DoBinlog(sync_slot);

  if (is_write()) {
    record_lock.Unlock(current_key());
  }
}
```
src/pika_command.cc

In the `DoBinlog` function we first check whether the current command is a write command — only write commands need to be recorded in the Binlog — then fetch the current `conn` and `response`, and finally run `SyncMasterSlot`'s `ConsensusProposeLog` function.
```cpp
void Cmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
  if (res().ok() && is_write() && g_pika_conf->write_binlog()) {
    std::shared_ptr<net::NetConn> conn_ptr = GetConn();
    std::shared_ptr<std::string> resp_ptr = GetResp();
    if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) {
      if (!conn_ptr) {
        LOG(WARNING) << slot->SyncSlotInfo().ToString() << " conn empty.";
      }
      if (!resp_ptr) {
        LOG(WARNING) << slot->SyncSlotInfo().ToString() << " resp empty.";
      }
      res().SetRes(CmdRes::kErrOther);
      return;
    }

    Status s = slot->ConsensusProposeLog(shared_from_this(), std::dynamic_pointer_cast<PikaClientConn>(conn_ptr),
                                         resp_ptr);
    if (!s.ok()) {
      LOG(WARNING) << slot->SyncSlotInfo().ToString()
                   << " Writing binlog failed, maybe no space left on device " << s.ToString();
      res().SetRes(CmdRes::kErrOther, s.ToString());
      return;
    }
  }
}
```
src/pika_rm.cc
The `coordinator_` here is of class `ConsensusCoordinator` and is a private member of the `SyncMasterSlot` class.
```cpp
Status SyncMasterSlot::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr,
                                           std::shared_ptr<PikaClientConn> conn_ptr,
                                           std::shared_ptr<std::string> resp_ptr) {
  return coordinator_.ProposeLog(cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
}
```
src/pika_consensus.cc
In `ProposeLog`, a `LogOffset` and a `BinlogItem` variable are declared, `InternalAppendLog` is called to perform the Binlog write, and `SignalAuxiliary` is called to wake the auxiliary thread so the new Binlog gets synced out.
```cpp
Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std::shared_ptr<PikaClientConn> conn_ptr,
                                        std::shared_ptr<std::string> resp_ptr) {
  std::vector<std::string> keys = cmd_ptr->current_key();
  if (cmd_ptr->name() == kCmdNameSAdd && !keys.empty() &&
      (keys[0].compare(0, SlotKeyPrefix.length(), SlotKeyPrefix) == 0 ||
       keys[0].compare(0, SlotTagPrefix.length(), SlotTagPrefix) == 0)) {
    return Status::OK();
  }

  LogOffset log_offset;
  BinlogItem item;

  Status s = InternalAppendLog(item, cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
  if (!s.ok()) {
    return s;
  }

  g_pika_server->SignalAuxiliary();
  return Status::OK();
}

void PikaServer::SignalAuxiliary() { pika_auxiliary_thread_->cv_.notify_one(); }
```
The `BinlogItem` class carries the fields `exec_time`, `term_id`, `logic_id`, `filenum`, `offset`, `content`, and `extends`.
```cpp
class BinlogItem {
 public:
  BinlogItem() = default;

  friend class PikaBinlogTransverter;

  uint32_t exec_time() const;
  uint32_t term_id() const;
  uint64_t logic_id() const;
  uint32_t filenum() const;
  uint64_t offset() const;
  std::string content() const;
  std::string ToString() const;

 private:
  uint32_t exec_time_ = 0;
  uint32_t term_id_ = 0;
  uint64_t logic_id_ = 0;
  uint32_t filenum_ = 0;
  uint64_t offset_ = 0;
  std::string content_;
  std::vector<std::string> extends_;
};
```
src/pika_consensus.cc
Here the `InternalAppendBinlog` method is called to write the Binlog, taking the `cmd_ptr`, a `BinlogItem`, and a `LogOffset` as arguments.
```cpp
Status ConsensusCoordinator::InternalAppendLog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
                                               std::shared_ptr<PikaClientConn> conn_ptr,
                                               std::shared_ptr<std::string> resp_ptr) {
  LogOffset log_offset;
  Status s = InternalAppendBinlog(item, cmd_ptr, &log_offset);
  if (!s.ok()) {
    return s;
  }
  return Status::OK();
}
```
src/pika_consensus.cc
Here the command in `cmd_ptr` is first serialized into `content` according to the Redis protocol. `stable_logger_` is a `StableLog` object; its `Put` function writes `content` into the file, after which `GetProducerStatus` is called and a new `LogOffset` is built to update the offset.
```cpp
Status ConsensusCoordinator::InternalAppendBinlog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
                                                  LogOffset* log_offset) {
  std::string content = cmd_ptr->ToRedisProtocol();
  Status s = stable_logger_->Logger()->Put(content);
  if (!s.ok()) {
    std::string db_name = cmd_ptr->db_name().empty() ? g_pika_conf->default_db() : cmd_ptr->db_name();
    std::shared_ptr<DB> db = g_pika_server->GetDB(db_name);
    if (db) {
      db->SetBinlogIoError();
    }
    return s;
  }
  uint32_t filenum;
  uint64_t offset;
  stable_logger_->Logger()->GetProducerStatus(&filenum, &offset);
  *log_offset = LogOffset(BinlogOffset(filenum, offset), LogicOffset(item.term_id(), item.logic_id()));
  return Status::OK();
}
```
src/pika_binlog.cc
`Binlog` overloads the `Put` function. The first `Put` initializes the `filenum`, `term_id`, `offset`, and `logic_id` variables, calls `GetProducerStatus` to fetch the latest values in the Binlog (filenum, pro_offset, term_id, logic_id), and then calls `BinlogEncode` to assemble the Binlog entry.
```cpp
Status Binlog::Put(const std::string& item) {
  if (!opened_.load()) {
    return Status::Busy("Binlog is not open yet");
  }
  uint32_t filenum = 0;
  uint32_t term = 0;
  uint64_t offset = 0;
  uint64_t logic_id = 0;

  Lock();
  DEFER { Unlock(); };

  Status s = GetProducerStatus(&filenum, &offset, &term, &logic_id);
  if (!s.ok()) {
    return s;
  }
  logic_id++;
  std::string data = PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, time(nullptr), term, logic_id,
                                                         filenum, offset, item, {});

  s = Put(data.c_str(), static_cast<int>(data.size()));
  if (!s.ok()) {
    binlog_io_error_.store(true);
  }
  return s;
}
```
src/pika_binlog_transverter.cc
`BinlogEncode` here assembles the Binlog entry's fields and returns the assembled binlog.
```cpp
std::string PikaBinlogTransverter::BinlogEncode(BinlogType type, uint32_t exec_time, uint32_t term_id,
                                                uint64_t logic_id, uint32_t filenum, uint64_t offset,
                                                const std::string& content,
                                                const std::vector<std::string>& extends) {
  std::string binlog;
  pstd::PutFixed16(&binlog, type);
  pstd::PutFixed32(&binlog, exec_time);
  pstd::PutFixed32(&binlog, term_id);
  pstd::PutFixed64(&binlog, logic_id);
  pstd::PutFixed32(&binlog, filenum);
  pstd::PutFixed64(&binlog, offset);
  uint32_t content_length = content.size();
  pstd::PutFixed32(&binlog, content_length);
  binlog.append(content);
  return binlog;
}
```
```
| type | exec_time | term_id | logic_id | filenum | offset | content_len | content |
|  2B  |    4B     |   4B    |    8B    |   4B    |   8B   |     4B      |         |
```

This is how the Binlog item is assembled — 34 bytes in total before the variable-length content.
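A quick sanity check of that 34-byte figure:

```cpp
// Fixed-width part of a binlog item, per the layout above:
// type(2) + exec_time(4) + term_id(4) + logic_id(8) + filenum(4) + offset(8) + content_len(4)
static_assert(2 + 4 + 4 + 8 + 4 + 8 + 4 == 34, "binlog item header is 34 bytes");
```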
src/pika_binlog.cc
Now let's look at what the second `Put` function does. If the current file has grown past the configured file size, `NewWritableFile` is called to create a new Binlog file: `pro_offset` in the Binlog is reset to 0, and `pro_num` (the file-name index) is incremented and persisted. Then the `Produce` method is called to handle the Binlog-writing logic.
```cpp
Status Binlog::Put(const char* item, int len) {
  Status s;
  uint64_t filesize = queue_->Filesize();
  if (filesize > file_size_) {
    std::unique_ptr<pstd::WritableFile> queue;
    std::string profile = NewFileName(filename_, pro_num_ + 1);
    s = pstd::NewWritableFile(profile, queue);
    if (!s.ok()) {
      LOG(ERROR) << "Binlog: new " << filename_ << " " << s.ToString();
      return s;
    }
    queue_.reset();
    queue_ = std::move(queue);
    pro_num_++;

    {
      std::lock_guard l(version_->rwlock_);
      version_->pro_offset_ = 0;
      version_->pro_num_ = pro_num_;
      version_->StableSave();
    }
    InitLogFile();
  }

  int pro_offset;
  s = Produce(pstd::Slice(item, len), &pro_offset);
  if (s.ok()) {
    std::lock_guard l(version_->rwlock_);
    version_->pro_offset_ = pro_offset;
    version_->logic_id_++;
    version_->StableSave();
  }
  return s;
}
```
src/pika_binlog.cc
In the `Produce` function, `tmp_pro_offset` is the offset within the current Binlog file. `left` records how many bytes of the current Binlog entry still need to be written, and `leftover` records how many bytes the current block can still be filled with. If the bytes remaining in the block are smaller even than `kHeaderSize` (the record header), the rest of the current block is padded with `\x00` and a new block is started: `block_offset_` is reset to 0 as the new block's offset, and `tmp_pro_offset` is advanced accordingly. `avail` records how many bytes a block can still carry after subtracting `kHeaderSize` and `block_offset_` (the part of the block already written). Comparing `left` with `avail` tells us whether this record fits in the current block: if `left <= avail`, the current block can hold the whole record and `type` is set to `kFullType`; in the remaining cases one of the other types is used.
```cpp
Status Binlog::Produce(const pstd::Slice& item, int* temp_pro_offset) {
  Status s;
  const char* ptr = item.data();
  size_t left = item.size();
  bool begin = true;

  *temp_pro_offset = static_cast<int>(version_->pro_offset_);
  do {
    const int leftover = static_cast<int>(kBlockSize) - block_offset_;
    assert(leftover >= 0);
    if (static_cast<size_t>(leftover) < kHeaderSize) {
      if (leftover > 0) {
        s = queue_->Append(pstd::Slice("\x00\x00\x00\x00\x00\x00\x00", leftover));
        if (!s.ok()) {
          return s;
        }
        *temp_pro_offset += leftover;
      }
      block_offset_ = 0;
    }

    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;
    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    s = EmitPhysicalRecord(type, ptr, fragment_length, temp_pro_offset);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);

  return s;
}
```
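To see the type selection play out, take a hypothetical 100 KB (102400-byte) entry written at the start of an empty block: the first fragment carries kBlockSize - kHeaderSize = 65528 bytes as `kFirstType`, and the remaining 36872 bytes fit into the next block as `kLastType`. A sketch of the loop's arithmetic:

```cpp
#include <cstddef>
#include <cstdio>

// Fragmentation arithmetic for a made-up 102400-byte entry, following
// Produce(): 64 KB blocks, 8-byte physical record headers.
int main() {
  const std::size_t kBlockSize = 64 * 1024, kHeaderSize = 8;
  std::size_t left = 102400, block_offset = 0;
  bool begin = true;
  while (left > 0) {
    if (kBlockSize - block_offset < kHeaderSize) block_offset = 0;  // pad rest, new block
    std::size_t avail = kBlockSize - block_offset - kHeaderSize;
    std::size_t frag = left < avail ? left : avail;
    bool end = (left == frag);
    const char* type = (begin && end) ? "kFullType"
                       : begin        ? "kFirstType"
                       : end          ? "kLastType"
                                      : "kMiddleType";
    std::printf("%s: %zu bytes\n", type, frag);  // kFirstType: 65528, kLastType: 36872
    block_offset += kHeaderSize + frag;
    left -= frag;
    begin = false;
  }
  return 0;
}
```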
src/pika_binlog.cc
Here the data to be written is appended to `queue_` — header first, then payload — so the next time the Binlog file is opened, the newly appended part is reflected. At this point, one record has been persisted.
```cpp
Status Binlog::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset) {
  Status s;
  assert(n <= 0xffffff);
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  char buf[kHeaderSize];

  uint64_t now;
  struct timeval tv;
  gettimeofday(&tv, nullptr);
  now = tv.tv_sec;
  buf[0] = static_cast<char>(n & 0xff);
  buf[1] = static_cast<char>((n & 0xff00) >> 8);
  buf[2] = static_cast<char>(n >> 16);
  buf[3] = static_cast<char>(now & 0xff);
  buf[4] = static_cast<char>((now & 0xff00) >> 8);
  buf[5] = static_cast<char>((now & 0xff0000) >> 16);
  buf[6] = static_cast<char>((now & 0xff000000) >> 24);
  buf[7] = static_cast<char>(t);

  s = queue_->Append(pstd::Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = queue_->Append(pstd::Slice(ptr, n));
    if (s.ok()) {
      s = queue_->Flush();
    }
  }
  block_offset_ += static_cast<int32_t>(kHeaderSize + n);

  *temp_pro_offset += static_cast<int32_t>(kHeaderSize + n);
  return s;
}
```
```
| length | time | Type | type | exec_time | term_id | logic_id | filenum | offset | content_len | content |
|   3B   |  4B  |  1B  |  2B  |    4B     |   4B    |    8B    |   4B    |   8B   |     4B      |         |
```

So this is the form in which a record is ultimately laid out.
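It follows that for a command whose RESP serialization fits inside one block, the on-disk cost of the record is 8 physical-header bytes plus the 34-byte item header plus the payload. A tiny worked check:

```cpp
#include <cstddef>

// On-disk size of a single kFullType record: 8-byte physical header +
// 34-byte binlog item header + the RESP-encoded command payload.
constexpr std::size_t RecordBytes(std::size_t resp_len) { return 8 + 34 + resp_len; }
static_assert(RecordBytes(100) == 142, "a 100-byte payload occupies 142 bytes");
```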
src/pika_binlog.cc
Looking at the code above, it may seem that the Binlog data never actually hits disk and only lives in `queue_` (memory). The `AppendWritableFile` function used here performs persistence via `mmap`; it is called in `Binlog`'s constructor, and the operating system periodically writes the mapped memory pages back to disk to complete the persistence.
```cpp
Binlog::Binlog(std::string binlog_path, const int file_size)
    : opened_(false), binlog_path_(std::move(binlog_path)), file_size_(file_size), binlog_io_error_(false) {
  Status s;
  pstd::CreateDir(binlog_path_);

  filename_ = binlog_path_ + kBinlogPrefix;
  const std::string manifest = binlog_path_ + kManifest;
  std::string profile;

  if (!pstd::FileExists(manifest)) {
    LOG(INFO) << "Binlog: Manifest file not exist, we create a new one.";

    profile = NewFileName(filename_, pro_num_);
    s = pstd::NewWritableFile(profile, queue_);
    if (!s.ok()) {
      LOG(FATAL) << "Binlog: new " << filename_ << " " << s.ToString();
    }
    std::unique_ptr<pstd::RWFile> tmp_file;
    s = pstd::NewRWFile(manifest, tmp_file);
    versionfile_.reset(tmp_file.release());
    if (!s.ok()) {
      LOG(FATAL) << "Binlog: new versionfile error " << s.ToString();
    }

    version_ = std::make_unique<Version>(versionfile_);
    version_->StableSave();
  } else {
    LOG(INFO) << "Binlog: Find the exist file.";
    std::unique_ptr<pstd::RWFile> tmp_file;
    s = pstd::NewRWFile(manifest, tmp_file);
    versionfile_.reset(tmp_file.release());
    if (s.ok()) {
      version_ = std::make_unique<Version>(versionfile_);
      version_->Init();
      pro_num_ = version_->pro_num_;
    } else {
      LOG(FATAL) << "Binlog: open versionfile error";
    }

    profile = NewFileName(filename_, pro_num_);
    DLOG(INFO) << "Binlog: open profile " << profile;
    s = pstd::AppendWritableFile(profile, queue_, version_->pro_offset_);
    if (!s.ok()) {
      LOG(FATAL) << "Binlog: Open file " << profile << " error " << s.ToString();
    }

    uint64_t filesize = queue_->Filesize();
    DLOG(INFO) << "Binlog: filesize is " << filesize;
  }
  InitLogFile();
}
```
Summary

What we call the Binlog is really the write2file files present in Pika. A write2file file is made up of multiple blocks, each with a fixed size of 64 KB. A block stores records, and a record is one serialized redis command (a single Binlog entry may also be recorded across multiple records). Each write2file file has a size limit. If the bytes remaining in the current block cannot even hold a record's header, the remainder of the block is padded with `\x00` bytes and a new block is started to carry the record.
FAQ
In the code the record header `kHeaderSize` is 1 + 3 + 4, i.e. 8 bytes, yet the Binlog header described in pika_binlog_transverter.h is Type(2B) + exec_time(4B) + term_id(4B) + logic_id(8B) + filenum(4B) + offset(8B) + content_len(4B), which works out to 34 bytes. Why the difference?

The header that is actually assembled on disk is 1 (Type) + 3 (length) + 4 (time) = 8 bytes, followed by the 34-byte item header:

```
| length | time | Type | type | exec_time | term_id | logic_id | filenum | offset | content_len | content |
|   3B   |  4B  |  1B  |  2B  |    4B     |   4B    |    8B    |   4B    |   8B   |     4B      |         |
```
If `AppendWritableFile` is what persists the Binlog data each time, why is it only called in `Binlog`'s constructor?

Because the operating system periodically writes the mmap'd memory back to disk.
If the remainder of the current block can hold a record's header plus part of its content, will the block store the whole record rather than starting a new block?

A new block is started. And even if the file has already reached the maximum number of blocks implied by its configured size, a new block is still created — so the size of a write2file file is not strictly fixed, but the block size always is.
How does the master signal the auxiliary thread when writing the Binlog?

Via a condition variable: `pika_auxiliary_thread_->cv_.notify_one();`
Open Issues

Within a single Binlog record, some fields are assembled twice:

```
| length | time | Type | type | exec_time | term_id | logic_id | filenum | offset | content_len | content |
|   3B   |  4B  |  1B  |  2B  |    4B     |   4B    |    8B    |   4B    |   8B   |     4B      |         |
```

and the `type` field always carries the default value `TypeFirst`.
The Auxiliary Thread Drives BinlogSync

src/pika_auxiliary_thread.cc

The slave is the initiator of BinlogSync, but after a stretch with no data to sync it cannot sense new writes on the master and therefore will not proactively issue another BinlogSync. At that point the auxiliary thread calls `TriggerSendBinlogSync` to drive the BinlogSync flow. `TriggerSendBinlogSync`'s main job is to put the newly written increments into `write_queues` and send them on to the slaves.
```cpp
void* PikaAuxiliaryThread::ThreadMain() {
  while (!should_stop()) {
    if (g_pika_server->ShouldMetaSync()) {
      g_pika_rm->SendMetaSyncRequest();
    } else if (g_pika_server->MetaSyncDone()) {
      g_pika_rm->RunSyncSlaveSlotStateMachine();
    }

    pstd::Status s = g_pika_rm->CheckSyncTimeout(pstd::NowMicros());
    if (!s.ok()) {
      LOG(WARNING) << s.ToString();
    }

    g_pika_server->CheckLeaderProtectedMode();

    s = g_pika_server->TriggerSendBinlogSync();
    if (!s.ok()) {
      LOG(WARNING) << s.ToString();
    }
    int res = g_pika_server->SendToPeer();
    if (res == 0) {
      std::unique_lock lock(mu_);
      cv_.wait_for(lock, 100ms);
    } else {
    }
  }
  return nullptr;
}
```
src/pika_server.cc
Calls `WakeUpBinlogSync`.
```cpp
Status PikaServer::TriggerSendBinlogSync() { return g_pika_rm->WakeUpBinlogSync(); }
```
src/pika_rm.cc
Calls `WakeUpSlaveBinlogSync`.
```cpp
Status PikaReplicaManager::WakeUpBinlogSync() {
  std::shared_lock l(slots_rw_);
  for (auto& iter : sync_master_slots_) {
    std::shared_ptr<SyncMasterSlot> slot = iter.second;
    Status s = slot->WakeUpSlaveBinlogSync();
    if (!s.ok()) {
      return s;
    }
  }
  return Status::OK();
}
```
src/pika_rm.cc
This calls `ReadBinlogFileToWq`; the steps are similar to what we saw above — Binlog entries are written into the `write_queue`. It first checks whether `sent_offset` and `acked_offset` are equal: equality means the increments from the earlier, actively driven sync have all been acknowledged, so whenever `ReadBinlogFileToWq` is invoked here, the previously sent incremental data is guaranteed to already be in sync.
```cpp
Status SyncMasterSlot::WakeUpSlaveBinlogSync() {
  std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves = GetAllSlaveNodes();
  std::vector<std::shared_ptr<SlaveNode>> to_del;
  for (auto& slave_iter : slaves) {
    std::shared_ptr<SlaveNode> slave_ptr = slave_iter.second;
    std::lock_guard l(slave_ptr->slave_mu);
    if (slave_ptr->sent_offset == slave_ptr->acked_offset) {
      Status s = ReadBinlogFileToWq(slave_ptr);
      if (!s.ok()) {
        to_del.push_back(slave_ptr);
        LOG(WARNING) << "WakeUpSlaveBinlogSync falied, Delete from RM, slave: " << slave_ptr->ToStringStatus()
                     << " " << s.ToString();
      }
    }
  }
  for (auto& to_del_slave : to_del) {
    RemoveSlaveNode(to_del_slave->Ip(), to_del_slave->Port());
  }
  return Status::OK();
}
```
Binlog Expiration Policy

src/pika_server.cc

Among Pika's timer tasks there is an `AutoPurge` task, which periodically cleans up Binlogs.
```cpp
void PikaServer::DoTimingTask() {
  AutoCompactRange();
  AutoPurge();
  AutoDeleteExpiredDump();
  ResetLastSecQuerynum();
  AutoUpdateNetworkMetric();
  ProcessCronTask();
  UpdateCacheInfo();
  PrintThreadPoolQueueStatus();
}

void PikaServer::AutoPurge() { DoSameThingEverySlot(TaskType::kPurgeLog); }
```
src/pika_server.cc
We call `PurgeStableLogs` to perform the Binlog deletion.
```cpp
Status PikaServer::DoSameThingEverySlot(const TaskType& type) {
  std::shared_lock rwl(dbs_rw_);
  std::shared_ptr<SyncSlaveSlot> slave_slot = nullptr;
  for (const auto& db_item : dbs_) {
    for (const auto& slot_item : db_item.second->slots_) {
      switch (type) {
        case TaskType::kResetReplState: {
          slave_slot = g_pika_rm->GetSyncSlaveSlotByName(
              SlotInfo(db_item.second->GetDBName(), slot_item.second->GetSlotID()));
          if (!slave_slot) {
            LOG(WARNING) << "Slave Slot: " << db_item.second->GetDBName() << ":"
                         << slot_item.second->GetSlotID() << " Not Found";
          }
          slave_slot->SetReplState(ReplState::kNoConnect);
          break;
        }
        case TaskType::kPurgeLog: {
          std::shared_ptr<SyncMasterSlot> slot = g_pika_rm->GetSyncMasterSlotByName(
              SlotInfo(db_item.second->GetDBName(), slot_item.second->GetSlotID()));
          if (!slot) {
            LOG(WARNING) << "Slot: " << db_item.second->GetDBName() << ":" << slot_item.second->GetSlotID()
                         << " Not Found.";
            break;
          }
          slot->StableLogger()->PurgeStableLogs();
          break;
        }
        case TaskType::kCompactAll:
          slot_item.second->Compact(storage::kAll);
          break;
        default:
          break;
      }
    }
  }
  return Status::OK();
}
```
src/pika_stable_log.cc
As we can see here, the cleanup is handled by an asynchronous thread, which runs `DoPurgeStableLogs` when it executes.
```cpp
bool StableLog::PurgeStableLogs(uint32_t to, bool manual) {
  bool expect = false;
  if (!purging_.compare_exchange_strong(expect, true)) {
    LOG(WARNING) << "purge process already exist";
    return false;
  }
  auto arg = new PurgeStableLogArg();
  arg->to = to;
  arg->manual = manual;
  arg->logger = shared_from_this();
  g_pika_server->PurgelogsTaskSchedule(&DoPurgeStableLogs, static_cast<void*>(arg));
  return true;
}
```
src/pika_stable_log.cc
Calls `PurgeFiles`.
```cpp
void StableLog::DoPurgeStableLogs(void* arg) {
  std::unique_ptr<PurgeStableLogArg> purge_arg(static_cast<PurgeStableLogArg*>(arg));
  purge_arg->logger->PurgeFiles(purge_arg->to, purge_arg->manual);
  purge_arg->logger->ClearPurge();
}
```
src/pika_stable_log.cc
Here we first collect all the Binlog file names in the directory into the `binlogs` map, compare the count against the configured maximum number of Binlog files (at most 10 are kept by default), and then delete Binlog files starting from the oldest one (the smallest `filenum`).
```cpp
bool StableLog::PurgeFiles(uint32_t to, bool manual) {
  std::map<uint32_t, std::string> binlogs;
  if (!GetBinlogFiles(&binlogs)) {
    LOG(WARNING) << log_path_ << " Could not get binlog files!";
    return false;
  }

  int delete_num = 0;
  struct stat file_stat;
  auto remain_expire_num = static_cast<int32_t>(binlogs.size() - g_pika_conf->expire_logs_nums());
  std::shared_ptr<SyncMasterSlot> master_slot = nullptr;
  std::map<uint32_t, std::string>::iterator it;
  for (it = binlogs.begin(); it != binlogs.end(); ++it) {
    if ((manual && it->first <= to) || (remain_expire_num > 0) ||
        (binlogs.size() - delete_num > 10 && stat(((log_path_ + it->second)).c_str(), &file_stat) == 0 &&
         file_stat.st_mtime < time(nullptr) - g_pika_conf->expire_logs_days() * 24 * 3600)) {
      master_slot = g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name_, slot_id_));
      ...
      if (!master_slot->BinlogCloudPurge(it->first)) {
        LOG(WARNING) << log_path_ << " Could not purge " << (it->first) << ", since it is already be used";
        return false;
      }

      if (pstd::DeleteFile(log_path_ + it->second)) {
        ++delete_num;
        --remain_expire_num;
      } else {
        LOG(WARNING) << log_path_ << " Purge log file : " << (it->second)
                     << " failed! error: delete file failed";
      }
    } else {
      break;
    }
  }
  ...
  return true;
}
```
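The deletion test inside that loop can be read as a single predicate: purge a file if it was explicitly requested (`manual && filenum <= to`), or if more files exist than expire-logs-nums allows, or if it is older than expire-logs-days while more than 10 files would still remain. A hedged restatement of that condition:

```cpp
#include <cstddef>
#include <cstdint>
#include <ctime>

// Restatement of PurgeFiles()' per-file condition (expire-logs-nums and
// expire-logs-days come from pika.conf; mtime from stat()).
bool ShouldPurge(bool manual, uint32_t filenum, uint32_t to, int remain_expire_num,
                 std::size_t files_left, time_t mtime, int expire_days) {
  return (manual && filenum <= to)  // explicit purge request up to `to`
         || remain_expire_num > 0   // over the expire-logs-nums cap
         || (files_left > 10 &&     // always keep at least 10 files, and
             mtime < time(nullptr) - expire_days * 24 * 3600);  // file too old
}
```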