Floyd无效数据清理方案

背景

在 Pika 的新存储引擎 Floyd 下,每个 RocksDB 实例使用独立的 minor compaction 线程池和 major compaction 线程池;除此之外,每个 RocksDB 实例还使用一个后台线程,该后台线程用来发起 manual compaction,以及对 RocksDB 中存储的数据进行定期的统计和巡检。

讨论

之前 Pika 的 Compaction 机制分为 autocompaction 和手动执行 compaction 两种。其中 autocompaction 又分为在指定时间段内和磁盘使用达到某些条件时进行 compaction,分别由 DoSameThingSpecificDB 和 DoSameThingEverySlot 两个接口处理。

1
2
3
4
5
6
7
8
9
10
11
 if ((static_cast<double>(free_size) / static_cast<double>(total_size)) * 100 >= usage) { // 达到条件
...
Status s = DoSameThingSpecificDB(dbs, {TaskType::kCompactAll});
}

if (!have_scheduled_crontask_ && in_window) { // 定时
if ((static_cast<double>(free_size) / static_cast<double>(total_size)) * 100 >= usage) {
Status s = DoSameThingEverySlot(TaskType::kCompactAll);
...
}
}

对于手动 compaction,我们是以执行命令的形式触发的;从下面的代码可以看到,compaction 的过程是同步的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
void CompactCmd::Do(std::shared_ptr<Slot> slot) {
if (strcasecmp(struct_type_.data(), "all") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactAll});
} else if (strcasecmp(struct_type_.data(), "string") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactStrings});
} else if (strcasecmp(struct_type_.data(), "hash") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactHashes});
} else if (strcasecmp(struct_type_.data(), "set") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactSets});
} else if (strcasecmp(struct_type_.data(), "zset") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactZSets});
} else if (strcasecmp(struct_type_.data(), "list") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactList});
} else {
res_.SetRes(CmdRes::kInvalidDbType, struct_type_);
return;
}
LogCommand();
res_.SetRes(CmdRes::kOk);
}

Status Storage::DoCompact(const DataType& type) {
if (type != kAll && type != kStrings && type != kHashes && type != kSets && type != kZSets && type != kLists) {
return Status::InvalidArgument("");
}

Status s;
if (type == kStrings) {
current_task_type_ = Operation::kCleanStrings;
s = strings_db_->CompactRange(nullptr, nullptr);
} else if (type == kHashes) {
current_task_type_ = Operation::kCleanHashes;
s = hashes_db_->CompactRange(nullptr, nullptr);
} else if (type == kSets) {
current_task_type_ = Operation::kCleanSets;
s = sets_db_->CompactRange(nullptr, nullptr);
} else if (type == kZSets) {
current_task_type_ = Operation::kCleanZSets;
s = zsets_db_->CompactRange(nullptr, nullptr);
} else if (type == kLists) {
current_task_type_ = Operation::kCleanLists;
s = lists_db_->CompactRange(nullptr, nullptr);
} else {
current_task_type_ = Operation::kCleanAll;
s = strings_db_->CompactRange(nullptr, nullptr); // 对string类型的数据进行compaction
s = hashes_db_->CompactRange(nullptr, nullptr);
s = sets_db_->CompactRange(nullptr, nullptr);
s = zsets_db_->CompactRange(nullptr, nullptr);
s = lists_db_->CompactRange(nullptr, nullptr);
}
current_task_type_ = Operation::kNone;
return s;
}

方案

blackwidow

在 blackwidow 下启动某个 RocksDB 时,用的是默认的 env

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
Status RedisHashes::Open(const StorageOptions& storage_options, const std::string& db_path) {
statistics_store_->SetCapacity(storage_options.statistics_max_size);
small_compaction_threshold_ = storage_options.small_compaction_threshold;

rocksdb::Options ops(storage_options.options);
Status s = rocksdb::DB::Open(ops, db_path, &db_);
if (s.ok()) {
// create column family
rocksdb::ColumnFamilyHandle* cf;
s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "data_cf", &cf);
if (!s.ok()) {
return s;
}
// close DB
delete cf;
delete db_;
}

// Open
rocksdb::DBOptions db_ops(storage_options.options); // 这里用的是默认的 dp_ops.env
rocksdb::ColumnFamilyOptions meta_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions data_cf_ops(storage_options.options);
meta_cf_ops.compaction_filter_factory = std::make_shared<HashesMetaFilterFactory>();
data_cf_ops.compaction_filter_factory = std::make_shared<HashesDataFilterFactory>(&db_, &handles_);

