Pika的Floyd设计方案

背景

新版本下的 Pika 采用 floyd 作为新的存储引擎,接下来我们以源码的方式向大家展示一下新存储引擎做了哪些改造.

KV

blackwidow

blackwidow 下的 string_db_是一个 std::unique_ptr<RedisStrings> ,RedisStrings 继承自 Redis 类,Redis 类中有一个 rocksdb::DB* db_ 的成员变量,子类 RedisStrings 使用父类 Redis 类的这个成员变量 db_用来对 RocksDB 做操作.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void SetCmd::Do(std::shared_ptr<Slot> slot) {  /* 0 */
rocksdb::Status s;
int32_t res = 1;
switch (condition_) {
...
default:
s = slot->db()->Set(key_, value_); // 执行set命令
break;
}
}

Status Storage::Set(const Slice& key, const Slice& value) { /* 1 */
return strings_db_->Set(key, value);
}

std::unique_ptr<RedisStrings> strings_db_; // strings_db_是一个KV类型的RocksDB实例

Status RedisStrings::Set(const Slice& key, const Slice& value) { /* 2 */
StringsValue strings_value(value);
ScopeRecordLock l(lock_mgr_, key);
return db_->Put(default_write_options_, key, strings_value.Encode()); // 调用RocksDB接口
}

floyd

floyd 下的 Pika 首先会对 key 做一层 Hash 计算 (slot 计算 + index 计算),得出一个 index,用指定索引下的 insts_去处理,insts_是一个 std::vector<std::shared_ptr<Instance>> ,Instance 里面封装了成员变量rocksdb::DB* db_, 这样的计算方式可以使多个 RocksDB 实例平均分给所有的 key,不局限于一种数据类型使用一个 RocksDB 实例,同时对 key 和 value 的构造也做了调整,对 key 来说新加了 reserve1(8B),db_id(2B),slot_id(2B),reserve2(16B). 对 value 来说新增了 reserve(16B),cdate(8B). 同时 timestampversion 字段的大小从原来的 4B 调整为 8B. 这里的 reserve字段目前用于占位,cdate字段用于记录写入时间,db_id字段用来区别各个 DB 的数据,slot_id字段用于记录 key Hash 得到的 slot 值.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void SetCmd::Do(std::shared_ptr<Slot> slot) {  /* 0 */
rocksdb::Status s;
int32_t res = 1;
switch (condition_) {
...
default:
s = slot->db()->Set(key_, value_); // 执行set命令
break;
}
}

Status Storage::Set(const Slice& key, const Slice& value) { /* 1 */
auto inst = GetDBInstance(key); // Hash计算得出一个指定(Index)下标的RocksDB实例处理
return inst->Set(key, value);
}

std::shared_ptr<Instance> Storage::GetDBInstance(const Slice& key) { /* 2 */
return GetDBInstance(key.ToString());
}

std::shared_ptr<Instance> Storage::GetDBInstance(const std::string& key) { /* 3 */
auto inst_index = slot_indexer_->GetInstanceID(GetSlotID(key)); // 计算出Index
LOG(WARNING) << "key: " << key << " slot_id: " << GetSlotID(key) << " inst_index: " << inst_index;
return insts_[inst_index]; // 返回指定(Index)下标的RocksDB实例
}

uint32_t GetInstanceID(int32_t slot_id) {return slot_id % inst_num_; } /* 4 */

std::unique_ptr<SlotIndexer> slot_indexer_;
std::vector<std::shared_ptr<Instance>> insts_;

Status Instance::Set(const Slice& key, const Slice& value) { /* 5 */
StringsValue strings_value(value);
ScopeRecordLock l(lock_mgr_, key);
uint16_t slot_id = static_cast<uint16_t>(GetSlotID(key.ToString()));
BaseMetaKey base_key(0/*db_id*/, slot_id, key);
return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode());
}
1
2
3
4
5
6
7
/* 旧版 key 格式 */                                         /* 旧版 value 格式 */                      
| key | | value | timestamp |
| | | | 4B |

