From 227a2c4738bda20c12c9c4fb817298711dab34a9 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Tue, 24 Dec 2024 16:37:28 +0800 Subject: [PATCH] refactor --- src/replica/replica_stub.cpp | 5 ++- src/replica/test/load_replicas_test.cpp | 44 ++++++++++++++++++------- 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 639f0ab4a8..8ecebd8e4a 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -499,6 +499,9 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, METRIC_VAR_INIT_server(splitting_replicas_async_learn_max_duration_ms), METRIC_VAR_INIT_server(splitting_replicas_max_copy_file_bytes) { + // Some flags might need to be tuned on the stage of loading replicas (during + // replica_stub::initialize()), thus register their control command just in the + // constructor. register_flags_ctrl_command(); } @@ -522,7 +525,7 @@ std::vector replica_stub::get_all_disk_dirs() } std::vector sub_dirs; - CHECK(dsn::utils::filesystem::get_subdirectories(disk_node->full_dir, sub_dirs, false), + CHECK(utils::filesystem::get_subdirectories(disk_node->full_dir, sub_dirs, false), "failed to get sub_directories in {}", disk_node->full_dir); disks.push_back(disk_replicas_info{disk_node.get(), std::move(sub_dirs)}); diff --git a/src/replica/test/load_replicas_test.cpp b/src/replica/test/load_replicas_test.cpp index 2e6c08808a..8de9c661f9 100644 --- a/src/replica/test/load_replicas_test.cpp +++ b/src/replica/test/load_replicas_test.cpp @@ -68,6 +68,9 @@ class mock_load_replica : public replica_stub tags.push_back(tag); } + // Initialize fs_manager by the tag and dir of each disk. + _fs_manager.initialize(dirs, tags); + // Generate the replicas which are expected after loading. for (const auto &[tag, reps] : replicas_by_tag) { for (const auto &pid : reps) { @@ -75,13 +78,12 @@ class mock_load_replica : public replica_stub } } - // Initialize fs_manager. - _fs_manager.initialize(dirs, tags); - _disk_tags_for_order.clear(); _disk_dirs_for_order.clear(); _disk_replicas_for_order.clear(); _disk_loaded_replicas_for_order.assign(replicas_by_tag.size(), 0); + + // Ensure that the disks are scanned in the order returned by `get_dir_node()`. for (const auto &dn : _fs_manager.get_dir_nodes()) { for (const auto &pid : replicas_by_tag.at(dn->tag)) { _fs_manager.specify_dir_for_new_replica_for_test(dn.get(), "pegasus", pid); @@ -103,10 +105,12 @@ class mock_load_replica : public replica_stub PRESERVE_FLAG(max_replicas_on_load_for_each_disk); FLAGS_max_replicas_on_load_for_each_disk = max_replicas_on_load_for_each_disk; + // Check if all loaded replicas are matched. replica_stub::replica_map_by_gpid actual_loaded_replicas; load_replicas(actual_loaded_replicas); ASSERT_EQ(_expected_loaded_replicas, actual_loaded_replicas); + // Check if all replicas have been loaded. std::set actual_loaded_replica_pids; for (const auto &[pid, _] : actual_loaded_replicas) { ASSERT_TRUE(actual_loaded_replica_pids.insert(pid).second); @@ -139,6 +143,8 @@ class mock_load_replica : public replica_stub // Once the task is `allow_inline`, it would be executed in place immediately rather // than pushed into the queue. Thus we could test the expected order in which the // tasks are pushed into the queue. + + // Find the first disk where there is still some replica that has not been loaded. size_t finished_disks = 0; while (_disk_loaded_replicas_for_order[_disk_index_for_order] >= _disk_replicas_for_order[_disk_index_for_order]) { @@ -153,7 +159,7 @@ class mock_load_replica : public replica_stub // Only check if the processed order of the disk the replica belongs to, rather than // the order of the replica itself, for the reason that the order of the dirs returned - // by the underlying call varies with different systems. + // by the underlying call might vary. ASSERT_EQ(_disk_tags_for_order[_disk_index_for_order], dn->tag); ASSERT_EQ(_disk_dirs_for_order[_disk_index_for_order], dn->full_dir); @@ -189,7 +195,8 @@ class mock_load_replica : public replica_stub std::set _expected_loaded_replica_pids; - // Only for testing the order of the loading tasks. + // The variables with postfix `_for_order` are only for testing the order of the loading + // tasks. size_t _disk_index_for_order{0}; std::vector _disk_tags_for_order; std::vector _disk_dirs_for_order; @@ -205,7 +212,10 @@ class mock_load_replica : public replica_stub struct load_replicas_case { + // Each disk tag => dir of this disk. std::map dirs_by_tag; + + // Each disk tag => replicas (specified by ) on this disk. std::map> replicas_by_tag; }; @@ -214,8 +224,10 @@ class LoadReplicasTest : public testing::TestWithParam public: LoadReplicasTest() { + // Remove all dirs of all disks to prevent each test from being disturbed. _stub.remove_disk_dirs(); + // Use test cases to initialize the replica stub. const auto &load_case = GetParam(); _stub.initialize(load_case.dirs_by_tag, load_case.replicas_by_tag); } @@ -240,6 +252,8 @@ TEST_P(LoadReplicasTest, LoadOrder) { test_load_replicas(true, 256); } TEST_P(LoadReplicasTest, LoadThrottling) { test_load_replicas(false, 5); } +// Generate a test case for loading replicas. Each element in `disk_replicas` is corresponding +// to the number of replicas on a disk. load_replicas_case generate_load_replicas_case(const std::vector &disk_replicas) { std::map dirs_by_tag; @@ -247,9 +261,12 @@ load_replicas_case generate_load_replicas_case(const std::vector &disk_r dirs_by_tag.emplace(fmt::format("data{}", disk_index), fmt::format("disk{}", disk_index)); } - static const int32_t kNumPartitions = 8; - int32_t partition_id = 0; + static const int32_t kNumBitsPartitions = 3; + static const int32_t kNumPartitions = 1 << kNumBitsPartitions; + int32_t app_id = 1; + int32_t partition_id = 0; + std::map> replicas_by_tag; while (true) { @@ -258,18 +275,23 @@ load_replicas_case generate_load_replicas_case(const std::vector &disk_r for (size_t disk_index = 0; disk_index < disk_replicas.size(); ++disk_index) { auto &replica_list = replicas_by_tag[fmt::format("data{}", disk_index)]; if (replica_list.size() >= disk_replicas[disk_index]) { + // All replicas on this disk have been generated, just skip to next disk. ++finished_disks; continue; } + // Generate a replica with current app id and partition index. replica_list.emplace_back(app_id, partition_id); - if (++partition_id >= kNumPartitions) { - partition_id = 0; - ++app_id; - } + + // Once next partition index is found 0, increment app id to turn to next table. + app_id += ((partition_id + 1) & kNumPartitions) >> kNumBitsPartitions; + + // Increment index to turn to next partition. + partition_id = (partition_id + 1) & (kNumPartitions - 1); } if (finished_disks >= disk_replicas.size()) { + // All disks have been done. break; } }