// use the bloom filter policy to reduce disk reads
rocksdb::BlockBasedTableOptions table_ops(storage_options.table_options);
table_ops.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, true));
rocksdb::BlockBasedTableOptions meta_cf_table_ops(table_ops);
rocksdb::BlockBasedTableOptions data_cf_table_ops(table_ops);
if (!storage_options.share_block_cache && storage_options.block_cache_size > 0) {
meta_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
data_cf_table_ops.block_cache = rocksdb::NewLRUCache(storage_options.block_cache_size);
}
meta_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(meta_cf_table_ops));
data_cf_ops.table_factory.reset(rocksdb::NewBlockBasedTableFactory(data_cf_table_ops));

std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
// Meta CF
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, meta_cf_ops);
// Data CF
column_families.emplace_back("data_cf", data_cf_ops);
return rocksdb::DB::Open(db_ops, db_path, column_families, &handles_, &db_);
}

我们看下 RocksDB 下默认的 env 定义,这里声明了一个静态变量 thread_joiner,在 PosixEnv() 的构造函数中,看到有两个线程池,一个用来做 minor compaction ,一个用来做 major compaction,由于是静态变量,所以 blackwidow 下五种数据类型用的是同一个 Env ,意味着五种数据类型的 compaction全都是通过这两个线程池去处理的。为了改进,我们决定修改 RocksDB 源码去实现每个 RocksDB 实例单独分配线程池去处理 compaction.

rocksdb/env/env_posix.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Env* Env::Default() { // 默认的Env
// The following function call initializes the singletons of ThreadLocalPtr
// right before the static default_env. This guarantees default_env will
// always being destructed before the ThreadLocalPtr singletons get
// destructed as C++ guarantees that the destructions of static variables
// is in the reverse order of their constructions.
//
// Since static members are destructed in the reverse order
// of their construction, having this call here guarantees that
// the destructor of static PosixEnv will go first, then the
// the singletons of ThreadLocalPtr.
ThreadLocalPtr::InitSingletons();
CompressionContextCache::InitSingleton();
INIT_SYNC_POINT_SINGLETONS();
// Avoid problems with accessing most members of Env::Default() during
// static destruction.
STATIC_AVOID_DESTRUCTION(PosixEnv, default_env);
// This destructor must be called on exit
static PosixEnv::JoinThreadsOnExit thread_joiner(default_env);
return &default_env;
}

PosixEnv::PosixEnv()
: CompositeEnv(FileSystem::Default(), SystemClock::Default()),
thread_pools_storage_(Priority::TOTAL),
allow_non_owner_access_storage_(true),
thread_pools_(thread_pools_storage_),
mu_(mu_storage_),
threads_to_join_(threads_to_join_storage_),
allow_non_owner_access_(allow_non_owner_access_storage_) {
ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].SetThreadPriority( // 线程池
static_cast<Env::Priority>(pool_id));
// This allows later initializing the thread-local-env of each thread.
thread_pools_[pool_id].SetHostEnv(this);
}
thread_status_updater_ = CreateThreadStatusUpdater();
}

floyd

pika/src/storage/src/instance.cc 下,我们需要修改 db_ops.env 这个变量。

1
2
3
4
5
6
7
Status Instance::Open(const StorageOptions& storage_options, const std::string& db_path) {
...
rocksdb::DBOptions db_ops(storage_options.options);
db_ops.create_missing_column_families = true;
db_ops.env = rocksdb::Env::Instance(); // 当前的是自定义的Env
...
}

RocksDB

在 RocksDB 层面,我们做了以下修改:在 env.h 中加上了一个静态方法 Instance,用来返回一个新的 PosixEnv 对象,这样每个 RocksDB 实例都有自己对应的线程池去做单独的 compaction。

env/env_posix.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+ PosixEnv();
+ ~PosixEnv() override {
+ for (const auto tid : threads_to_join_) {
+ pthread_join(tid, nullptr);
+ }
+ for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
+ thread_pools_[pool_id].JoinAllThreads();
+ }
+ }


+ Env* Env::Instance() {
+ return (new PosixEnv());
+ }

include/rocksdb/env.h

1
2
+ // get one env instance for convenience
+ static Env* Instance(); // 返回一个PosixEnv()