/* 新版 key 格式 /* 新版 value 格式 */
| reserve1 | db_id | slot_id | key | reserve2 | | value | reserve | cdate | timestamp |
| 8B | 2B | 2B | | 16B | | | 16B | 8B | 8B |

Hash

blackwidow

对于 Hash 类型,我们以 Hset 命令了解,对了除 KV 之外的数据类型,我们需要操作 RocksDB 的列族

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void HSetCmd::Do(std::shared_ptr<Slot> slot) { /* 0 */
int32_t ret = 0;
rocksdb::Status s = slot->db()->HSet(key_, field_, value_, &ret);
...
}

Status Storage::HSet(const Slice& key, const Slice& field, const Slice& value, int32_t* res) { /* 1 */
return hashes_db_->HSet(key, field, value, res);
}

Status RedisHashes::HSet(const Slice& key, const Slice& field, const Slice& value, int32_t* res) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
...
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value); // handles_[0]用于存hash的meta信息
char meta_value_buf[4] = {0};
if (s.ok()) {
...
batch.Put(handles_[0], key, meta_value);
HashesDataKey data_key(key, version, field);
batch.Put(handles_[1], data_key.Encode(), value);
...
} else {
...
s = db_->Get(default_read_options_, handles_[1], hashes_data_key.Encode(), &data_value);// handles_[1]用于存hash的data信息
if (s.ok()) {
...
batch.Put(handles_[1], hashes_data_key.Encode(), value);
...
} else if (s.IsNotFound()) {
...
parsed_hashes_meta_value.ModifyCount(1);
batch.Put(handles_[0], key, meta_value);
batch.Put(handles_[1], hashes_data_key.Encode(), value);
...
}
} else if (s.IsNotFound()) {
...
batch.Put(handles_[0], key, meta_value.Encode());
HashesDataKey data_key(key, version, field);
batch.Put(handles_[1], data_key.Encode(), value);
...
}
s = db_->Write(default_write_options_, &batch);
UpdateSpecificKeyStatistics(key.ToString(), statistic);
return s;
}

floyd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
void HSetCmd::Do(std::shared_ptr<Slot> slot) { /* 0 */
int32_t ret = 0;
rocksdb::Status s = slot->db()->HSet(key_, field_, value_, &ret);
...
}

Status Storage::HSet(const Slice& key, const Slice& field, const Slice& value, int32_t* res) { /* 1 */
auto inst = GetDBInstance(key);
return inst->HSet(key, field, value, res);
}

Status Instance::HSet(const Slice& key, const Slice& field, const Slice& value, int32_t* res) { /* 2 */
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

int32_t version = 0;
uint32_t statistic = 0;
std::string meta_value;
uint16_t slot_id = static_cast<uint16_t>(GetSlotID(key.ToString()));
BaseMetaKey base_meta_key(0/*db_id*/, slot_id, key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value); //用于存hash的meta信息
char meta_value_buf[4] = {0};
if (s.ok()) {
...
batch.Put(handles_[kHashesDataCF], data_key.Encode(), internal_value.Encode());用于存hash的Data信息
} else {
...
s = db_->Get(default_read_options_, handles_[kHashesDataCF], hashes_data_key.Encode(), &data_value);
if (s.ok()) {
...
batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), internal_value.Encode());
...
} else if (s.IsNotFound()) {
...
batch.Put(handles_[kHashesMetaCF], base_meta_key.Encode(), meta_value);
batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), internal_value.Encode());
...
}
} else if (s.IsNotFound()) {
...
batch.Put(handles_[kHashesMetaCF], base_meta_key.Encode(), meta_value.Encode());
...
batch.Put(handles_[kHashesDataCF], data_key.Encode(), internal_value.Encode());
...
}
s = db_->Write(default_write_options_, &batch);
UpdateSpecificKeyStatistics(DataType::kHashes, key.ToString(), statistic);
return s;
}

