Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Dec 24, 2024
1 parent dd324c9 commit 227a2c4
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 12 deletions.
5 changes: 4 additions & 1 deletion src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
METRIC_VAR_INIT_server(splitting_replicas_async_learn_max_duration_ms),
METRIC_VAR_INIT_server(splitting_replicas_max_copy_file_bytes)
{
// Some flags might need to be tuned on the stage of loading replicas (during
// replica_stub::initialize()), thus register their control command just in the
// constructor.
register_flags_ctrl_command();
}

Expand All @@ -522,7 +525,7 @@ std::vector<replica_stub::disk_replicas_info> replica_stub::get_all_disk_dirs()
}

std::vector<std::string> sub_dirs;
CHECK(dsn::utils::filesystem::get_subdirectories(disk_node->full_dir, sub_dirs, false),
CHECK(utils::filesystem::get_subdirectories(disk_node->full_dir, sub_dirs, false),
"failed to get sub_directories in {}",
disk_node->full_dir);
disks.push_back(disk_replicas_info{disk_node.get(), std::move(sub_dirs)});
Expand Down
44 changes: 33 additions & 11 deletions src/replica/test/load_replicas_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,22 @@ class mock_load_replica : public replica_stub
tags.push_back(tag);
}

// Initialize fs_manager by the tag and dir of each disk.
_fs_manager.initialize(dirs, tags);

// Generate the replicas which are expected after loading.
for (const auto &[tag, reps] : replicas_by_tag) {
for (const auto &pid : reps) {
ASSERT_TRUE(_expected_loaded_replica_pids.insert(pid).second);
}
}

// Initialize fs_manager.
_fs_manager.initialize(dirs, tags);

_disk_tags_for_order.clear();
_disk_dirs_for_order.clear();
_disk_replicas_for_order.clear();
_disk_loaded_replicas_for_order.assign(replicas_by_tag.size(), 0);

// Ensure that the disks are scanned in the order returned by `get_dir_node()`.
for (const auto &dn : _fs_manager.get_dir_nodes()) {
for (const auto &pid : replicas_by_tag.at(dn->tag)) {
_fs_manager.specify_dir_for_new_replica_for_test(dn.get(), "pegasus", pid);
Expand All @@ -103,10 +105,12 @@ class mock_load_replica : public replica_stub
PRESERVE_FLAG(max_replicas_on_load_for_each_disk);
FLAGS_max_replicas_on_load_for_each_disk = max_replicas_on_load_for_each_disk;

// Check if all loaded replicas are matched.
replica_stub::replica_map_by_gpid actual_loaded_replicas;
load_replicas(actual_loaded_replicas);
ASSERT_EQ(_expected_loaded_replicas, actual_loaded_replicas);

// Check if all replicas have been loaded.
std::set<gpid> actual_loaded_replica_pids;
for (const auto &[pid, _] : actual_loaded_replicas) {
ASSERT_TRUE(actual_loaded_replica_pids.insert(pid).second);
Expand Down Expand Up @@ -139,6 +143,8 @@ class mock_load_replica : public replica_stub
// Once the task is `allow_inline`, it would be executed in place immediately rather
// than pushed into the queue. Thus we could test the expected order in which the
// tasks are pushed into the queue.

// Find the first disk where there is still some replica that has not been loaded.
size_t finished_disks = 0;
while (_disk_loaded_replicas_for_order[_disk_index_for_order] >=
_disk_replicas_for_order[_disk_index_for_order]) {
Expand All @@ -153,7 +159,7 @@ class mock_load_replica : public replica_stub

// Only check if the processed order of the disk the replica belongs to, rather than
// the order of the replica itself, for the reason that the order of the dirs returned
// by the underlying call varies with different systems.
// by the underlying call might vary.
ASSERT_EQ(_disk_tags_for_order[_disk_index_for_order], dn->tag);
ASSERT_EQ(_disk_dirs_for_order[_disk_index_for_order], dn->full_dir);

Expand Down Expand Up @@ -189,7 +195,8 @@ class mock_load_replica : public replica_stub

std::set<gpid> _expected_loaded_replica_pids;

// Only for testing the order of the loading tasks.
// The variables with postfix `_for_order` are only for testing the order of the loading
// tasks.
size_t _disk_index_for_order{0};
std::vector<std::string> _disk_tags_for_order;
std::vector<std::string> _disk_dirs_for_order;
Expand All @@ -205,7 +212,10 @@ class mock_load_replica : public replica_stub

struct load_replicas_case
{
// Each disk tag => dir of this disk.
std::map<std::string, std::string> dirs_by_tag;

// Each disk tag => replicas (specified by <app_id, partition_id>) on this disk.
std::map<std::string, std::vector<gpid>> replicas_by_tag;
};

Expand All @@ -214,8 +224,10 @@ class LoadReplicasTest : public testing::TestWithParam<load_replicas_case>
public:
LoadReplicasTest()
{
// Remove all dirs of all disks to prevent each test from being disturbed.
_stub.remove_disk_dirs();

// Use test cases to initialize the replica stub.
const auto &load_case = GetParam();
_stub.initialize(load_case.dirs_by_tag, load_case.replicas_by_tag);
}
Expand All @@ -240,16 +252,21 @@ TEST_P(LoadReplicasTest, LoadOrder) { test_load_replicas(true, 256); }

TEST_P(LoadReplicasTest, LoadThrottling) { test_load_replicas(false, 5); }

// Generate a test case for loading replicas. Each element in `disk_replicas` is corresponding
// to the number of replicas on a disk.
load_replicas_case generate_load_replicas_case(const std::vector<size_t> &disk_replicas)
{
std::map<std::string, std::string> dirs_by_tag;
for (size_t disk_index = 0; disk_index < disk_replicas.size(); ++disk_index) {
dirs_by_tag.emplace(fmt::format("data{}", disk_index), fmt::format("disk{}", disk_index));
}

static const int32_t kNumPartitions = 8;
int32_t partition_id = 0;
static const int32_t kNumBitsPartitions = 3;
static const int32_t kNumPartitions = 1 << kNumBitsPartitions;

int32_t app_id = 1;
int32_t partition_id = 0;

std::map<std::string, std::vector<gpid>> replicas_by_tag;

while (true) {
Expand All @@ -258,18 +275,23 @@ load_replicas_case generate_load_replicas_case(const std::vector<size_t> &disk_r
for (size_t disk_index = 0; disk_index < disk_replicas.size(); ++disk_index) {
auto &replica_list = replicas_by_tag[fmt::format("data{}", disk_index)];
if (replica_list.size() >= disk_replicas[disk_index]) {
// All replicas on this disk have been generated, just skip to next disk.
++finished_disks;
continue;
}

// Generate a replica with current app id and partition index.
replica_list.emplace_back(app_id, partition_id);
if (++partition_id >= kNumPartitions) {
partition_id = 0;
++app_id;
}

// Once next partition index is found 0, increment app id to turn to next table.
app_id += ((partition_id + 1) & kNumPartitions) >> kNumBitsPartitions;

// Increment index to turn to next partition.
partition_id = (partition_id + 1) & (kNumPartitions - 1);
}

if (finished_disks >= disk_replicas.size()) {
// All disks have been done.
break;
}
}
Expand Down

0 comments on commit 227a2c4

Please sign in to comment.