From d711b08d1a15f7823fcea0c82bdbc6d45e4bf3ce Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 26 Dec 2024 14:35:30 +0800 Subject: [PATCH] perf: enhance the loading process of replicas particularly when a significant number of replicas are spread across multiple disks (#2078) Immediately after the replica server is started, all of the replicas under the data directory would be loaded. Currently, a loading task is launched for each replica directory. The tasks for loading replica directories are pushed into a partitioned thread pool (namely `THREAD_POOL_REPLICATION`) disk by disk: only after all of the replica directories on the current disk have been pushed into the thread pool would the process move on to the next disk. Since the thread pool is partitioned while the hash for each task is the total number of the tasks that have been in the pool before this task is added, all of the replica directories on one disk would be executed concurrently. This would lead to two problems once there are a great number of replica directories on each disk: - I/O usage for each disk might become saturated: its `%util` might become 100%; - The entire loading process is blocked on each single disk: during a long period only one disk is keeping busy while others are idle. The replica server seems getting stuck in loading replicas after it is started. This is unacceptable and should be changed. The improved version allows the replica directories on different disks to be loaded simultaneously: every disk would be busy loading replicas. Also, loading tasks would be pushed into a non-partitioned thread pool (i.e. `THREAD_POOL_LOCAL_APP`) instead of the partitioned, making tasks across multiple threads auto-balanced to prevent some threads from being starved while others are stuffed. And new parameter is added to restrict the max number of replicas allowed to be loaded simultaneously for each disk, in case that I/O usage for each disk becomes saturated. Another parameter is added to ensure that the main thread waiting all loading tasks to finished would not be blocked on one task too long while the number of tasks for loading replica directories simultaneously on a single disk has reached its limit. Parameters are added as follows: ```diff [replication] + max_replicas_on_load_for_each_disk = 256 + load_replica_max_wait_time_ms = 10 ``` --- .clang-tidy | 2 +- build_tools/clang_tidy.py | 4 +- src/common/replication.codes.h | 2 +- src/meta/meta_service.cpp | 1 + src/replica/replica.h | 1 + src/replica/replica_stub.cpp | 681 +++++++++++++++++------- src/replica/replica_stub.h | 105 +++- src/replica/test/config-test.ini | 8 +- src/replica/test/load_replicas_test.cpp | 339 ++++++++++++ src/replica/test/replica_dir_test.cpp | 127 +++++ src/server/result_writer.cpp | 1 - src/test_util/test_util.h | 9 +- src/utils/autoref_ptr.h | 9 + src/utils/command_manager.h | 42 +- src/utils/time_utils.h | 4 +- 15 files changed, 1100 insertions(+), 235 deletions(-) create mode 100644 src/replica/test/load_replicas_test.cpp create mode 100644 src/replica/test/replica_dir_test.cpp diff --git a/.clang-tidy b/.clang-tidy index 914dee1198..95dd7616d7 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -20,7 +20,7 @@ CheckOptions: [] # Disable some checks that are not useful for us now. # They are sorted by names, and should be consistent to build_tools/clang_tidy.py. -Checks: 'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines-macro-usage,-cppcoreguidelines-non-private-member-variables-in-classes,-cppcoreguidelines-owning-memory,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-cppcoreguidelines-pro-bounds-pointer-arithmetic,-cppcoreguidelines-pro-type-const-cast,-cppcoreguidelines-pro-type-union-access,-fuchsia-default-arguments-calls,-fuchsia-overloaded-operator,-fuchsia-statically-constructed-objects,-google-readability-avoid-underscore-in-googletest-name,-hicpp-avoid-c-arrays,-hicpp-named-parameter,-hicpp-no-array-decay,-llvm-include-order,-misc-definitions-in-headers,-misc-non-private-member-variables-in-classes,-misc-unused-parameters,-modernize-avoid-c-arrays,-modernize-replace-disallow-copy-and-assign-macro,-modernize-use-trailing-return-type,-performance-unnecessary-value-param,-readability-function-cognitive-complexity,-readability-identifier-length,-readability-magic-numbers,-readability-named-parameter' +Checks: 'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines-macro-usage,-cppcoreguidelines-non-private-member-variables-in-classes,-cppcoreguidelines-owning-memory,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-cppcoreguidelines-pro-bounds-pointer-arithmetic,-cppcoreguidelines-pro-type-const-cast,-cppcoreguidelines-pro-type-union-access,-fuchsia-default-arguments-calls,-fuchsia-overloaded-operator,-fuchsia-statically-constructed-objects,-google-readability-avoid-underscore-in-googletest-name,-hicpp-avoid-c-arrays,-hicpp-named-parameter,-hicpp-no-array-decay,-llvm-include-order,-misc-definitions-in-headers,-misc-non-private-member-variables-in-classes,-misc-unused-parameters,-modernize-avoid-bind,-modernize-avoid-c-arrays,-modernize-replace-disallow-copy-and-assign-macro,-modernize-use-trailing-return-type,-performance-unnecessary-value-param,-readability-function-cognitive-complexity,-readability-identifier-length,-readability-magic-numbers,-readability-named-parameter,-readability-suspicious-call-argument' ExtraArgs: ExtraArgsBefore: [] FormatStyle: none diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py index 09ea434b1d..8a35e1a5dd 100755 --- a/build_tools/clang_tidy.py +++ b/build_tools/clang_tidy.py @@ -88,6 +88,7 @@ def tidy_on_path(path): "-misc-definitions-in-headers," "-misc-non-private-member-variables-in-classes," "-misc-unused-parameters," + "-modernize-avoid-bind," "-modernize-avoid-c-arrays," "-modernize-replace-disallow-copy-and-assign-macro," "-modernize-use-trailing-return-type," @@ -95,7 +96,8 @@ def tidy_on_path(path): "-readability-function-cognitive-complexity," "-readability-identifier-length," "-readability-magic-numbers," - "-readability-named-parameter", + "-readability-named-parameter," + "-readability-suspicious-call-argument", "-extra-arg=-language=c++", "-extra-arg=-std=c++17", "-extra-arg=-Ithirdparty/output/include"] diff --git a/src/common/replication.codes.h b/src/common/replication.codes.h index 7bbb775b17..42ae695f7a 100644 --- a/src/common/replication.codes.h +++ b/src/common/replication.codes.h @@ -141,7 +141,6 @@ MAKE_EVENT_CODE(LPC_META_STATE_NORMAL, TASK_PRIORITY_COMMON) // THREAD_POOL_REPLICATION #define CURRENT_THREAD_POOL THREAD_POOL_REPLICATION -MAKE_EVENT_CODE(LPC_REPLICATION_INIT_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(RPC_REPLICATION_WRITE_EMPTY, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_PER_REPLICA_CHECKPOINT_TIMER, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_PER_REPLICA_COLLECT_INFO_TIMER, TASK_PRIORITY_COMMON) @@ -186,6 +185,7 @@ MAKE_EVENT_CODE(LPC_REPLICATION_HIGH, TASK_PRIORITY_HIGH) // THREAD_POOL_LOCAL_APP #define CURRENT_THREAD_POOL THREAD_POOL_LOCAL_APP +MAKE_EVENT_CODE(LPC_REPLICATION_INIT_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_WRITE, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_read_THROTTLING_DELAY, TASK_PRIORITY_COMMON) #undef CURRENT_THREAD_POOL diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp index bd9736304b..6ec19c007d 100644 --- a/src/meta/meta_service.cpp +++ b/src/meta/meta_service.cpp @@ -29,6 +29,7 @@ #include #include // for std::remove_if #include +#include #include #include #include diff --git a/src/replica/replica.h b/src/replica/replica.h index 4ee215c3dc..b312865c5e 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -610,6 +610,7 @@ class replica : public serverlet, public ref_counter, public replica_ba friend class replica_disk_test; friend class replica_disk_migrate_test; friend class open_replica_test; + friend class mock_load_replica; friend class replica_follower; friend class ::pegasus::server::pegasus_server_test_base; friend class ::pegasus::server::rocksdb_wrapper_test; diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 2e4eec66e3..471b8b1f8c 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -29,15 +29,15 @@ // IWYU pragma: no_include #include #include +#include #include -#include -#include #include #include #include -#include +#include #include #include +#include #include #include #include @@ -71,6 +71,10 @@ #include "security/access_controller.h" #include "split/replica_split_manager.h" #include "task/async_calls.h" +#include "task/task.h" +#include "task/task_engine.h" +#include "task/task_worker.h" +#include "utils/api_utilities.h" #include "utils/command_manager.h" #include "utils/env.h" #include "utils/errors.h" @@ -80,8 +84,11 @@ #include "utils/ports.h" #include "utils/process_utils.h" #include "utils/rand.h" +#include "utils/string_conv.h" #include "utils/strings.h" #include "utils/synchronize.h" +#include "utils/threadpool_spec.h" +#include "utils/timer.h" #ifdef DSN_ENABLE_GPERF #include #elif defined(DSN_USE_JEMALLOC) @@ -91,14 +98,17 @@ #include "remote_cmd/remote_command.h" #include "utils/fail_point.h" -static const char *kMaxConcurrentBulkLoadDownloadingCountDesc = - "The maximum concurrent bulk load downloading replica count"; -DSN_DEFINE_int32(replication, - max_concurrent_bulk_load_downloading_count, - 5, - kMaxConcurrentBulkLoadDownloadingCountDesc); -DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count, - [](int32_t value) -> bool { return value >= 0; }); +namespace { + +const char *kMaxReplicasOnLoadForEachDiskDesc = + "The max number of replicas that are allowed to be loaded simultaneously for each disk dir."; + +const char *kLoadReplicaMaxWaitTimeMsDesc = "The max waiting time for replica loading to complete."; + +const char *kMaxConcurrentBulkLoadDownloadingCountDesc = + "The maximum concurrent bulk load downloading replica count."; + +} // anonymous namespace METRIC_DEFINE_gauge_int64(server, total_replicas, @@ -260,6 +270,26 @@ DSN_DECLARE_string(data_dirs); DSN_DECLARE_string(encryption_cluster_key_name); DSN_DECLARE_string(server_key); +DSN_DEFINE_uint64(replication, + max_replicas_on_load_for_each_disk, + 256, + kMaxReplicasOnLoadForEachDiskDesc); +DSN_TAG_VARIABLE(max_replicas_on_load_for_each_disk, FT_MUTABLE); +DSN_DEFINE_validator(max_replicas_on_load_for_each_disk, + [](uint64_t value) -> bool { return value > 0; }); + +DSN_DEFINE_uint64(replication, load_replica_max_wait_time_ms, 10, kLoadReplicaMaxWaitTimeMsDesc); +DSN_TAG_VARIABLE(load_replica_max_wait_time_ms, FT_MUTABLE); +DSN_DEFINE_validator(load_replica_max_wait_time_ms, + [](uint64_t value) -> bool { return value > 0; }); + +DSN_DEFINE_int32(replication, + max_concurrent_bulk_load_downloading_count, + 5, + kMaxConcurrentBulkLoadDownloadingCountDesc); +DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count, + [](int32_t value) -> bool { return value >= 0; }); + DSN_DEFINE_bool(replication, deny_client_on_start, false, @@ -376,9 +406,52 @@ namespace dsn { namespace replication { bool replica_stub::s_not_exit_on_log_failure = false; +namespace { + +// Register commands that get/set flag configurations. +void register_flags_ctrl_command() +{ + // For the reaonse why using std::call_once please see comments in + // replica_stub::register_ctrl_command() for details. + static std::once_flag flag; + std::call_once(flag, []() mutable { + dsn::command_manager::instance().add_global_cmd( + dsn::command_manager::instance().register_int_command( + FLAGS_max_replicas_on_load_for_each_disk, + FLAGS_max_replicas_on_load_for_each_disk, + "replica.max-replicas-on-load-for-each-disk", + kMaxReplicasOnLoadForEachDiskDesc)); + + dsn::command_manager::instance().add_global_cmd( + dsn::command_manager::instance().register_int_command( + FLAGS_load_replica_max_wait_time_ms, + FLAGS_load_replica_max_wait_time_ms, + "replica.load-replica-max-wait-time-ms", + kLoadReplicaMaxWaitTimeMsDesc)); + + dsn::command_manager::instance().add_global_cmd( + dsn::command_manager::instance().register_bool_command( + FLAGS_empty_write_disabled, + "replica.disable-empty-write", + "whether to disable empty writes")); + + dsn::command_manager::instance().add_global_cmd( + dsn::command_manager::instance().register_int_command( + FLAGS_max_concurrent_bulk_load_downloading_count, + FLAGS_max_concurrent_bulk_load_downloading_count, + "replica.max-concurrent-bulk-load-downloading-count", + kMaxConcurrentBulkLoadDownloadingCountDesc)); + }); +} + +} // anonymous namespace + replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, bool is_long_subscriber /* = true*/) : serverlet("replica_stub"), + _state(NS_Disconnected), + _replica_state_subscriber(std::move(subscriber)), + _is_long_subscriber(is_long_subscriber), _deny_client(false), _verbose_client_log(false), _verbose_commit_log(false), @@ -388,6 +461,9 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, _bulk_load_downloading_count(0), _manual_emergency_checkpointing_count(0), _is_running(false), +#ifdef DSN_ENABLE_GPERF + _is_releasing_memory(false), +#endif METRIC_VAR_INIT_server(total_replicas), METRIC_VAR_INIT_server(opening_replicas), METRIC_VAR_INIT_server(closing_replicas), @@ -420,16 +496,13 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, METRIC_VAR_INIT_server(splitting_replicas_async_learn_max_duration_ms), METRIC_VAR_INIT_server(splitting_replicas_max_copy_file_bytes) { -#ifdef DSN_ENABLE_GPERF - _is_releasing_memory = false; -#endif - _replica_state_subscriber = subscriber; - _is_long_subscriber = is_long_subscriber; - _failure_detector = nullptr; - _state = NS_Disconnected; + // Some flags might need to be tuned on the stage of loading replicas (during + // replica_stub::initialize()), thus register their control command just in the + // constructor. + register_flags_ctrl_command(); } -replica_stub::~replica_stub(void) { close(); } +replica_stub::~replica_stub() { close(); } void replica_stub::initialize(bool clear /* = false*/) { @@ -439,6 +512,246 @@ void replica_stub::initialize(bool clear /* = false*/) _access_controller = std::make_unique(); } +std::vector replica_stub::get_all_disk_dirs() const +{ + std::vector disks; + for (const auto &disk_node : _fs_manager.get_dir_nodes()) { + if (dsn_unlikely(disk_node->status == disk_status::IO_ERROR)) { + // Skip disks with IO errors. + continue; + } + + std::vector sub_dirs; + CHECK(utils::filesystem::get_subdirectories(disk_node->full_dir, sub_dirs, false), + "failed to get sub_directories in {}", + disk_node->full_dir); + disks.push_back(disk_replicas_info{disk_node.get(), std::move(sub_dirs)}); + } + + return disks; +} + +// TaskCode: LPC_REPLICATION_INIT_LOAD +// ThreadPool: THREAD_POOL_LOCAL_APP +void replica_stub::load_replica(dir_node *disk_node, + const std::string &replica_dir, + size_t total_dir_count, + utils::ex_lock &reps_lock, + replica_map_by_gpid &reps, + std::atomic &finished_dir_count) +{ + // Measure execution time for loading a replica dir. + // + // TODO(wangdan): support decimal milliseconds or microseconds, since loading a small + // replica tends to spend less than 1 milliseconds and show "0ms" in logging. + SCOPED_LOG_TIMING(INFO, "on loading replica dir {}:{}", disk_node->tag, replica_dir); + + LOG_INFO("loading replica: replica_dir={}:{}", disk_node->tag, replica_dir); + + const auto *const worker = task::get_current_worker2(); + if (worker != nullptr) { + CHECK(!(worker->pool()->spec().partitioned), + "The thread pool THREAD_POOL_LOCAL_APP(task code: LPC_REPLICATION_INIT_LOAD) " + "for loading replicas must not be partitioned since load balancing is required " + "among multiple threads"); + } + + auto rep = load_replica(disk_node, replica_dir); + if (rep == nullptr) { + LOG_INFO("load replica failed: replica_dir={}:{}, progress={}/{}", + disk_node->tag, + replica_dir, + ++finished_dir_count, + total_dir_count); + return; + } + + LOG_INFO("{}@{}: load replica successfully, replica_dir={}:{}, progress={}/{}, " + "last_durable_decree={}, last_committed_decree={}, last_prepared_decree={}", + rep->get_gpid(), + dsn_primary_host_port(), + disk_node->tag, + replica_dir, + ++finished_dir_count, + total_dir_count, + rep->last_durable_decree(), + rep->last_committed_decree(), + rep->last_prepared_decree()); + + utils::auto_lock l(reps_lock); + const auto rep_iter = reps.find(rep->get_gpid()); + CHECK(rep_iter == reps.end(), + "{}@{}: newly loaded dir {} conflicts with existing {} while loading replica", + rep->get_gpid(), + dsn_primary_host_port(), + rep->dir(), + rep_iter->second->dir()); + + reps.emplace(rep->get_gpid(), rep); +} + +void replica_stub::load_replicas(replica_map_by_gpid &reps) +{ + // Measure execution time for loading all replicas from all healthy disks without IO errors. + // + // TODO(wangdan): show both the size of output replicas and execution time on just one + // logging line. + SCOPED_LOG_TIMING(INFO, "on loading replicas"); + + const auto &disks = get_all_disk_dirs(); + + // The max index of dirs that are currently being loaded for each disk, which means the dirs + // with higher indexes have not begun to be loaded (namely pushed into the queue). + std::vector replica_dir_indexes(disks.size(), 0); + + // Each loader is for a replica dir, including its path and loading task. + struct replica_dir_loader + { + size_t replica_dir_index; + std::string replica_dir_path; + task_ptr load_replica_task; + }; + + // Each queue would cache the tasks that loading dirs for each disk. Once the task is + // found finished (namely a dir has been loaded successfully), it would be popped from + // the queue. + std::vector> load_disk_queues(disks.size()); + + // The number of loading replica dirs that have been finished for each disk, used to show + // current progress. + // + // TODO(wangdan): calculate the number of successful or failed loading of replica dirs, + // and the number for each reason if failed. + std::vector> finished_replica_dirs(disks.size()); + for (auto &count : finished_replica_dirs) { + count.store(0); + } + + // The lock for operations on the loaded replicas as output. + utils::ex_lock reps_lock; + + while (true) { + size_t finished_disks = 0; + + // For each round, start loading one replica for each disk in case there are too many + // replicas in a disk, except that all of the replicas of this disk are being loaded. + for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) { + // TODO(wangdan): Structured bindings can be captured by closures in g++, while + // not supported well by clang. Thus we do not use following statement to bind + // both variables until clang has been upgraded to version 16 which could support + // that well: + // + // const auto &[disk_node, replica_dirs] = disks[disk_index]; + // + // For the docs of clang 16 please see: + // + // https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support. + const auto &replica_dirs = disks[disk_index].replica_dirs; + + auto &replica_dir_index = replica_dir_indexes[disk_index]; + if (replica_dir_index >= replica_dirs.size()) { + // All of the replicas for the disk `disks[disk_index]` have begun to be loaded, + // thus just skip to next disk. + ++finished_disks; + continue; + } + + const auto &disk_node = disks[disk_index].disk_node; + auto &load_disk_queue = load_disk_queues[disk_index]; + if (load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) { + // Loading replicas should be throttled in case that disk IO is saturated. + if (!load_disk_queue.front().load_replica_task->wait( + static_cast(FLAGS_load_replica_max_wait_time_ms))) { + // There might be too many replicas that are being loaded which lead to + // slow disk IO, thus turn to load replicas of next disk, and try to load + // dir `replica_dir_index` of this disk in the next round. + LOG_WARNING("after {} ms, loading dir({}, {}/{}) is still not finished, " + "there are {} replicas being loaded for disk({}:{}, {}/{}), " + "now turn to next disk, and will begin to load dir({}, {}/{}) " + "soon", + FLAGS_load_replica_max_wait_time_ms, + load_disk_queue.front().replica_dir_path, + load_disk_queue.front().replica_dir_index, + replica_dirs.size(), + load_disk_queue.size(), + disk_node->tag, + disk_node->full_dir, + disk_index, + disks.size(), + replica_dirs[replica_dir_index], + replica_dir_index, + replica_dirs.size()); + continue; + } + + // Now the queue size is within the limit again, continue to load a new replica dir. + load_disk_queue.pop(); + } + + if (dsn::replication::is_data_dir_invalid(replica_dirs[replica_dir_index])) { + LOG_WARNING("ignore dir({}, {}/{}) for disk({}:{}, {}/{})", + replica_dirs[replica_dir_index], + replica_dir_index, + replica_dirs.size(), + disk_node->tag, + disk_node->full_dir, + disk_index, + disks.size()); + ++replica_dir_index; + continue; + } + + LOG_DEBUG("ready to load dir({}, {}/{}) for disk({}:{}, {}/{})", + replica_dirs[replica_dir_index], + replica_dir_index, + replica_dirs.size(), + disk_node->tag, + disk_node->full_dir, + disk_index, + disks.size()); + + load_disk_queue.push(replica_dir_loader{ + replica_dir_index, + replica_dirs[replica_dir_index], + tasking::create_task( + // Ensure that the thread pool is non-partitioned. + LPC_REPLICATION_INIT_LOAD, + &_tracker, + std::bind(static_cast &)>( + &replica_stub::load_replica), + this, + disk_node, + replica_dirs[replica_dir_index], + replica_dirs.size(), + std::ref(reps_lock), + std::ref(reps), + std::ref(finished_replica_dirs[disk_index])))}); + + load_disk_queue.back().load_replica_task->enqueue(); + + ++replica_dir_index; + } + + if (finished_disks >= disks.size()) { + // All replicas of all disks have begun to be loaded. + break; + } + } + + // All loading tasks have been in the queue. Just wait all tasks to be finished. + for (auto &load_disk_queue : load_disk_queues) { + while (!load_disk_queue.empty()) { + CHECK_TRUE(load_disk_queue.front().load_replica_task->wait()); + load_disk_queue.pop(); + } + } +} + void replica_stub::initialize(const replication_options &opts, bool clear /* = false*/) { _primary_host_port = dsn_primary_host_port(); @@ -523,75 +836,14 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f // Start to load replicas in available data directories. LOG_INFO("start to load replicas"); - std::map> dirs_by_dn; - for (const auto &dn : _fs_manager.get_dir_nodes()) { - // Skip IO error dir_node. - if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) { - continue; - } - std::vector sub_directories; - CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, sub_directories, false), - "fail to get sub_directories in {}", - dn->full_dir); - dirs_by_dn.emplace(dn.get(), sub_directories); - } - - replicas rps; - utils::ex_lock rps_lock; - std::deque load_tasks; - uint64_t start_time = dsn_now_ms(); - for (const auto &dn_dirs : dirs_by_dn) { - const auto dn = dn_dirs.first; - for (const auto &dir : dn_dirs.second) { - if (dsn::replication::is_data_dir_invalid(dir)) { - LOG_WARNING("ignore dir {}", dir); - continue; - } - load_tasks.push_back(tasking::create_task( - LPC_REPLICATION_INIT_LOAD, - &_tracker, - [this, dn, dir, &rps, &rps_lock] { - LOG_INFO("process dir {}", dir); - - auto r = load_replica(dn, dir.c_str()); - if (r == nullptr) { - return; - } - LOG_INFO("{}@{}: load replica '{}' success, = <{}, {}>, last_prepared_decree = {}", - r->get_gpid(), - dsn_primary_host_port(), - dir, - r->last_durable_decree(), - r->last_committed_decree(), - r->last_prepared_decree()); - - utils::auto_lock l(rps_lock); - CHECK(rps.find(r->get_gpid()) == rps.end(), - "conflict replica dir: {} <--> {}", - r->dir(), - rps[r->get_gpid()]->dir()); - - rps[r->get_gpid()] = r; - }, - load_tasks.size())); - load_tasks.back()->enqueue(); - } - } - for (auto &tsk : load_tasks) { - tsk->wait(); - } - uint64_t finish_time = dsn_now_ms(); + replica_map_by_gpid reps; + load_replicas(reps); - dirs_by_dn.clear(); - load_tasks.clear(); - LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms", - rps.size(), - finish_time - start_time); + LOG_INFO("load replicas succeed, replica_count = {}", reps.size()); bool is_log_complete = true; - for (auto it = rps.begin(); it != rps.end(); ++it) { + for (auto it = reps.begin(); it != reps.end(); ++it) { CHECK_EQ_MSG(it->second->background_sync_checkpoint(), ERR_OK, "sync checkpoint failed"); it->second->reset_prepare_list_after_replay(); @@ -621,8 +873,8 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f if (!is_log_complete) { LOG_ERROR("logs are not complete for some replicas, which means that shared log is " "truncated, mark all replicas as inactive"); - for (auto it = rps.begin(); it != rps.end(); ++it) { - it->second->set_inactive_state_transient(false); + for (auto &[_, rep] : reps) { + rep->set_inactive_state_transient(false); } } @@ -648,11 +900,11 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f std::chrono::seconds(FLAGS_disk_stat_interval_seconds)); } - // attach rps - _replicas = std::move(rps); + // Attach `reps`. + _replicas = std::move(reps); METRIC_VAR_INCREMENT_BY(total_replicas, _replicas.size()); - for (const auto &kv : _replicas) { - _fs_manager.add_replica(kv.first, kv.second->dir()); + for (const auto &[pid, rep] : _replicas) { + _fs_manager.add_replica(pid, rep->dir()); } _nfs = dsn::nfs_node::create(); @@ -746,7 +998,7 @@ dsn::error_code replica_stub::on_kill_replica(gpid id) { LOG_INFO("kill replica: gpid = {}", id); if (id.get_app_id() == -1 || id.get_partition_index() == -1) { - replicas rs; + replica_map_by_gpid rs; { zauto_read_lock l(_replicas_lock); rs = _replicas; @@ -1373,27 +1625,28 @@ void replica_stub::on_node_query_reply(error_code err, resp.partitions.size(), resp.gc_replicas.size()); - replicas rs; + replica_map_by_gpid reps; { zauto_read_lock rl(_replicas_lock); - rs = _replicas; + reps = _replicas; } - for (auto it = resp.partitions.begin(); it != resp.partitions.end(); ++it) { - rs.erase(it->config.pid); - tasking::enqueue(LPC_QUERY_NODE_CONFIGURATION_SCATTER, - &_tracker, - std::bind(&replica_stub::on_node_query_reply_scatter, this, this, *it), - it->config.pid.thread_hash()); + for (const auto &config_update : resp.partitions) { + reps.erase(config_update.config.pid); + tasking::enqueue( + LPC_QUERY_NODE_CONFIGURATION_SCATTER, + &_tracker, + std::bind(&replica_stub::on_node_query_reply_scatter, this, this, config_update), + config_update.config.pid.thread_hash()); } - // for rps not exist on meta_servers - for (auto it = rs.begin(); it != rs.end(); ++it) { + // For the replicas that do not exist on meta_servers. + for (const auto &[pid, _] : reps) { tasking::enqueue( LPC_QUERY_NODE_CONFIGURATION_SCATTER2, &_tracker, - std::bind(&replica_stub::on_node_query_reply_scatter2, this, this, it->first), - it->first.thread_hash()); + std::bind(&replica_stub::on_node_query_reply_scatter2, this, this, pid), + pid.thread_hash()); } // handle the replicas which need to be gc @@ -1522,18 +1775,18 @@ void replica_stub::on_meta_server_disconnected() _state = NS_Disconnected; - replicas rs; + replica_map_by_gpid reps; { zauto_read_lock rl(_replicas_lock); - rs = _replicas; + reps = _replicas; } - for (auto it = rs.begin(); it != rs.end(); ++it) { + for (const auto &[pid, _] : reps) { tasking::enqueue( LPC_CM_DISCONNECTED_SCATTER, &_tracker, - std::bind(&replica_stub::on_meta_server_disconnected_scatter, this, this, it->first), - it->first.thread_hash()); + std::bind(&replica_stub::on_meta_server_disconnected_scatter, this, this, pid), + pid.thread_hash()); } } @@ -1823,7 +2076,7 @@ void replica_stub::open_replica( _primary_host_port_cache, group_check ? "with" : "without", dir); - rep = load_replica(dn, dir.c_str()); + rep = load_replica(dn, dir); // if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk // migration @@ -1848,7 +2101,7 @@ void replica_stub::open_replica( boost::replace_first( origin_dir, replica_disk_migrator::kReplicaDirOriginSuffix, ""); dsn::utils::filesystem::rename_path(origin_tmp_dir, origin_dir); - rep = load_replica(origin_dn, origin_dir.c_str()); + rep = load_replica(origin_dn, origin_dir); FAIL_POINT_INJECT_F("mock_replica_load", [&](std::string_view) -> void {}); } @@ -1915,7 +2168,7 @@ void replica_stub::open_replica( METRIC_VAR_DECREMENT(opening_replicas); CHECK(_replicas.find(id) == _replicas.end(), "replica {} is already in _replicas", id); - _replicas.insert(replicas::value_type(rep->get_gpid(), rep)); + _replicas.insert(replica_map_by_gpid::value_type(rep->get_gpid(), rep)); METRIC_VAR_INCREMENT(total_replicas); _closed_replicas.erase(id); @@ -1981,6 +2234,51 @@ replica *replica_stub::new_replica(gpid gpid, return rep; } +replica *replica_stub::new_replica(gpid gpid, + const app_info &app, + bool restore_if_necessary, + bool is_duplication_follower) +{ + return new_replica(gpid, app, restore_if_necessary, is_duplication_follower, {}); +} + +/*static*/ std::string replica_stub::get_replica_dir_name(const std::string &dir) +{ + static const char splitters[] = {'\\', '/', 0}; + return utils::get_last_component(dir, splitters); +} + +/* static */ bool +replica_stub::parse_replica_dir_name(const std::string &dir_name, gpid &pid, std::string &app_type) +{ + std::vector ids(2, 0); + size_t begin = 0; + for (auto &id : ids) { + size_t end = dir_name.find('.', begin); + if (end == std::string::npos) { + return false; + } + + if (!buf2uint32(std::string_view(dir_name.data() + begin, end - begin), id)) { + return false; + } + + begin = end + 1; + } + + if (begin >= dir_name.size()) { + return false; + } + + pid.set_app_id(static_cast(ids[0])); + pid.set_partition_index(static_cast(ids[1])); + + // TODO(wangdan): the 3rd parameter `count` does not support default argument for CentOS 7 + // (gcc 7.3.1). After CentOS 7 is deprecated, consider dropping std::string::npos. + app_type.assign(dir_name, begin, std::string::npos); + return true; +} + bool replica_stub::validate_replica_dir(const std::string &dir, app_info &ai, gpid &pid, @@ -1991,21 +2289,18 @@ bool replica_stub::validate_replica_dir(const std::string &dir, return false; } - char splitters[] = {'\\', '/', 0}; - const auto name = utils::get_last_component(dir, splitters); - if (name.empty()) { + const auto &dir_name = get_replica_dir_name(dir); + if (dir_name.empty()) { hint_message = fmt::format("invalid replica dir '{}'", dir); return false; } - char app_type[128] = {0}; - int32_t app_id, pidx; - if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) { + std::string app_type; + if (!parse_replica_dir_name(dir_name, pid, app_type)) { hint_message = fmt::format("invalid replica dir '{}'", dir); return false; } - pid = gpid(app_id, pidx); replica_app_info rai(&ai); const auto ai_path = utils::filesystem::path_combine(dir, replica_app_info::kAppInfo); const auto err = rai.load(ai_path); @@ -2019,10 +2314,9 @@ bool replica_stub::validate_replica_dir(const std::string &dir, return false; } - // When the online partition split function aborted, the garbage partitions are with pidx in - // the range of [ai.partition_count, 2 * ai.partition_count), which means the partitions with - // pidx >= ai.partition_count are garbage partitions. - if (ai.partition_count <= pidx) { + if (pid.get_partition_index() >= ai.partition_count) { + // Once the online partition split aborted, the partitions within the range of + // [ai.partition_count, 2 * ai.partition_count) would become garbage. hint_message = fmt::format( "partition[{}], count={}, this replica may be partition split garbage partition, " "ignore it", @@ -2034,7 +2328,7 @@ bool replica_stub::validate_replica_dir(const std::string &dir, return true; } -replica *replica_stub::load_replica(dir_node *dn, const char *dir) +replica_ptr replica_stub::load_replica(dir_node *disk_node, const std::string &replica_dir) { FAIL_POINT_INJECT_F("mock_replica_load", [&](std::string_view) -> replica * { return nullptr; }); @@ -2042,24 +2336,28 @@ replica *replica_stub::load_replica(dir_node *dn, const char *dir) app_info ai; gpid pid; std::string hint_message; - if (!validate_replica_dir(dir, ai, pid, hint_message)) { - LOG_ERROR("invalid replica dir '{}', hint: {}", dir, hint_message); + if (!validate_replica_dir(replica_dir, ai, pid, hint_message)) { + LOG_ERROR("invalid replica dir '{}', hint={}", replica_dir, hint_message); return nullptr; } // The replica's directory must exist when creating a replica. - CHECK_EQ(dir, dn->replica_dir(ai.app_type, pid)); - auto *rep = new replica(this, pid, ai, dn, false); + CHECK_EQ(disk_node->replica_dir(ai.app_type, pid), replica_dir); + + auto *rep = new replica(this, pid, ai, disk_node, false); const auto err = rep->initialize_on_load(); if (err != ERR_OK) { - LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err); - rep->close(); + LOG_ERROR("{}: load replica failed, tag={}, replica_dir={}, err={}", + rep->name(), + disk_node->tag, + replica_dir, + err); delete rep; rep = nullptr; // clear work on failure - if (dsn::utils::filesystem::directory_exists(dir)) { - move_to_err_path(dir, "load replica"); + if (dsn::utils::filesystem::directory_exists(replica_dir)) { + move_to_err_path(replica_dir, "load replica"); METRIC_VAR_INCREMENT(moved_error_replicas); _fs_manager.remove_replica(pid); } @@ -2067,7 +2365,10 @@ replica *replica_stub::load_replica(dir_node *dn, const char *dir) return nullptr; } - LOG_INFO("{}: load replica succeed", rep->name()); + LOG_INFO("{}: load replica succeed, tag={}, replica_dir={}", + rep->name(), + disk_node->tag, + replica_dir); return rep; } @@ -2394,11 +2695,6 @@ void replica_stub::register_ctrl_command() }); })); - _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command( - FLAGS_empty_write_disabled, - "replica.disable-empty-write", - "whether to disable empty writes")); - #ifdef DSN_ENABLE_GPERF _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command( _release_tcmalloc_memory, @@ -2433,61 +2729,64 @@ void replica_stub::register_ctrl_command() #elif defined(DSN_USE_JEMALLOC) register_jemalloc_ctrl_command(); #endif - _cmds.emplace_back(::dsn::command_manager::instance().register_int_command( - FLAGS_max_concurrent_bulk_load_downloading_count, - FLAGS_max_concurrent_bulk_load_downloading_count, - "replica.max-concurrent-bulk-load-downloading-count", - kMaxConcurrentBulkLoadDownloadingCountDesc)); }); } std::string -replica_stub::exec_command_on_replica(const std::vector &args, +replica_stub::exec_command_on_replica(const std::vector &arg_str_list, bool allow_empty_args, - std::function func) + std::function func) { - if (args.empty() && !allow_empty_args) { - return std::string("invalid arguments"); + static const std::string kInvalidArguments("invalid arguments"); + + if (arg_str_list.empty() && !allow_empty_args) { + return kInvalidArguments; } - replicas rs; + replica_map_by_gpid rs; { zauto_read_lock l(_replicas_lock); rs = _replicas; } std::set required_ids; - replicas choosed_rs; - if (!args.empty()) { - for (int i = 0; i < args.size(); i++) { - std::vector arg_strs; - utils::split_args(args[i].c_str(), arg_strs, ','); - if (arg_strs.empty()) { - return std::string("invalid arguments"); + replica_map_by_gpid choosed_rs; + if (!arg_str_list.empty()) { + for (const auto &arg_str : arg_str_list) { + std::vector args; + utils::split_args(arg_str.c_str(), args, ','); + if (args.empty()) { + return kInvalidArguments; } - for (const std::string &arg : arg_strs) { - if (arg.empty()) + for (const std::string &arg : args) { + if (arg.empty()) { continue; + } + gpid id; - int pid; if (id.parse_from(arg.c_str())) { - // app_id.partition_index + // Format: app_id.partition_index required_ids.insert(id); auto find = rs.find(id); if (find != rs.end()) { choosed_rs[id] = find->second; } - } else if (sscanf(arg.c_str(), "%d", &pid) == 1) { - // app_id - for (auto kv : rs) { - id = kv.second->get_gpid(); - if (id.get_app_id() == pid) { - choosed_rs[id] = kv.second; - } + + continue; + } + + // Must be app_id. + int32_t app_id = 0; + if (!buf2int32(arg, app_id)) { + return kInvalidArguments; + } + + for (const auto &[_, rep] : rs) { + id = rep->get_gpid(); + if (id.get_app_id() == app_id) { + choosed_rs[id] = rep; } - } else { - return std::string("invalid arguments"); } } } @@ -2507,8 +2806,10 @@ replica_stub::exec_command_on_replica(const std::vector &args, [rep, &func, &results_lock, &results]() { partition_status::type status = rep->status(); if (status != partition_status::PS_PRIMARY && - status != partition_status::PS_SECONDARY) + status != partition_status::PS_SECONDARY) { return; + } + std::string result = func(rep); ::dsn::zauto_lock l(results_lock); auto &value = results[rep->get_gpid()]; @@ -2720,33 +3021,39 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid, CHECK_NOTNULL(dn, ""); auto *rep = new replica(this, child_pid, *app, dn, false); rep->_config.status = partition_status::PS_INACTIVE; - _replicas.insert(replicas::value_type(child_pid, rep)); + _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep)); LOG_INFO("mock create_child_replica_if_not_found succeed"); return rep; }); zauto_write_lock l(_replicas_lock); - auto it = _replicas.find(child_pid); + + const auto it = _replicas.find(child_pid); if (it != _replicas.end()) { return it->second; - } else { - if (_opening_replicas.find(child_pid) != _opening_replicas.end()) { - LOG_WARNING("failed create child replica({}) because it is under open", child_pid); - return nullptr; - } else if (_closing_replicas.find(child_pid) != _closing_replicas.end()) { - LOG_WARNING("failed create child replica({}) because it is under close", child_pid); - return nullptr; - } else { - replica *rep = new_replica(child_pid, *app, false, false, parent_dir); - if (rep != nullptr) { - auto pr = _replicas.insert(replicas::value_type(child_pid, rep)); - CHECK(pr.second, "child replica {} has been existed", rep->name()); - METRIC_VAR_INCREMENT(total_replicas); - _closed_replicas.erase(child_pid); - } - return rep; - } } + + if (_opening_replicas.find(child_pid) != _opening_replicas.end()) { + LOG_WARNING("failed create child replica({}) because it is under open", child_pid); + return nullptr; + } + + if (_closing_replicas.find(child_pid) != _closing_replicas.end()) { + LOG_WARNING("failed create child replica({}) because it is under close", child_pid); + return nullptr; + } + + replica *rep = new_replica(child_pid, *app, false, false, parent_dir); + if (rep == nullptr) { + return nullptr; + } + + const auto pr = _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep)); + CHECK(pr.second, "child replica {} has been existed", rep->name()); + METRIC_VAR_INCREMENT(total_replicas); + _closed_replicas.erase(child_pid); + + return rep; } // ThreadPool: THREAD_POOL_REPLICATION diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index d3b8600764..7d03d06ee2 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -27,8 +27,9 @@ #pragma once #include -#include #include +#include +#include #include #include #include @@ -70,6 +71,12 @@ #include "utils/metrics.h" #include "utils/zlocks.h" +namespace dsn::utils { + +class ex_lock; + +} // namespace dsn::utils + DSN_DECLARE_uint32(max_concurrent_manual_emergency_checkpointing_count); namespace dsn { @@ -112,14 +119,14 @@ typedef rpc_holder add_new_disk_rpc namespace test { class test_checker; -} +} // namespace test + class cold_backup_context; class replica_split_manager; -typedef std::function - replica_state_subscriber; -typedef std::unordered_map replicas; + +using replica_state_subscriber = std::function; class replica_stub; @@ -222,9 +229,9 @@ class replica_stub : public serverlet, public ref_counter // - if allow_empty_args = false, you should specify at least one argument. // each argument should be in format of: // id1,id2... (where id is 'app_id' or 'app_id.partition_id') - std::string exec_command_on_replica(const std::vector &args, + std::string exec_command_on_replica(const std::vector &arg_str_list, bool allow_empty_args, - std::function func); + std::function func); // // partition split @@ -351,15 +358,61 @@ class replica_stub : public serverlet, public ref_counter gpid id, const std::shared_ptr &req, const std::shared_ptr &req2); - // Create a new replica according to the parameters. - // 'parent_dir' is used in partition split for create_child_replica_dir(). + + // Create a child replica for partition split, with 'parent_dir' specified as the parent + // replica dir used for `create_child_replica_dir()`. replica *new_replica(gpid gpid, const app_info &app, bool restore_if_necessary, bool is_duplication_follower, - const std::string &parent_dir = ""); - // Load an existing replica which is located in 'dn' with 'dir' directory. - replica *load_replica(dir_node *dn, const char *dir); + const std::string &parent_dir); + + // Create a new replica, choosing and assigning the best dir for it. + replica *new_replica(gpid gpid, + const app_info &app, + bool restore_if_necessary, + bool is_duplication_follower); + + // Each disk with its candidate replica dirs, used to load replicas while initializing. + struct disk_replicas_info + { + // `dir_node` for each disk. + dir_node *disk_node; + + // All replica dirs on each disk. + std::vector replica_dirs; + }; + + // Get the absolute dirs of all replicas for all healthy disks without IO errors. + std::vector get_all_disk_dirs() const; + + // Get the replica dir name from a potentially longer path (`dir` could be an absolute + // or relative path). + static std::string get_replica_dir_name(const std::string &dir); + + // Parse app id, partition id and app type from the replica dir name. + static bool + parse_replica_dir_name(const std::string &dir_name, gpid &pid, std::string &app_type); + + // Load an existing replica which is located in `dn` with `replica_dir`. Usually each + // different `dn` represents a unique disk. `replica_dir` is the absolute path of the + // directory for a replica. + virtual replica_ptr load_replica(dir_node *disk_node, const std::string &replica_dir); + + using replica_map_by_gpid = std::unordered_map; + + // The same as the above `load_replica` function, except that this function is to load + // each replica to `reps` with protection from `reps_lock`. + void load_replica(dir_node *disk_node, + const std::string &replica_dir, + size_t total_dir_count, + utils::ex_lock &reps_lock, + replica_map_by_gpid &reps, + std::atomic &finished_dir_count); + + // Load all replicas simultaneously from all disks to `reps`. + void load_replicas(replica_map_by_gpid &reps); + // Clean up the memory state and on disk data if creating replica failed. void clear_on_failure(replica *rep); task_ptr begin_close_replica(replica_ptr r); @@ -445,21 +498,26 @@ class replica_stub : public serverlet, public ref_counter friend class replica_follower; friend class replica_follower_test; friend class replica_http_service_test; + friend class mock_load_replica; + friend class GetReplicaDirNameTest; + friend class ParseReplicaDirNameTest; FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check); FRIEND_TEST(replica_test, test_auto_trash_of_corruption); FRIEND_TEST(replica_test, test_clear_on_failure); - typedef std::unordered_map opening_replicas; - typedef std::unordered_map> - closing_replicas; // > - typedef std::map> - closed_replicas; // > + using opening_replica_map_by_gpid = std::unordered_map; + + // `task_ptr` is the task closing a replica. + using closing_replica_map_by_gpid = + std::unordered_map>; + + using closed_replica_map_by_gpid = std::map>; mutable zrwlock_nr _replicas_lock; - replicas _replicas; - opening_replicas _opening_replicas; - closing_replicas _closing_replicas; - closed_replicas _closed_replicas; + replica_map_by_gpid _replicas; + opening_replica_map_by_gpid _opening_replicas; + closing_replica_map_by_gpid _closing_replicas; + closed_replica_map_by_gpid _closed_replicas; ::dsn::host_port _primary_host_port; // The stringify of '_primary_host_port', used by logging usually. @@ -565,5 +623,6 @@ class replica_stub : public serverlet, public ref_counter dsn::task_tracker _tracker; }; + } // namespace replication } // namespace dsn diff --git a/src/replica/test/config-test.ini b/src/replica/test/config-test.ini index 2d2c6f3974..b2e0e86c2f 100644 --- a/src/replica/test/config-test.ini +++ b/src/replica/test/config-test.ini @@ -34,7 +34,7 @@ type = replica run = true count = 1 ports = 54321 -pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION,THREAD_POOL_PLOG,THREAD_POOL_BLOCK_SERVICE +pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_LOCAL_APP,THREAD_POOL_REPLICATION,THREAD_POOL_PLOG,THREAD_POOL_BLOCK_SERVICE [core] ;tool = simulator @@ -76,6 +76,12 @@ partitioned = true worker_priority = THREAD_xPRIORITY_NORMAL worker_count = 3 +[threadpool.THREAD_POOL_LOCAL_APP] +name = local_app +partitioned = false +worker_priority = THREAD_xPRIORITY_NORMAL +worker_count = 4 + [threadpool.THREAD_POOL_REPLICATION_LONG] name = replica_long diff --git a/src/replica/test/load_replicas_test.cpp b/src/replica/test/load_replicas_test.cpp new file mode 100644 index 0000000000..3315629c93 --- /dev/null +++ b/src/replica/test/load_replicas_test.cpp @@ -0,0 +1,339 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/fs_manager.h" +#include "common/gpid.h" +#include "common/replication.codes.h" +#include "dsn.layer2_types.h" +#include "gtest/gtest.h" +#include "replica/replica.h" +#include "replica/replica_stub.h" +#include "replica/replication_app_base.h" +#include "replica/test/mock_utils.h" +#include "task/task.h" +#include "task/task_code.h" +#include "task/task_spec.h" +#include "test_util/test_util.h" +#include "utils/autoref_ptr.h" +#include "utils/filesystem.h" +#include "utils/flags.h" +#include "utils/ports.h" + +DSN_DECLARE_uint64(max_replicas_on_load_for_each_disk); + +namespace dsn::replication { + +class mock_load_replica : public replica_stub +{ +public: + mock_load_replica() = default; + + ~mock_load_replica() override = default; + + void initialize(const std::map &dirs_by_tag, + const std::map> &replicas_by_tag) + { + // Get dirs and tags to initialize fs_manager. + std::vector dirs; + std::vector tags; + for (const auto &[tag, dir] : dirs_by_tag) { + dirs.push_back(dir); + tags.push_back(tag); + } + + // Initialize fs_manager by the tag and dir of each disk. + _fs_manager.initialize(dirs, tags); + + // Generate the replicas which are expected after loading. + for (const auto &[tag, reps] : replicas_by_tag) { + for (const auto &pid : reps) { + ASSERT_TRUE(_expected_loaded_replica_pids.insert(pid).second); + } + } + + _disk_tags_for_order.clear(); + _disk_dirs_for_order.clear(); + _disk_replicas_for_order.clear(); + _disk_loaded_replicas_for_order.assign(replicas_by_tag.size(), 0); + + // Ensure that the disks are scanned in the order returned by `get_dir_node()`. + for (const auto &dn : _fs_manager.get_dir_nodes()) { + for (const auto &pid : replicas_by_tag.at(dn->tag)) { + _fs_manager.specify_dir_for_new_replica_for_test(dn.get(), "pegasus", pid); + } + + _disk_tags_for_order.push_back(dn->tag); + _disk_dirs_for_order.push_back(dn->full_dir); + _disk_replicas_for_order.push_back(replicas_by_tag.at(dn->tag).size()); + } + + ASSERT_EQ(_disk_tags_for_order.size(), _disk_dirs_for_order.size()); + } + + void test_load_replicas(bool test_load_order, uint64_t max_replicas_on_load_for_each_disk) + { + PRESERVE_VAR(allow_inline, dsn::task_spec::get(LPC_REPLICATION_INIT_LOAD)->allow_inline); + dsn::task_spec::get(LPC_REPLICATION_INIT_LOAD)->allow_inline = test_load_order; + + PRESERVE_FLAG(max_replicas_on_load_for_each_disk); + FLAGS_max_replicas_on_load_for_each_disk = max_replicas_on_load_for_each_disk; + + // Check if all loaded replicas are matched. + replica_stub::replica_map_by_gpid actual_loaded_replicas; + load_replicas(actual_loaded_replicas); + ASSERT_EQ(_expected_loaded_replicas, actual_loaded_replicas); + + // Check if all replicas have been loaded. + std::set actual_loaded_replica_pids; + for (const auto &[pid, _] : actual_loaded_replicas) { + ASSERT_TRUE(actual_loaded_replica_pids.insert(pid).second); + } + ASSERT_EQ(_expected_loaded_replica_pids, actual_loaded_replica_pids); + } + + void remove_disk_dirs() + { + for (const auto &dn : _fs_manager.get_dir_nodes()) { + ASSERT_TRUE(utils::filesystem::remove_path(dn->full_dir)); + } + } + +private: + void load_replica_for_test(dir_node *dn, const std::string &replica_dir, replica_ptr &rep) + { + ASSERT_TRUE(utils::filesystem::directory_exists(replica_dir)); + + const auto &dir_name = get_replica_dir_name(replica_dir); + + gpid pid; + std::string app_type; + ASSERT_TRUE(parse_replica_dir_name(dir_name, pid, app_type)); + ASSERT_STREQ("pegasus", app_type.c_str()); + + ASSERT_EQ(LPC_REPLICATION_INIT_LOAD, task::get_current_task()->spec().code); + + if (task::get_current_task()->spec().allow_inline) { + // Once the task is `allow_inline`, it would be executed in place immediately rather + // than pushed into the queue. Thus we could test the expected order in which the + // tasks are pushed into the queue. + + // Find the first disk where there is still some replica that has not been loaded. + size_t finished_disks = 0; + while (_disk_loaded_replicas_for_order[_disk_index_for_order] >= + _disk_replicas_for_order[_disk_index_for_order]) { + // Since current task has not been executed, it is not possible that all disks + // are finished. + ++finished_disks; + ASSERT_GT(_disk_tags_for_order.size(), finished_disks); + + // Skip to next disk since all of the replicas of this disk have been loaded. + _disk_index_for_order = (_disk_index_for_order + 1) % _disk_tags_for_order.size(); + } + + // Only check if the processed order of the disk the replica belongs to, rather than + // the order of the replica itself, for the reason that the order of the dirs returned + // by the underlying call might vary. + ASSERT_EQ(_disk_tags_for_order[_disk_index_for_order], dn->tag); + ASSERT_EQ(_disk_dirs_for_order[_disk_index_for_order], dn->full_dir); + + // Current replica has been loaded, move forward to the next replica of this disk. + ++_disk_loaded_replicas_for_order[_disk_index_for_order]; + + // Turn to next disks if some of them still have some replicas that are not loaded. + _disk_index_for_order = (_disk_index_for_order + 1) % _disk_tags_for_order.size(); + } + + // Check the absolute dir of this replica. + ASSERT_EQ(dn->replica_dir("pegasus", pid), replica_dir); + + app_info ai; + ai.app_type = "pegasus"; + rep = new replica(this, pid, ai, dn, false); + rep->_app = std::make_unique(rep); + + std::lock_guard guard(_mtx); + + ASSERT_TRUE(_expected_loaded_replicas.find(pid) == _expected_loaded_replicas.end()); + + _expected_loaded_replicas[pid] = rep; + } + + // Mock the process of loading a replica. + replica_ptr load_replica(dir_node *dn, const std::string &replica_dir) override + { + replica_ptr rep; + load_replica_for_test(dn, replica_dir, rep); + return rep; + } + + std::set _expected_loaded_replica_pids; + + // The variables with postfix `_for_order` are only for testing the order of the loading + // tasks. + size_t _disk_index_for_order{0}; + std::vector _disk_tags_for_order; + std::vector _disk_dirs_for_order; + std::vector _disk_replicas_for_order; + std::vector _disk_loaded_replicas_for_order; + + mutable std::mutex _mtx; + replica_stub::replica_map_by_gpid _expected_loaded_replicas; + + DISALLOW_COPY_AND_ASSIGN(mock_load_replica); + DISALLOW_MOVE_AND_ASSIGN(mock_load_replica); +}; + +struct load_replicas_case +{ + // Each disk tag => dir of this disk. + std::map dirs_by_tag; + + // Each disk tag => replicas (specified by ) on this disk. + std::map> replicas_by_tag; +}; + +class LoadReplicasTest : public testing::TestWithParam +{ +public: + LoadReplicasTest() + { + // Remove all dirs of all disks to prevent each test from being disturbed. + _stub.remove_disk_dirs(); + + // Use test cases to initialize the replica stub. + const auto &load_case = GetParam(); + _stub.initialize(load_case.dirs_by_tag, load_case.replicas_by_tag); + } + + ~LoadReplicasTest() override { _stub.remove_disk_dirs(); } + + void test_load_replicas(bool test_load_order, uint64_t max_replicas_on_load_for_each_disk) + { + _stub.test_load_replicas(test_load_order, max_replicas_on_load_for_each_disk); + } + +private: + mock_load_replica _stub; + + DISALLOW_COPY_AND_ASSIGN(LoadReplicasTest); + DISALLOW_MOVE_AND_ASSIGN(LoadReplicasTest); +}; + +TEST_P(LoadReplicasTest, LoadReplicas) { test_load_replicas(false, 256); } + +TEST_P(LoadReplicasTest, LoadOrder) { test_load_replicas(true, 256); } + +TEST_P(LoadReplicasTest, LoadThrottling) { test_load_replicas(false, 5); } + +// Generate a test case for loading replicas. Each element in `disk_replicas` is corresponding +// to the number of replicas on a disk. +load_replicas_case generate_load_replicas_case(const std::vector &disk_replicas) +{ + std::map dirs_by_tag; + for (size_t disk_index = 0; disk_index < disk_replicas.size(); ++disk_index) { + dirs_by_tag.emplace(fmt::format("data{}", disk_index), fmt::format("disk{}", disk_index)); + } + + static const uint32_t kNumBitsPartitions = 3; + static const uint32_t kNumPartitions = 1U << kNumBitsPartitions; + + uint32_t app_id = 1; + uint32_t partition_id = 0; + + std::map> replicas_by_tag; + + while (true) { + size_t finished_disks = 0; + + for (size_t disk_index = 0; disk_index < disk_replicas.size(); ++disk_index) { + auto &replica_list = replicas_by_tag[fmt::format("data{}", disk_index)]; + if (replica_list.size() >= disk_replicas[disk_index]) { + // All replicas on this disk have been generated, just skip to next disk. + ++finished_disks; + continue; + } + + // Generate a replica with current app id and partition index. + replica_list.emplace_back(static_cast(app_id), + static_cast(partition_id)); + + // Once next partition index is found 0, increment app id to turn to next table. + app_id += ((partition_id + 1) & kNumPartitions) >> kNumBitsPartitions; + + // Increment index to turn to next partition. + partition_id = (partition_id + 1) & (kNumPartitions - 1); + } + + if (finished_disks >= disk_replicas.size()) { + // All disks have been done. + break; + } + } + + return {dirs_by_tag, replicas_by_tag}; +} + +std::vector generate_load_replicas_cases() +{ + // At least 1 disk should be included (otherwise it would lead to core dump), thus do + // not generate the empty case (i.e. {}). + return std::vector({ + // There is only one disk which has none of replica. + generate_load_replicas_case({0}), + // There are two disks both of which have none of replica. + generate_load_replicas_case({0, 0}), + // There is only one disk which has one replica. + generate_load_replicas_case({1}), + // There are two disks one of which has one replica, and another has none. + generate_load_replicas_case({1, 0}), + generate_load_replicas_case({0, 1}), + // There is only one disk which has two replicas. + generate_load_replicas_case({2}), + // There are two disks one of which has two replicas, and another has none. + generate_load_replicas_case({2, 0}), + generate_load_replicas_case({0, 2}), + // There are at least three disks. + generate_load_replicas_case({1, 0, 2}), + generate_load_replicas_case({8, 25, 16}), + generate_load_replicas_case({17, 96, 56, 127}), + generate_load_replicas_case({22, 38, 0, 16}), + generate_load_replicas_case({82, 75, 36, 118, 65}), + generate_load_replicas_case({0, 92, 17, 68, 25}), + // There are many replicas for some disks. + generate_load_replicas_case({156, 367, 309, 58, 404, 298, 512, 82}), + generate_load_replicas_case({167, 28, 898, 516, 389, 422, 682, 265, 596}), + }); +} + +INSTANTIATE_TEST_SUITE_P(ReplicaStubTest, + LoadReplicasTest, + testing::ValuesIn(generate_load_replicas_cases())); + +} // namespace dsn::replication diff --git a/src/replica/test/replica_dir_test.cpp b/src/replica/test/replica_dir_test.cpp new file mode 100644 index 0000000000..3bc64d74f8 --- /dev/null +++ b/src/replica/test/replica_dir_test.cpp @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "common/gpid.h" +#include "gtest/gtest.h" +#include "replica/replica_stub.h" + +namespace dsn::replication { + +struct get_replica_dir_name_case +{ + std::string path; + std::string expected_replica_dir_name; +}; + +class GetReplicaDirNameTest : public testing::TestWithParam +{ +public: + static void test_get_replica_dir_name() + { + const auto &test_case = GetParam(); + const auto &actual_replica_dir_name = replica_stub::get_replica_dir_name(test_case.path); + EXPECT_EQ(test_case.expected_replica_dir_name, actual_replica_dir_name); + } +}; + +TEST_P(GetReplicaDirNameTest, GetReplicaDirName) { test_get_replica_dir_name(); } + +const std::vector get_replica_dir_name_tests{ + // Linux absolute path and non-empty dir name. + {"/data/pegasus/1.2.pegasus", "1.2.pegasus"}, + // Linux absolute path and empty dir name. + {"/data/pegasus/1.2.pegasus/", ""}, + // Windows absolute path and non-empty dir name. + {R"(D:\data\pegasus\1.2.pegasus)", "1.2.pegasus"}, + // Windows absolute path and empty dir name. + {R"(D:\data\pegasus\1.2.pegasus\)", ""}, + // Linux relative path and non-empty dir name. + {"./1.2.pegasus", "1.2.pegasus"}, + // Linux relative path and empty dir name. + {"./1.2.pegasus/", ""}, + // Windows relative path and non-empty dir name. + {R"(.\1.2.pegasus)", "1.2.pegasus"}, + // Windows relative path and empty dir name. + {R"(.\1.2.pegasus\)", ""}, +}; + +INSTANTIATE_TEST_SUITE_P(ReplicaDirTest, + GetReplicaDirNameTest, + testing::ValuesIn(get_replica_dir_name_tests)); + +struct parse_replica_dir_name_case +{ + std::string replica_dir_name; + bool ok; + gpid expected_pid; + std::string expected_app_type; +}; + +class ParseReplicaDirNameTest : public testing::TestWithParam +{ +public: + static void test_parse_replica_dir_name() + { + const auto &test_case = GetParam(); + + gpid actual_pid; + std::string actual_app_type; + ASSERT_EQ(test_case.ok, + replica_stub::parse_replica_dir_name( + test_case.replica_dir_name, actual_pid, actual_app_type)); + if (!test_case.ok) { + return; + } + + EXPECT_EQ(test_case.expected_pid, actual_pid); + EXPECT_EQ(test_case.expected_app_type, actual_app_type); + } +}; + +TEST_P(ParseReplicaDirNameTest, ParseReplicaDirName) { test_parse_replica_dir_name(); } + +const std::vector parse_replica_dir_name_tests{ + // Empty dir name. + {"", false, {}, ""}, + // Single-digit IDs. + {"1.2.pegasus", true, {1, 2}, "pegasus"}, + // Multi-digit IDs. + {"1234.56789.pegasus", true, {1234, 56789}, "pegasus"}, + // Custom app type other than "pegasus". + {"1.2.another", true, {1, 2}, "another"}, + // Custom app type with dot. + {"1.2.another.pegasus", true, {1, 2}, "another.pegasus"}, + // Custom app type with other specific symbol. + {"1.2.another_pegasus", true, {1, 2}, "another_pegasus"}, + // Missing one ID. + {"1.pegasus", false, {}, ""}, + // Missing both IDs. + {"pegasus", false, {}, ""}, + // ID with letter. + {"1.2a.pegasus", false, {}, ""}, + // ID with minus. + {"1.-2.pegasus", false, {}, ""}, +}; + +INSTANTIATE_TEST_SUITE_P(ReplicaDirTest, + ParseReplicaDirNameTest, + testing::ValuesIn(parse_replica_dir_name_tests)); + +} // namespace dsn::replication diff --git a/src/server/result_writer.cpp b/src/server/result_writer.cpp index df78172e9c..4d3a9160e0 100644 --- a/src/server/result_writer.cpp +++ b/src/server/result_writer.cpp @@ -21,7 +21,6 @@ #include #include -#include #include #include "pegasus/client.h" diff --git a/src/test_util/test_util.h b/src/test_util/test_util.h index 2e2b34bb0d..549d8fd948 100644 --- a/src/test_util/test_util.h +++ b/src/test_util/test_util.h @@ -44,11 +44,12 @@ class file_meta; } // namespace replication } // namespace dsn +#define PRESERVE_VAR(name, expr) \ + const auto PRESERVED_##name = expr; \ + auto PRESERVED_##name##_cleanup = dsn::defer([PRESERVED_##name]() { expr = PRESERVED_##name; }) + // Save the current value of a flag and restore it at the end of the function. -#define PRESERVE_FLAG(name) \ - const auto PRESERVED_FLAGS_##name = FLAGS_##name; \ - auto PRESERVED_FLAGS_##name##_cleanup = \ - dsn::defer([PRESERVED_FLAGS_##name]() { FLAGS_##name = PRESERVED_FLAGS_##name; }) +#define PRESERVE_FLAG(name) PRESERVE_VAR(FLAGS_##name, FLAGS_##name) namespace pegasus { diff --git a/src/utils/autoref_ptr.h b/src/utils/autoref_ptr.h index 501698f852..c3ba9d9595 100644 --- a/src/utils/autoref_ptr.h +++ b/src/utils/autoref_ptr.h @@ -160,6 +160,15 @@ class ref_ptr T *operator->() const { return _obj; } + bool operator==(const ref_ptr &r) const { return _obj == r._obj; } + + template ::value>::type> + bool operator==(const ref_ptr &r) const + { + return _obj == r._obj; + } + bool operator==(T *r) const { return _obj == r; } bool operator!=(T *r) const { return _obj != r; } diff --git a/src/utils/command_manager.h b/src/utils/command_manager.h index b971522d01..9f23d24bb7 100644 --- a/src/utils/command_manager.h +++ b/src/utils/command_manager.h @@ -36,6 +36,7 @@ #include #include #include +#include #include #include "utils/fmt_logging.h" @@ -65,14 +66,12 @@ class command_manager : public ::dsn::utils::singleton // 'validator' is used to validate the new value. // The value is reset to 'default_value' if passing "DEFAULT" argument. template - WARN_UNUSED_RESULT std::unique_ptr register_int_command( - T &value, - T default_value, - const std::string &command, - const std::string &help, - std::function validator = [](int64_t new_value) -> bool { - return new_value >= 0; - }) + WARN_UNUSED_RESULT std::unique_ptr + register_int_command(T &value, + T default_value, + const std::string &command, + const std::string &help, + std::function::type)> validator) { return register_single_command( command, @@ -83,6 +82,19 @@ class command_manager : public ::dsn::utils::singleton }); } + template + WARN_UNUSED_RESULT std::unique_ptr register_int_command( + T &value, T default_value, const std::string &command, const std::string &help) + { + return register_int_command(value, + default_value, + command, + help, + [](typename std::remove_reference::type new_value) -> bool { + return new_value >= 0; + }); + } + // Register a single 'command' with the 'help' description, its arguments are described in // 'args'. std::unique_ptr @@ -133,11 +145,12 @@ class command_manager : public ::dsn::utils::singleton set_bool(bool &value, const std::string &name, const std::vector &args); template - static std::string set_int(T &value, - T default_value, - const std::string &name, - const std::vector &args, - const std::function &validator) + static std::string + set_int(T &value, + T default_value, + const std::string &name, + const std::vector &args, + const std::function::type)> &validator) { nlohmann::json msg; msg["error"] = "ok"; @@ -164,8 +177,7 @@ class command_manager : public ::dsn::utils::singleton // Invalid argument. T new_value = 0; - if (!internal::buf2signed(args[0], new_value) || - !validator(static_cast(new_value))) { + if (!buf2numeric(args[0], new_value) || !validator(new_value)) { msg["error"] = fmt::format("ERR: invalid argument '{}', the value is not acceptable", args[0]); return msg.dump(2); diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h index d36aa98640..872ea4fb70 100644 --- a/src/utils/time_utils.h +++ b/src/utils/time_utils.h @@ -147,7 +147,7 @@ class chronograph inline void reset_start_time() { _start_time_ns = dsn_now_ns(); } - inline uint64_t duration_ns() const + [[nodiscard]] inline uint64_t duration_ns() const { auto now = dsn_now_ns(); CHECK_GE(now, _start_time_ns); @@ -155,6 +155,8 @@ class chronograph return now - _start_time_ns; } + [[nodiscard]] inline uint64_t duration_ms() const { return duration_ns() / 1'000'000; } + private: uint64_t _start_time_ns;