enum ColumnFamilyIndex { // 这里指定了各个数据类型在同一个RocksDB实例下的列族索引
kStringsCF = 0,
kHashesMetaCF = 1,
kHashesDataCF = 2,
kSetsMetaCF = 3,
kSetsDataCF = 4,
kListsMetaCF = 5,
kListsDataCF = 6,
kZsetsMetaCF = 7,
kZsetsDataCF = 8,
kZsetsScoreCF = 9,
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* 旧版 meta_key 格式 */                                    /* 旧版 meta_value 格式 */                      
| key | | hash_size | version | timestamp |
| | | 4B | 4B | 4B |

/* 新版 meta_key 格式 */ /* 新版 meta_value 格式 */
| reserve1 | db_id | slot_id | key | reserve2 | | hash_size | version | reserve | cdate | timestamp |
| 8B | 2B | 2B | | 16B | | 4B | 8B | 16B | 8B | 8B |
--------------------------------------------------------------------------------------------------------------------------------
/* 旧版 data_key 格式 */ /* 旧版 data_value 格式 */
| key_size | key | version | field | | hash_value |
| 4B | | 4B | | | |

/* 新版 data_key 格式 */ /* 新版 data_value 格式 */
| reserve1 | db_id | slot_id | key_size | key | version | field | reserve2 | | hash_value | reserved | cdate |
| 8B | 2B | 2B | 4B | | 8B | | 16B | | | 16B | 8B |

List

blackwidow

对于 Lish 类型,我们以 Lpush 命令了解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void LPushCmd::Do(std::shared_ptr<Slot> slot) { /* 0 */
uint64_t llen = 0;
rocksdb::Status s = slot->db()->LPush(key_, values_, &llen);
...
}

Status Storage::LPush(const Slice& key, const std::vector<std::string>& values, uint64_t* ret) { /* 1 */
return lists_db_->LPush(key, values, ret);
}

Status RedisLists::LPush(const Slice& key, const std::vector<std::string>& values, uint64_t* ret) { /* 2 */
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
...
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value); // 存放list的meta信息
if (s.ok()) {
for (const auto& value : values) {
index = parsed_lists_meta_value.left_index();
parsed_lists_meta_value.ModifyLeftIndex(1);
parsed_lists_meta_value.ModifyCount(1);
ListsDataKey lists_data_key(key, version, index);
batch.Put(handles_[1], lists_data_key.Encode(), value); // 存放list的data信息
}
batch.Put(handles_[0], key, meta_value);
...
} else if (s.IsNotFound()) {
for (const auto& value : values) {
index = lists_meta_value.left_index();
lists_meta_value.ModifyLeftIndex(1);
ListsDataKey lists_data_key(key, version, index);
batch.Put(handles_[1], lists_data_key.Encode(), value);
}
batch.Put(handles_[0], key, lists_meta_value.Encode());
}
....
return db_->Write(default_write_options_, &batch);
}

floyd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void LPushCmd::Do(std::shared_ptr<Slot> slot) { /* 0 */
uint64_t llen = 0;
rocksdb::Status s = slot->db()->LPush(key_, values_, &llen);
...
}

Status Storage::LPush(const Slice& key, const std::vector<std::string>& values, uint64_t* ret) { /* 1 */
auto inst = GetDBInstance(key);
return inst->LPush(key, values, ret);
}

Status Instance::LPush(const Slice& key, const std::vector<std::string>& values, uint64_t* ret) { /* 2 */
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
...
uint16_t slot_id = static_cast<uint16_t>(GetSlotID(key.ToString()));
BaseMetaKey base_meta_key(0/*db_id*/, slot_id, key);
Status s = db_->Get(default_read_options_, handles_[kListsMetaCF], base_meta_key.Encode(), &meta_value); // 存放list的meta信息
if (s.ok()) {
...
for (const auto& value : values) {
index = parsed_lists_meta_value.left_index();
parsed_lists_meta_value.ModifyLeftIndex(1);
parsed_lists_meta_value.ModifyCount(1);
ListsDataKey lists_data_key(0/*db_id*/, slot_id, key, version, index);
InternalValue i_val(value);
batch.Put(handles_[kListsDataCF], lists_data_key.Encode(), i_val.Encode());// 存放list的data信息
}
batch.Put(handles_[kListsMetaCF], base_meta_key.Encode(), meta_value);
*ret = parsed_lists_meta_value.count();
} else if (s.IsNotFound()) {
...
for (const auto& value : values) {
index = lists_meta_value.left_index();
lists_meta_value.ModifyLeftIndex(1);
ListsDataKey lists_data_key(0/*db_id*/, slot_id, key, version, index);
InternalValue i_val(value);
batch.Put(handles_[kListsDataCF], lists_data_key.Encode(), i_val.Encode());
}
batch.Put(handles_[kListsMetaCF], base_meta_key.Encode(), lists_meta_value.Encode());
...
}
return db_->Write(default_write_options_, &batch);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* 旧版 meta_key 格式 */                      
| key |
| |

/* 新版 meta_key 格式 */
| reserve1 | db_id | slot_id | key | reserve2 |
| 8B | 2B | 2B | | 16B |

/* 旧版 meta_value 格式 */
| list_size | version | timestamp | left_index | right_index |
| 8B | 4B | 4B | 8B | 8B |

/* 新版 meta_value 格式 */
| list_size | version | left_index | right_index | reserve | cdate | timestamp |
| 8B | 8B | 8B | 8B | 16B | 8B | 8B |
--------------------------------------------------------------------------------------------------------------------------------
/* 旧版 data_key 格式 */ /* 旧版 data_value 格式 */
| key_size | key | version | index | | list_value |
| 4B | | 4B | 8B | | |

/* 新版 data_key 格式 */ /* 新版 data_value 格式 */
| reserve1 | db_id | slot_id | key_size | key | version | index | reserve2 | | list_value | reserve | cdate |
| 8B | 2B | 2B | 4B | | 8B | 8B | 16B | | | 16B | 8B |

Set

blackwidow

对于 Set 类型,我们以 SAdd 命令了解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
void SAddCmd::Do(std::shared_ptr<Slot> slot) { /* 0 */
int32_t count = 0;
rocksdb::Status s = slot->db()->SAdd(key_, members_, &count);
...
}

Status Storage::SAdd(const Slice& key, const std::vector<std::string>& members, int32_t* ret) { /* 1 */
return sets_db_->SAdd(key, members, ret);
}

rocksdb::Status RedisSets::SAdd(const Slice& key, const std::vector<std::string>& members, int32_t* ret) { /* 2 */
...
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
...
rocksdb::Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
...
batch.Put(handles_[0], key, meta_value);
for (const auto& member : filtered_members) {
SetsMemberKey sets_member_key(key, version, member);
batch.Put(handles_[1], sets_member_key.Encode(), Slice());
}
...
} else {
...
for (const auto& member : filtered_members) {
...
s = db_->Get(default_read_options_, handles_[1], sets_member_key.Encode(), &member_value);
if (s.ok()) {
} else if (s.IsNotFound()) {
cnt++;
batch.Put(handles_[1], sets_member_key.Encode(), Slice());
} else {
return s;
}
}
...
batch.Put(handles_[0], key, meta_value);
...
}
} else if (s.IsNotFound()) {
...
batch.Put(handles_[0], key, sets_meta_value.Encode());
for (const auto& member : filtered_members) {
SetsMemberKey sets_member_key(key, version, member);
batch.Put(handles_[1], sets_member_key.Encode(), Slice());
}
...
}
return db_->Write(default_write_options_, &batch);
}

floyd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
void SAddCmd::Do(std::shared_ptr<Slot> slot) { /* 0 */
int32_t count = 0;
rocksdb::Status s = slot->db()->SAdd(key_, members_, &count);
if (!s.ok()) {
res_.SetRes(CmdRes::kErrOther, s.ToString());
return;
}
AddSlotKey("s", key_, slot);
res_.AppendInteger(count);
}

Status Storage::SAdd(const Slice& key, const std::vector<std::string>& members, int32_t* ret) { /* 1 */
auto inst = GetDBInstance(key);
return inst->SAdd(key, members, ret);
}

rocksdb::Status Instance::SAdd(const Slice& key, const std::vector<std::string>& members, int32_t* ret) { /* 2 */
...
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
...
uint16_t slot_id = static_cast<uint16_t>(GetSlotID(key.ToString()));
BaseMetaKey base_meta_key(0/*db_id*/, slot_id, key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
...
for (const auto& member : filtered_members) {
SetsMemberKey sets_member_key(0/*db_id*/, slot_id, key, version, member);
InternalValue iter_value(Slice{});
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode());
}
...
} else {
...
for (const auto& member : filtered_members) {
SetsMemberKey sets_member_key(0/*db_id*/, slot_id, key, version, member);
s = db_->Get(default_read_options_, handles_[kSetsDataCF], sets_member_key.Encode(), &member_value);
...
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode());
...
}
...
batch.Put(handles_[kSetsMetaCF], base_meta_key.Encode(), meta_value);
...
}
} else if (s.IsNotFound()) {
...
batch.Put(handles_[kSetsMetaCF], base_meta_key.Encode(), sets_meta_value.Encode());
for (const auto& member : filtered_members) {
...
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), i_val.Encode());
}
}
...
return db_->Write(default_write_options_, &batch);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* 旧版 meta_key 格式 */                                    /* 旧版 meta_value 格式 */                      
| key | | set_size | version | timestamp |
| | | 4B | 4B | 4B |

/* 新版 meta_key 格式 */ /* 新版 meta_value 格式 */
| reserve1 | db_id | slot_id | key | reserved2 | | set_size | version | reserve | cdate | timestamp |
| 8B | 2B | 2B | | 16B | | 4B | 8B | 16B | 8B | 8B |
--------------------------------------------------------------------------------------------------------------------------------
/* 旧版 data_key 格式 */ /* 旧版 data_value 格式 */
| key_size | key | version | member | | |
| 4B | | 4B | | | |

/* 新版 data_key 格式 */ /* 新版 data_value 格式 */
| reserve1 | db_id | slot_id | key_size | key | version | member | reserve2 | | reserved | cdate |
| 8B | 2B | 2B | 4B | | 8B | | 16B | | 16B | 8B |

Zset

blackwidow

对于 Zset 类型,我们以 ZAdd 命令了解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
void ZAddCmd::Do(std::shared_ptr<Slot> slot) { /* 0 */
int32_t count = 0;
rocksdb::Status s = slot->db()->ZAdd(key_, score_members, &count);
...
}

Status Storage::ZAdd(const Slice& key, const std::vector<ScoreMember>& score_members, int32_t* ret) { /* 1 */
return zsets_db_->ZAdd(key, score_members, ret);
}

Status RedisZSets::ZAdd(const Slice& key, const std::vector<ScoreMember>& score_members, int32_t* ret) { /* 2 */
...
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
...
for (const auto& sm : filtered_score_members) {
...
if (vaild) {
s = db_->Get(default_read_options_, handles_[1], zsets_member_key.Encode(), &data_value);
if (s.ok()) {
...
} else {
ZSetsScoreKey zsets_score_key(key, version, old_score, sm.member);
batch.Delete(handles_[2], zsets_score_key.Encode());
// delete old zsets_score_key and overwirte zsets_member_key
// but in different column_families so we accumulative 1
statistic++;
}
...
}

const void* ptr_score = reinterpret_cast<const void*>(&sm.score);
EncodeFixed64(score_buf, *reinterpret_cast<const uint64_t*>(ptr_score));
batch.Put(handles_[1], zsets_member_key.Encode(), Slice(score_buf, sizeof(uint64_t)));

ZSetsScoreKey zsets_score_key(key, version, sm.score, sm.member);
batch.Put(handles_[2], zsets_score_key.Encode(), Slice());
...
}
...
batch.Put(handles_[0], key, meta_value);
*ret = cnt;
} else if (s.IsNotFound()) {
...
batch.Put(handles_[0], key, zsets_meta_value.Encode());
for (const auto& sm : filtered_score_members) {
...
batch.Put(handles_[1], zsets_member_key.Encode(), Slice(score_buf, sizeof(uint64_t)));

ZSetsScoreKey zsets_score_key(key, version, sm.score, sm.member);
batch.Put(handles_[2], zsets_score_key.Encode(), Slice());
}
...
s = db_->Write(default_write_options_, &batch);
UpdateSpecificKeyStatistics(key.ToString(), statistic);
return s;
}

floyd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void ZAddCmd::Do(std::shared_ptr<Slot> slot) { /* 0 */
int32_t count = 0;
rocksdb::Status s = slot->db()->ZAdd(key_, score_members, &count);
...
}

Status Storage::ZAdd(const Slice& key, const std::vector<ScoreMember>& score_members, int32_t* ret) { /* 1 */
auto inst = GetDBInstance(key);
return inst->ZAdd(key, score_members, ret);
}

Status Instance::ZAdd(const Slice& key, const std::vector<ScoreMember>& score_members, int32_t* ret) { /* 2 */
...
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
...
uint16_t slot_id = static_cast<uint16_t>(GetSlotID(key.ToString()));
BaseMetaKey base_meta_key(0/*db_id*/, slot_id, key);
Status s = db_->Get(default_read_options_, handles_[kZsetsMetaCF], base_meta_key.Encode(), &meta_value);
LOG(WARNING) << "ZAdd get meta status: " << s.ToString() << " key: " << key.ToString();
if (s.ok()) {
...
for (const auto& sm : filtered_score_members) {
bool not_found = true;
ZSetsMemberKey zsets_member_key(0/*db_id*/, slot_id, key, version, sm.member);
if (vaild) {
s = db_->Get(default_read_options_, handles_[kZsetsDataCF], zsets_member_key.Encode(), &data_value);
...
} else {
ZSetsScoreKey zsets_score_key(0/*db_id*/, slot_id, key, version, old_score, sm.member);
batch.Delete(handles_[kZsetsScoreCF], zsets_score_key.Encode());
// delete old zsets_score_key and overwirte zsets_member_key
// but in different column_families so we accumulative 1
statistic++;
}
...
}

...
batch.Put(handles_[kZsetsDataCF], zsets_member_key.Encode(), zsets_member_i_val.Encode());

ZSetsScoreKey zsets_score_key(0/*db_id*/, slot_id, key, version, sm.score, sm.member);
InternalValue zsets_score_i_val(Slice{});
batch.Put(handles_[kZsetsScoreCF], zsets_score_key.Encode(), zsets_score_i_val.Encode());
...
} else if (s.IsNotFound()) {
...
batch.Put(handles_[kZsetsMetaCF], base_meta_key.Encode(), zsets_meta_value.Encode());
for (const auto& sm : filtered_score_members) {
...
batch.Put(handles_[kZsetsDataCF], zsets_member_key.Encode(), zsets_member_i_val.Encode());

ZSetsScoreKey zsets_score_key(0/*db_id*/, slot_id, key, version, sm.score, sm.member);
InternalValue zsets_score_i_val(Slice{});
batch.Put(handles_[kZsetsScoreCF], zsets_score_key.Encode(), zsets_score_i_val.Encode());
...
s = db_->Write(default_write_options_, &batch);
UpdateSpecificKeyStatistics(DataType::kZSets, key.ToString(), statistic);
return s;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* 旧版 meta_key 格式 */                                     /* 旧版 meta_value 格式 */                      
| key | | zset_size | version | timestamp |
| | | 4B | 4B | 4B |

/* 新版 meta_key 格式 */ /* 新版 meta_value 格式 */
| reserve1 | db_id | slot_id | key | reserved2 | | zset_size | version | reserve | cdate | timestamp |
| 8B | 2B | 2B | | 16B | | 4B | 8B | 16B | 8B | 8B |
--------------------------------------------------------------------------------------------------------------------------------
/* 旧版 member_to_score_date_key 格式 */ /* 旧版 member_to_score_data_value 格式 */
| key_size | key | version | member | | score_value |
| 4B | | 4B | | | 8B |

/* 新版 member_to_score_data_key 格式 */ /* 新版 member_to_score_data_value 格式 */
| reserve1 | db_id | slot_id | key_size | key | version | member | reserve2 | | score_value | reserve | cdate |
| 8B | 2B | 2B | 4B | | 8B | | 16B | | 8B | 16B | 8B |
---------------------------------------------------------------------------------------------------------------------------------
/* 旧版 score_to_member_date_key 格式 */
| key_size | key | version | score | member |
| 4B | | 4B | 8B | |

/* 新版 score_to_member_data_key 格式 */
| reserve1 | db_id | slot_id | key_size | key | version | score | member | reserve2 |
| 8B | 2B | 2B | 4B | | 8B | 8B | | 16B |


/* 旧版 score_to_member_date_value 格式 */
| |
| |

/* 新版 score_to_member_date_value 格式 */
| reserve | cdate |
| 16B | 8B |

FAQ

Q1: 为什么要加 reserve 字段,没看懂?

A1: reserve 翻译过来就是保留预定的意思,目前该字段我们先用于占位,后续有需求我们再加上去

Q2:为什么要加 slot-id,db-id,cdate 字段,为什么之前的存储引擎方式不需要这三个字段?

A2:

Q3:version 和 timestamp 为什么要由 4B 改成 8B?

A3:

Q4:你这篇文章说的都是单个 key 的两者的区别,那么对于多个 key 来说?多个 key 可能会保存在多个 RocksDB 实例中,那你解决并发问题?

A4:看来你考虑挺仔细的,我会在下篇文章介绍~

Q5:只需要在 Pika 层面做存储引擎的修改,对于集群模式来说,Codis 层面也需要做修改吗?

A5:是个好问题,目前我也在考虑

Q6:我刚刚发现 blackwidow 下的 RocksDB 实例用的是 unique_ptr,而 floyd 下的 RocksDB 实例用的是 shard_ptr,这是为什么?

A6:

Q7:为什么要改存储引擎,floyd 有什么优势呢?

A7:线上使用过程中发现,同一个业务服务使用的数据类型一般集中在一两个数据类型中,无法发挥多RocksDB实例的优势,floyd 的实现方式能保证每个分片持有的 RocksDB 实例个数近似相同

总结

floyd 中不再按照数据类型区分RocksDB实例,而是通过 column-family 区分.单个 Pika 节点的 RocksDB 实例个数根据物理机硬件配置决定,每个 RocksDB 实例使用独立的 compaction 线程池和 flush 线程池,初次之外每个 RocksDB 实例使用一个后台线程,该后台线程用来发起 manual compaction 以及对 RocksDB 中存储的数据进行定期的统计和巡检. 每个节点在启动时获取到当前节点持有的分片,将分片排序并等分为 RocksDB 实例个数,保证每个分片持有的 RocksDB 实例个数近似相同. 对于 key 来讲,前缀增加 8 字节的 reserve 保留字段以及 2 字节的 slot_id 和 2 字节的 db_id,后缀增加16字节的 reserve 保留字段。对于 value 来讲,在 value 最后统一增加:16 字节的 reserve 保留字段,8 字节的数据的写入时间 cdate. 此外 timestamp 和 version 由之前的 4 字节调整为 8 字节. 最后关于新的存储引擎还有无效数据清理和RocksDB使用的优化,我会在后续的文章中更新~