diff --git a/.clang-tidy b/.clang-tidy index 914dee1198..95dd7616d7 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -20,7 +20,7 @@ CheckOptions: [] # Disable some checks that are not useful for us now. # They are sorted by names, and should be consistent to build_tools/clang_tidy.py. -Checks: 'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines-macro-usage,-cppcoreguidelines-non-private-member-variables-in-classes,-cppcoreguidelines-owning-memory,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-cppcoreguidelines-pro-bounds-pointer-arithmetic,-cppcoreguidelines-pro-type-const-cast,-cppcoreguidelines-pro-type-union-access,-fuchsia-default-arguments-calls,-fuchsia-overloaded-operator,-fuchsia-statically-constructed-objects,-google-readability-avoid-underscore-in-googletest-name,-hicpp-avoid-c-arrays,-hicpp-named-parameter,-hicpp-no-array-decay,-llvm-include-order,-misc-definitions-in-headers,-misc-non-private-member-variables-in-classes,-misc-unused-parameters,-modernize-avoid-c-arrays,-modernize-replace-disallow-copy-and-assign-macro,-modernize-use-trailing-return-type,-performance-unnecessary-value-param,-readability-function-cognitive-complexity,-readability-identifier-length,-readability-magic-numbers,-readability-named-parameter' +Checks: 'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines-macro-usage,-cppcoreguidelines-non-private-member-variables-in-classes,-cppcoreguidelines-owning-memory,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-cppcoreguidelines-pro-bounds-pointer-arithmetic,-cppcoreguidelines-pro-type-const-cast,-cppcoreguidelines-pro-type-union-access,-fuchsia-default-arguments-calls,-fuchsia-overloaded-operator,-fuchsia-statically-constructed-objects,-google-readability-avoid-underscore-in-googletest-name,-hicpp-avoid-c-arrays,-hicpp-named-parameter,-hicpp-no-array-decay,-llvm-include-order,-misc-definitions-in-headers,-misc-non-private-member-variables-in-classes,-misc-unused-parameters,-modernize-avoid-bind,-modernize-avoid-c-arrays,-modernize-replace-disallow-copy-and-assign-macro,-modernize-use-trailing-return-type,-performance-unnecessary-value-param,-readability-function-cognitive-complexity,-readability-identifier-length,-readability-magic-numbers,-readability-named-parameter,-readability-suspicious-call-argument' ExtraArgs: ExtraArgsBefore: [] FormatStyle: none diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py index 09ea434b1d..8a35e1a5dd 100755 --- a/build_tools/clang_tidy.py +++ b/build_tools/clang_tidy.py @@ -88,6 +88,7 @@ def tidy_on_path(path): "-misc-definitions-in-headers," "-misc-non-private-member-variables-in-classes," "-misc-unused-parameters," + "-modernize-avoid-bind," 
"-modernize-avoid-c-arrays," "-modernize-replace-disallow-copy-and-assign-macro," "-modernize-use-trailing-return-type," @@ -95,7 +96,8 @@ def tidy_on_path(path): "-readability-function-cognitive-complexity," "-readability-identifier-length," "-readability-magic-numbers," - "-readability-named-parameter", + "-readability-named-parameter," + "-readability-suspicious-call-argument", "-extra-arg=-language=c++", "-extra-arg=-std=c++17", "-extra-arg=-Ithirdparty/output/include"] diff --git a/src/common/replication.codes.h b/src/common/replication.codes.h index a2c29ef458..5f0628026a 100644 --- a/src/common/replication.codes.h +++ b/src/common/replication.codes.h @@ -141,7 +141,6 @@ MAKE_EVENT_CODE(LPC_META_STATE_NORMAL, TASK_PRIORITY_COMMON) // THREAD_POOL_REPLICATION #define CURRENT_THREAD_POOL THREAD_POOL_REPLICATION -MAKE_EVENT_CODE(LPC_REPLICATION_INIT_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(RPC_REPLICATION_WRITE_EMPTY, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_PER_REPLICA_CHECKPOINT_TIMER, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_PER_REPLICA_COLLECT_INFO_TIMER, TASK_PRIORITY_COMMON) @@ -186,6 +185,7 @@ MAKE_EVENT_CODE(LPC_REPLICATION_HIGH, TASK_PRIORITY_HIGH) // THREAD_POOL_LOCAL_APP #define CURRENT_THREAD_POOL THREAD_POOL_LOCAL_APP +MAKE_EVENT_CODE(LPC_REPLICATION_INIT_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_WRITE, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_read_THROTTLING_DELAY, TASK_PRIORITY_COMMON) #undef CURRENT_THREAD_POOL diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp index bd9736304b..6ec19c007d 100644 --- a/src/meta/meta_service.cpp +++ b/src/meta/meta_service.cpp @@ -29,6 +29,7 @@ #include #include // for std::remove_if #include +#include #include #include #include diff --git a/src/replica/replica.h b/src/replica/replica.h index 4ee215c3dc..b312865c5e 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -610,6 +610,7 @@ class replica : public serverlet, public ref_counter, public replica_ba friend class replica_disk_test; friend class replica_disk_migrate_test; friend class open_replica_test; + friend class mock_load_replica; friend class replica_follower; friend class ::pegasus::server::pegasus_server_test_base; friend class ::pegasus::server::rocksdb_wrapper_test; diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 7a51ee4cac..aa8336fd26 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -29,15 +29,15 @@ // IWYU pragma: no_include #include #include +#include #include -#include -#include #include #include #include -#include +#include #include #include +#include #include #include #include @@ -71,6 +71,10 @@ #include "security/access_controller.h" #include "split/replica_split_manager.h" #include "task/async_calls.h" +#include "task/task.h" +#include "task/task_engine.h" +#include "task/task_worker.h" +#include "utils/api_utilities.h" #include "utils/command_manager.h" #include "utils/env.h" #include "utils/errors.h" @@ -80,8 +84,11 @@ #include "utils/ports.h" #include "utils/process_utils.h" #include "utils/rand.h" +#include "utils/string_conv.h" #include "utils/strings.h" #include "utils/synchronize.h" +#include "utils/threadpool_spec.h" +#include "utils/timer.h" #ifdef DSN_ENABLE_GPERF #include #elif defined(DSN_USE_JEMALLOC) @@ -91,14 +98,17 @@ #include "remote_cmd/remote_command.h" #include "utils/fail_point.h" -static const char *kMaxConcurrentBulkLoadDownloadingCountDesc = - "The maximum concurrent bulk load downloading replica count"; -DSN_DEFINE_int32(replication, - 
max_concurrent_bulk_load_downloading_count, - 5, - kMaxConcurrentBulkLoadDownloadingCountDesc); -DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count, - [](int32_t value) -> bool { return value >= 0; }); +namespace { + +const char *kMaxReplicasOnLoadForEachDiskDesc = + "The max number of replicas that are allowed to be loaded simultaneously for each disk dir."; + +const char *kLoadReplicaMaxWaitTimeMsDesc = "The max waiting time for replica loading to complete."; + +const char *kMaxConcurrentBulkLoadDownloadingCountDesc = + "The maximum concurrent bulk load downloading replica count."; + +} // anonymous namespace METRIC_DEFINE_gauge_int64(server, total_replicas, @@ -260,6 +270,26 @@ DSN_DECLARE_string(data_dirs); DSN_DECLARE_string(encryption_cluster_key_name); DSN_DECLARE_string(server_key); +DSN_DEFINE_uint64(replication, + max_replicas_on_load_for_each_disk, + 256, + kMaxReplicasOnLoadForEachDiskDesc); +DSN_TAG_VARIABLE(max_replicas_on_load_for_each_disk, FT_MUTABLE); +DSN_DEFINE_validator(max_replicas_on_load_for_each_disk, + [](uint64_t value) -> bool { return value > 0; }); + +DSN_DEFINE_uint64(replication, load_replica_max_wait_time_ms, 10, kLoadReplicaMaxWaitTimeMsDesc); +DSN_TAG_VARIABLE(load_replica_max_wait_time_ms, FT_MUTABLE); +DSN_DEFINE_validator(load_replica_max_wait_time_ms, + [](uint64_t value) -> bool { return value > 0; }); + +DSN_DEFINE_int32(replication, + max_concurrent_bulk_load_downloading_count, + 5, + kMaxConcurrentBulkLoadDownloadingCountDesc); +DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count, + [](int32_t value) -> bool { return value >= 0; }); + DSN_DEFINE_bool(replication, deny_client_on_start, false, @@ -379,9 +409,52 @@ namespace dsn { namespace replication { bool replica_stub::s_not_exit_on_log_failure = false; +namespace { + +// Register commands that get/set flag configurations. +void register_flags_ctrl_command() +{ + // For the reason why std::call_once is used here, see the comments in + // replica_stub::register_ctrl_command() for details.
+ static std::once_flag flag; + std::call_once(flag, []() mutable { + dsn::command_manager::instance().add_global_cmd( + dsn::command_manager::instance().register_int_command( + FLAGS_max_replicas_on_load_for_each_disk, + FLAGS_max_replicas_on_load_for_each_disk, + "replica.max-replicas-on-load-for-each-disk", + kMaxReplicasOnLoadForEachDiskDesc)); + + dsn::command_manager::instance().add_global_cmd( + dsn::command_manager::instance().register_int_command( + FLAGS_load_replica_max_wait_time_ms, + FLAGS_load_replica_max_wait_time_ms, + "replica.load-replica-max-wait-time-ms", + kLoadReplicaMaxWaitTimeMsDesc)); + + dsn::command_manager::instance().add_global_cmd( + dsn::command_manager::instance().register_bool_command( + FLAGS_empty_write_disabled, + "replica.disable-empty-write", + "whether to disable empty writes")); + + dsn::command_manager::instance().add_global_cmd( + dsn::command_manager::instance().register_int_command( + FLAGS_max_concurrent_bulk_load_downloading_count, + FLAGS_max_concurrent_bulk_load_downloading_count, + "replica.max-concurrent-bulk-load-downloading-count", + kMaxConcurrentBulkLoadDownloadingCountDesc)); + }); +} + +} // anonymous namespace + replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, bool is_long_subscriber /* = true*/) : serverlet("replica_stub"), + _state(NS_Disconnected), + _replica_state_subscriber(std::move(subscriber)), + _is_long_subscriber(is_long_subscriber), _deny_client(false), _verbose_client_log(false), _verbose_commit_log(false), @@ -391,6 +464,9 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, _bulk_load_downloading_count(0), _manual_emergency_checkpointing_count(0), _is_running(false), +#ifdef DSN_ENABLE_GPERF + _is_releasing_memory(false), +#endif METRIC_VAR_INIT_server(total_replicas), METRIC_VAR_INIT_server(opening_replicas), METRIC_VAR_INIT_server(closing_replicas), @@ -423,16 +499,13 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, METRIC_VAR_INIT_server(splitting_replicas_async_learn_max_duration_ms), METRIC_VAR_INIT_server(splitting_replicas_max_copy_file_bytes) { -#ifdef DSN_ENABLE_GPERF - _is_releasing_memory = false; -#endif - _replica_state_subscriber = subscriber; - _is_long_subscriber = is_long_subscriber; - _failure_detector = nullptr; - _state = NS_Disconnected; + // Some flags might need to be tuned on the stage of loading replicas (during + // replica_stub::initialize()), thus register their control command just in the + // constructor. + register_flags_ctrl_command(); } -replica_stub::~replica_stub(void) { close(); } +replica_stub::~replica_stub() { close(); } void replica_stub::initialize(bool clear /* = false*/) { @@ -442,6 +515,246 @@ void replica_stub::initialize(bool clear /* = false*/) _access_controller = std::make_unique(); } +std::vector replica_stub::get_all_disk_dirs() const +{ + std::vector disks; + for (const auto &disk_node : _fs_manager.get_dir_nodes()) { + if (dsn_unlikely(disk_node->status == disk_status::IO_ERROR)) { + // Skip disks with IO errors. 
+ continue; + } + + std::vector sub_dirs; + CHECK(utils::filesystem::get_subdirectories(disk_node->full_dir, sub_dirs, false), + "failed to get sub_directories in {}", + disk_node->full_dir); + disks.push_back(disk_replicas_info{disk_node.get(), std::move(sub_dirs)}); + } + + return disks; +} + +// TaskCode: LPC_REPLICATION_INIT_LOAD +// ThreadPool: THREAD_POOL_LOCAL_APP +void replica_stub::load_replica(dir_node *disk_node, + const std::string &replica_dir, + size_t total_dir_count, + utils::ex_lock &reps_lock, + replica_map_by_gpid &reps, + std::atomic &finished_dir_count) +{ + // Measure execution time for loading a replica dir. + // + // TODO(wangdan): support decimal milliseconds or microseconds, since loading a small + // replica tends to spend less than 1 milliseconds and show "0ms" in logging. + SCOPED_LOG_TIMING(INFO, "on loading replica dir {}:{}", disk_node->tag, replica_dir); + + LOG_INFO("loading replica: replica_dir={}:{}", disk_node->tag, replica_dir); + + const auto *const worker = task::get_current_worker2(); + if (worker != nullptr) { + CHECK(!(worker->pool()->spec().partitioned), + "The thread pool THREAD_POOL_LOCAL_APP(task code: LPC_REPLICATION_INIT_LOAD) " + "for loading replicas must not be partitioned since load balancing is required " + "among multiple threads"); + } + + auto rep = load_replica(disk_node, replica_dir); + if (rep == nullptr) { + LOG_INFO("load replica failed: replica_dir={}:{}, progress={}/{}", + disk_node->tag, + replica_dir, + ++finished_dir_count, + total_dir_count); + return; + } + + LOG_INFO("{}@{}: load replica successfully, replica_dir={}:{}, progress={}/{}, " + "last_durable_decree={}, last_committed_decree={}, last_prepared_decree={}", + rep->get_gpid(), + dsn_primary_host_port(), + disk_node->tag, + replica_dir, + ++finished_dir_count, + total_dir_count, + rep->last_durable_decree(), + rep->last_committed_decree(), + rep->last_prepared_decree()); + + utils::auto_lock l(reps_lock); + const auto rep_iter = reps.find(rep->get_gpid()); + CHECK(rep_iter == reps.end(), + "{}@{}: newly loaded dir {} conflicts with existing {} while loading replica", + rep->get_gpid(), + dsn_primary_host_port(), + rep->dir(), + rep_iter->second->dir()); + + reps.emplace(rep->get_gpid(), rep); +} + +void replica_stub::load_replicas(replica_map_by_gpid &reps) +{ + // Measure execution time for loading all replicas from all healthy disks without IO errors. + // + // TODO(wangdan): show both the size of output replicas and execution time on just one + // logging line. + SCOPED_LOG_TIMING(INFO, "on loading replicas"); + + const auto &disks = get_all_disk_dirs(); + + // The max index of dirs that are currently being loaded for each disk, which means the dirs + // with higher indexes have not begun to be loaded (namely pushed into the queue). + std::vector replica_dir_indexes(disks.size(), 0); + + // Each loader is for a replica dir, including its path and loading task. + struct replica_dir_loader + { + size_t replica_dir_index; + std::string replica_dir_path; + task_ptr load_replica_task; + }; + + // Each queue would cache the tasks that loading dirs for each disk. Once the task is + // found finished (namely a dir has been loaded successfully), it would be popped from + // the queue. + std::vector> load_disk_queues(disks.size()); + + // The number of loading replica dirs that have been finished for each disk, used to show + // current progress. 
+ // + // TODO(wangdan): calculate the number of successful or failed loading of replica dirs, + // and the number for each reason if failed. + std::vector> finished_replica_dirs(disks.size()); + for (auto &count : finished_replica_dirs) { + count.store(0); + } + + // The lock for operations on the loaded replicas as output. + utils::ex_lock reps_lock; + + while (true) { + size_t finished_disks = 0; + + // For each round, start loading one replica for each disk in case there are too many + // replicas in a disk, except that all of the replicas of this disk are being loaded. + for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) { + // TODO(wangdan): Structured bindings can be captured by closures in g++, while + // not supported well by clang. Thus we do not use following statement to bind + // both variables until clang has been upgraded to version 16 which could support + // that well: + // + // const auto &[disk_node, replica_dirs] = disks[disk_index]; + // + // For the docs of clang 16 please see: + // + // https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support. + const auto &replica_dirs = disks[disk_index].replica_dirs; + + auto &replica_dir_index = replica_dir_indexes[disk_index]; + if (replica_dir_index >= replica_dirs.size()) { + // All of the replicas for the disk `disks[disk_index]` have begun to be loaded, + // thus just skip to next disk. + ++finished_disks; + continue; + } + + const auto &disk_node = disks[disk_index].disk_node; + auto &load_disk_queue = load_disk_queues[disk_index]; + if (load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) { + // Loading replicas should be throttled in case that disk IO is saturated. + if (!load_disk_queue.front().load_replica_task->wait( + static_cast(FLAGS_load_replica_max_wait_time_ms))) { + // There might be too many replicas that are being loaded which lead to + // slow disk IO, thus turn to load replicas of next disk, and try to load + // dir `replica_dir_index` of this disk in the next round. + LOG_WARNING("after {} ms, loading dir({}, {}/{}) is still not finished, " + "there are {} replicas being loaded for disk({}:{}, {}/{}), " + "now turn to next disk, and will begin to load dir({}, {}/{}) " + "soon", + FLAGS_load_replica_max_wait_time_ms, + load_disk_queue.front().replica_dir_path, + load_disk_queue.front().replica_dir_index, + replica_dirs.size(), + load_disk_queue.size(), + disk_node->tag, + disk_node->full_dir, + disk_index, + disks.size(), + replica_dirs[replica_dir_index], + replica_dir_index, + replica_dirs.size()); + continue; + } + + // Now the queue size is within the limit again, continue to load a new replica dir. + load_disk_queue.pop(); + } + + if (dsn::replication::is_data_dir_invalid(replica_dirs[replica_dir_index])) { + LOG_WARNING("ignore dir({}, {}/{}) for disk({}:{}, {}/{})", + replica_dirs[replica_dir_index], + replica_dir_index, + replica_dirs.size(), + disk_node->tag, + disk_node->full_dir, + disk_index, + disks.size()); + ++replica_dir_index; + continue; + } + + LOG_DEBUG("ready to load dir({}, {}/{}) for disk({}:{}, {}/{})", + replica_dirs[replica_dir_index], + replica_dir_index, + replica_dirs.size(), + disk_node->tag, + disk_node->full_dir, + disk_index, + disks.size()); + + load_disk_queue.push(replica_dir_loader{ + replica_dir_index, + replica_dirs[replica_dir_index], + tasking::create_task( + // Ensure that the thread pool is non-partitioned. 
+ LPC_REPLICATION_INIT_LOAD, + &_tracker, + std::bind(static_cast &)>( + &replica_stub::load_replica), + this, + disk_node, + replica_dirs[replica_dir_index], + replica_dirs.size(), + std::ref(reps_lock), + std::ref(reps), + std::ref(finished_replica_dirs[disk_index])))}); + + load_disk_queue.back().load_replica_task->enqueue(); + + ++replica_dir_index; + } + + if (finished_disks >= disks.size()) { + // All replicas of all disks have begun to be loaded. + break; + } + } + + // All loading tasks have been in the queue. Just wait all tasks to be finished. + for (auto &load_disk_queue : load_disk_queues) { + while (!load_disk_queue.empty()) { + CHECK_TRUE(load_disk_queue.front().load_replica_task->wait()); + load_disk_queue.pop(); + } + } +} + void replica_stub::initialize(const replication_options &opts, bool clear /* = false*/) { _primary_host_port = dsn_primary_host_port(); @@ -526,75 +839,14 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f // Start to load replicas in available data directories. LOG_INFO("start to load replicas"); - std::map> dirs_by_dn; - for (const auto &dn : _fs_manager.get_dir_nodes()) { - // Skip IO error dir_node. - if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) { - continue; - } - std::vector sub_directories; - CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, sub_directories, false), - "fail to get sub_directories in {}", - dn->full_dir); - dirs_by_dn.emplace(dn.get(), sub_directories); - } - - replicas rps; - utils::ex_lock rps_lock; - std::deque load_tasks; - uint64_t start_time = dsn_now_ms(); - for (const auto &dn_dirs : dirs_by_dn) { - const auto dn = dn_dirs.first; - for (const auto &dir : dn_dirs.second) { - if (dsn::replication::is_data_dir_invalid(dir)) { - LOG_WARNING("ignore dir {}", dir); - continue; - } - load_tasks.push_back(tasking::create_task( - LPC_REPLICATION_INIT_LOAD, - &_tracker, - [this, dn, dir, &rps, &rps_lock] { - LOG_INFO("process dir {}", dir); - - auto r = load_replica(dn, dir.c_str()); - if (r == nullptr) { - return; - } - LOG_INFO("{}@{}: load replica '{}' success, = <{}, {}>, last_prepared_decree = {}", - r->get_gpid(), - dsn_primary_host_port(), - dir, - r->last_durable_decree(), - r->last_committed_decree(), - r->last_prepared_decree()); - - utils::auto_lock l(rps_lock); - CHECK(rps.find(r->get_gpid()) == rps.end(), - "conflict replica dir: {} <--> {}", - r->dir(), - rps[r->get_gpid()]->dir()); - - rps[r->get_gpid()] = r; - }, - load_tasks.size())); - load_tasks.back()->enqueue(); - } - } - for (auto &tsk : load_tasks) { - tsk->wait(); - } - uint64_t finish_time = dsn_now_ms(); + replica_map_by_gpid reps; + load_replicas(reps); - dirs_by_dn.clear(); - load_tasks.clear(); - LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms", - rps.size(), - finish_time - start_time); + LOG_INFO("load replicas succeed, replica_count = {}", reps.size()); bool is_log_complete = true; - for (auto it = rps.begin(); it != rps.end(); ++it) { + for (auto it = reps.begin(); it != reps.end(); ++it) { CHECK_EQ_MSG(it->second->background_sync_checkpoint(), ERR_OK, "sync checkpoint failed"); it->second->reset_prepare_list_after_replay(); @@ -624,8 +876,8 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f if (!is_log_complete) { LOG_ERROR("logs are not complete for some replicas, which means that shared log is " "truncated, mark all replicas as inactive"); - for (auto it = rps.begin(); it != rps.end(); ++it) { - 
it->second->set_inactive_state_transient(false); + for (auto &[_, rep] : reps) { + rep->set_inactive_state_transient(false); } } @@ -651,11 +903,11 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f std::chrono::seconds(FLAGS_disk_stat_interval_seconds)); } - // attach rps - _replicas = std::move(rps); + // Attach `reps`. + _replicas = std::move(reps); METRIC_VAR_INCREMENT_BY(total_replicas, _replicas.size()); - for (const auto &kv : _replicas) { - _fs_manager.add_replica(kv.first, kv.second->dir()); + for (const auto &[pid, rep] : _replicas) { + _fs_manager.add_replica(pid, rep->dir()); } _nfs = dsn::nfs_node::create(); @@ -749,7 +1001,7 @@ dsn::error_code replica_stub::on_kill_replica(gpid id) { LOG_INFO("kill replica: gpid = {}", id); if (id.get_app_id() == -1 || id.get_partition_index() == -1) { - replicas rs; + replica_map_by_gpid rs; { zauto_read_lock l(_replicas_lock); rs = _replicas; @@ -1376,27 +1628,28 @@ void replica_stub::on_node_query_reply(error_code err, resp.partitions.size(), resp.gc_replicas.size()); - replicas rs; + replica_map_by_gpid reps; { zauto_read_lock rl(_replicas_lock); - rs = _replicas; + reps = _replicas; } - for (auto it = resp.partitions.begin(); it != resp.partitions.end(); ++it) { - rs.erase(it->config.pid); - tasking::enqueue(LPC_QUERY_NODE_CONFIGURATION_SCATTER, - &_tracker, - std::bind(&replica_stub::on_node_query_reply_scatter, this, this, *it), - it->config.pid.thread_hash()); + for (const auto &config_update : resp.partitions) { + reps.erase(config_update.config.pid); + tasking::enqueue( + LPC_QUERY_NODE_CONFIGURATION_SCATTER, + &_tracker, + std::bind(&replica_stub::on_node_query_reply_scatter, this, this, config_update), + config_update.config.pid.thread_hash()); } - // for rps not exist on meta_servers - for (auto it = rs.begin(); it != rs.end(); ++it) { + // For the replicas that do not exist on meta_servers. + for (const auto &[pid, _] : reps) { tasking::enqueue( LPC_QUERY_NODE_CONFIGURATION_SCATTER2, &_tracker, - std::bind(&replica_stub::on_node_query_reply_scatter2, this, this, it->first), - it->first.thread_hash()); + std::bind(&replica_stub::on_node_query_reply_scatter2, this, this, pid), + pid.thread_hash()); } // handle the replicas which need to be gc @@ -1525,18 +1778,18 @@ void replica_stub::on_meta_server_disconnected() _state = NS_Disconnected; - replicas rs; + replica_map_by_gpid reps; { zauto_read_lock rl(_replicas_lock); - rs = _replicas; + reps = _replicas; } - for (auto it = rs.begin(); it != rs.end(); ++it) { + for (const auto &[pid, _] : reps) { tasking::enqueue( LPC_CM_DISCONNECTED_SCATTER, &_tracker, - std::bind(&replica_stub::on_meta_server_disconnected_scatter, this, this, it->first), - it->first.thread_hash()); + std::bind(&replica_stub::on_meta_server_disconnected_scatter, this, this, pid), + pid.thread_hash()); } } @@ -1826,7 +2079,7 @@ void replica_stub::open_replica( _primary_host_port_cache, group_check ? 
"with" : "without", dir); - rep = load_replica(dn, dir.c_str()); + rep = load_replica(dn, dir); // if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk // migration @@ -1851,7 +2104,7 @@ void replica_stub::open_replica( boost::replace_first( origin_dir, replica_disk_migrator::kReplicaDirOriginSuffix, ""); dsn::utils::filesystem::rename_path(origin_tmp_dir, origin_dir); - rep = load_replica(origin_dn, origin_dir.c_str()); + rep = load_replica(origin_dn, origin_dir); FAIL_POINT_INJECT_F("mock_replica_load", [&](std::string_view) -> void {}); } @@ -1918,7 +2171,7 @@ void replica_stub::open_replica( METRIC_VAR_DECREMENT(opening_replicas); CHECK(_replicas.find(id) == _replicas.end(), "replica {} is already in _replicas", id); - _replicas.insert(replicas::value_type(rep->get_gpid(), rep)); + _replicas.insert(replica_map_by_gpid::value_type(rep->get_gpid(), rep)); METRIC_VAR_INCREMENT(total_replicas); _closed_replicas.erase(id); @@ -1984,6 +2237,51 @@ replica *replica_stub::new_replica(gpid gpid, return rep; } +replica *replica_stub::new_replica(gpid gpid, + const app_info &app, + bool restore_if_necessary, + bool is_duplication_follower) +{ + return new_replica(gpid, app, restore_if_necessary, is_duplication_follower, {}); +} + +/*static*/ std::string replica_stub::get_replica_dir_name(const std::string &dir) +{ + static const char splitters[] = {'\\', '/', 0}; + return utils::get_last_component(dir, splitters); +} + +/* static */ bool +replica_stub::parse_replica_dir_name(const std::string &dir_name, gpid &pid, std::string &app_type) +{ + std::vector ids(2, 0); + size_t begin = 0; + for (auto &id : ids) { + size_t end = dir_name.find('.', begin); + if (end == std::string::npos) { + return false; + } + + if (!buf2uint32(std::string_view(dir_name.data() + begin, end - begin), id)) { + return false; + } + + begin = end + 1; + } + + if (begin >= dir_name.size()) { + return false; + } + + pid.set_app_id(static_cast(ids[0])); + pid.set_partition_index(static_cast(ids[1])); + + // TODO(wangdan): the 3rd parameter `count` does not support default argument for CentOS 7 + // (gcc 7.3.1). After CentOS 7 is deprecated, consider dropping std::string::npos. 
+ app_type.assign(dir_name, begin, std::string::npos); + return true; +} + bool replica_stub::validate_replica_dir(const std::string &dir, app_info &ai, gpid &pid, @@ -1994,21 +2292,18 @@ bool replica_stub::validate_replica_dir(const std::string &dir, return false; } - char splitters[] = {'\\', '/', 0}; - const auto name = utils::get_last_component(dir, splitters); - if (name.empty()) { + const auto &dir_name = get_replica_dir_name(dir); + if (dir_name.empty()) { hint_message = fmt::format("invalid replica dir '{}'", dir); return false; } - char app_type[128] = {0}; - int32_t app_id, pidx; - if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) { + std::string app_type; + if (!parse_replica_dir_name(dir_name, pid, app_type)) { hint_message = fmt::format("invalid replica dir '{}'", dir); return false; } - pid = gpid(app_id, pidx); replica_app_info rai(&ai); const auto ai_path = utils::filesystem::path_combine(dir, replica_app_info::kAppInfo); const auto err = rai.load(ai_path); @@ -2022,10 +2317,9 @@ bool replica_stub::validate_replica_dir(const std::string &dir, return false; } - // When the online partition split function aborted, the garbage partitions are with pidx in - // the range of [ai.partition_count, 2 * ai.partition_count), which means the partitions with - // pidx >= ai.partition_count are garbage partitions. - if (ai.partition_count <= pidx) { + if (pid.get_partition_index() >= ai.partition_count) { + // Once the online partition split aborted, the partitions within the range of + // [ai.partition_count, 2 * ai.partition_count) would become garbage. hint_message = fmt::format( "partition[{}], count={}, this replica may be partition split garbage partition, " "ignore it", @@ -2037,7 +2331,7 @@ bool replica_stub::validate_replica_dir(const std::string &dir, return true; } -replica *replica_stub::load_replica(dir_node *dn, const char *dir) +replica_ptr replica_stub::load_replica(dir_node *disk_node, const std::string &replica_dir) { FAIL_POINT_INJECT_F("mock_replica_load", [&](std::string_view) -> replica * { return nullptr; }); @@ -2045,24 +2339,28 @@ replica *replica_stub::load_replica(dir_node *dn, const char *dir) app_info ai; gpid pid; std::string hint_message; - if (!validate_replica_dir(dir, ai, pid, hint_message)) { - LOG_ERROR("invalid replica dir '{}', hint: {}", dir, hint_message); + if (!validate_replica_dir(replica_dir, ai, pid, hint_message)) { + LOG_ERROR("invalid replica dir '{}', hint={}", replica_dir, hint_message); return nullptr; } // The replica's directory must exist when creating a replica. 
- CHECK_EQ(dir, dn->replica_dir(ai.app_type, pid)); - auto *rep = new replica(this, pid, ai, dn, false); + CHECK_EQ(disk_node->replica_dir(ai.app_type, pid), replica_dir); + + auto *rep = new replica(this, pid, ai, disk_node, false); const auto err = rep->initialize_on_load(); if (err != ERR_OK) { - LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err); - rep->close(); + LOG_ERROR("{}: load replica failed, tag={}, replica_dir={}, err={}", + rep->name(), + disk_node->tag, + replica_dir, + err); delete rep; rep = nullptr; // clear work on failure - if (dsn::utils::filesystem::directory_exists(dir)) { - move_to_err_path(dir, "load replica"); + if (dsn::utils::filesystem::directory_exists(replica_dir)) { + move_to_err_path(replica_dir, "load replica"); METRIC_VAR_INCREMENT(moved_error_replicas); _fs_manager.remove_replica(pid); } @@ -2070,7 +2368,10 @@ replica *replica_stub::load_replica(dir_node *dn, const char *dir) return nullptr; } - LOG_INFO("{}: load replica succeed", rep->name()); + LOG_INFO("{}: load replica succeed, tag={}, replica_dir={}", + rep->name(), + disk_node->tag, + replica_dir); return rep; } @@ -2397,11 +2698,6 @@ void replica_stub::register_ctrl_command() }); })); - _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command( - FLAGS_empty_write_disabled, - "replica.disable-empty-write", - "whether to disable empty writes")); - #ifdef DSN_ENABLE_GPERF _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command( _release_tcmalloc_memory, @@ -2436,61 +2732,64 @@ void replica_stub::register_ctrl_command() #elif defined(DSN_USE_JEMALLOC) register_jemalloc_ctrl_command(); #endif - _cmds.emplace_back(::dsn::command_manager::instance().register_int_command( - FLAGS_max_concurrent_bulk_load_downloading_count, - FLAGS_max_concurrent_bulk_load_downloading_count, - "replica.max-concurrent-bulk-load-downloading-count", - kMaxConcurrentBulkLoadDownloadingCountDesc)); }); } std::string -replica_stub::exec_command_on_replica(const std::vector &args, +replica_stub::exec_command_on_replica(const std::vector &arg_str_list, bool allow_empty_args, - std::function func) + std::function func) { - if (args.empty() && !allow_empty_args) { - return std::string("invalid arguments"); + static const std::string kInvalidArguments("invalid arguments"); + + if (arg_str_list.empty() && !allow_empty_args) { + return kInvalidArguments; } - replicas rs; + replica_map_by_gpid rs; { zauto_read_lock l(_replicas_lock); rs = _replicas; } std::set required_ids; - replicas choosed_rs; - if (!args.empty()) { - for (int i = 0; i < args.size(); i++) { - std::vector arg_strs; - utils::split_args(args[i].c_str(), arg_strs, ','); - if (arg_strs.empty()) { - return std::string("invalid arguments"); + replica_map_by_gpid choosed_rs; + if (!arg_str_list.empty()) { + for (const auto &arg_str : arg_str_list) { + std::vector args; + utils::split_args(arg_str.c_str(), args, ','); + if (args.empty()) { + return kInvalidArguments; } - for (const std::string &arg : arg_strs) { - if (arg.empty()) + for (const std::string &arg : args) { + if (arg.empty()) { continue; + } + gpid id; - int pid; if (id.parse_from(arg.c_str())) { - // app_id.partition_index + // Format: app_id.partition_index required_ids.insert(id); auto find = rs.find(id); if (find != rs.end()) { choosed_rs[id] = find->second; } - } else if (sscanf(arg.c_str(), "%d", &pid) == 1) { - // app_id - for (auto kv : rs) { - id = kv.second->get_gpid(); - if (id.get_app_id() == pid) { - choosed_rs[id] = kv.second; - } + + continue; + 
} + + // Must be app_id. + int32_t app_id = 0; + if (!buf2int32(arg, app_id)) { + return kInvalidArguments; + } + + for (const auto &[_, rep] : rs) { + id = rep->get_gpid(); + if (id.get_app_id() == app_id) { + choosed_rs[id] = rep; } - } else { - return std::string("invalid arguments"); } } } @@ -2510,8 +2809,10 @@ replica_stub::exec_command_on_replica(const std::vector &args, [rep, &func, &results_lock, &results]() { partition_status::type status = rep->status(); if (status != partition_status::PS_PRIMARY && - status != partition_status::PS_SECONDARY) + status != partition_status::PS_SECONDARY) { return; + } + std::string result = func(rep); ::dsn::zauto_lock l(results_lock); auto &value = results[rep->get_gpid()]; @@ -2723,33 +3024,39 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid, CHECK_NOTNULL(dn, ""); auto *rep = new replica(this, child_pid, *app, dn, false); rep->_config.status = partition_status::PS_INACTIVE; - _replicas.insert(replicas::value_type(child_pid, rep)); + _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep)); LOG_INFO("mock create_child_replica_if_not_found succeed"); return rep; }); zauto_write_lock l(_replicas_lock); - auto it = _replicas.find(child_pid); + + const auto it = _replicas.find(child_pid); if (it != _replicas.end()) { return it->second; - } else { - if (_opening_replicas.find(child_pid) != _opening_replicas.end()) { - LOG_WARNING("failed create child replica({}) because it is under open", child_pid); - return nullptr; - } else if (_closing_replicas.find(child_pid) != _closing_replicas.end()) { - LOG_WARNING("failed create child replica({}) because it is under close", child_pid); - return nullptr; - } else { - replica *rep = new_replica(child_pid, *app, false, false, parent_dir); - if (rep != nullptr) { - auto pr = _replicas.insert(replicas::value_type(child_pid, rep)); - CHECK(pr.second, "child replica {} has been existed", rep->name()); - METRIC_VAR_INCREMENT(total_replicas); - _closed_replicas.erase(child_pid); - } - return rep; - } } + + if (_opening_replicas.find(child_pid) != _opening_replicas.end()) { + LOG_WARNING("failed create child replica({}) because it is under open", child_pid); + return nullptr; + } + + if (_closing_replicas.find(child_pid) != _closing_replicas.end()) { + LOG_WARNING("failed create child replica({}) because it is under close", child_pid); + return nullptr; + } + + replica *rep = new_replica(child_pid, *app, false, false, parent_dir); + if (rep == nullptr) { + return nullptr; + } + + const auto pr = _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep)); + CHECK(pr.second, "child replica {} has been existed", rep->name()); + METRIC_VAR_INCREMENT(total_replicas); + _closed_replicas.erase(child_pid); + + return rep; } // ThreadPool: THREAD_POOL_REPLICATION diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index 328e975024..d6162bf8c3 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -27,8 +27,9 @@ #pragma once #include -#include #include +#include +#include #include #include #include @@ -70,6 +71,12 @@ #include "utils/metrics.h" #include "utils/zlocks.h" +namespace dsn::utils { + +class ex_lock; + +} // namespace dsn::utils + DSN_DECLARE_uint32(max_concurrent_manual_emergency_checkpointing_count); namespace dsn { @@ -112,14 +119,14 @@ typedef rpc_holder add_new_disk_rpc namespace test { class test_checker; -} +} // namespace test + class cold_backup_context; class replica_split_manager; -typedef std::function - 
replica_state_subscriber; -typedef std::unordered_map replicas; + +using replica_state_subscriber = std::function; class replica_stub; @@ -223,9 +230,9 @@ class replica_stub : public serverlet, public ref_counter // - if allow_empty_args = false, you should specify at least one argument. // each argument should be in format of: // id1,id2... (where id is 'app_id' or 'app_id.partition_id') - std::string exec_command_on_replica(const std::vector &args, + std::string exec_command_on_replica(const std::vector &arg_str_list, bool allow_empty_args, - std::function func); + std::function func); // // partition split @@ -352,15 +359,61 @@ class replica_stub : public serverlet, public ref_counter gpid id, const std::shared_ptr &req, const std::shared_ptr &req2); - // Create a new replica according to the parameters. - // 'parent_dir' is used in partition split for create_child_replica_dir(). + + // Create a child replica for partition split, with 'parent_dir' specified as the parent + // replica dir used for `create_child_replica_dir()`. replica *new_replica(gpid gpid, const app_info &app, bool restore_if_necessary, bool is_duplication_follower, - const std::string &parent_dir = ""); - // Load an existing replica which is located in 'dn' with 'dir' directory. - replica *load_replica(dir_node *dn, const char *dir); + const std::string &parent_dir); + + // Create a new replica, choosing and assigning the best dir for it. + replica *new_replica(gpid gpid, + const app_info &app, + bool restore_if_necessary, + bool is_duplication_follower); + + // Each disk with its candidate replica dirs, used to load replicas while initializing. + struct disk_replicas_info + { + // `dir_node` for each disk. + dir_node *disk_node; + + // All replica dirs on each disk. + std::vector replica_dirs; + }; + + // Get the absolute dirs of all replicas for all healthy disks without IO errors. + std::vector get_all_disk_dirs() const; + + // Get the replica dir name from a potentially longer path (`dir` could be an absolute + // or relative path). + static std::string get_replica_dir_name(const std::string &dir); + + // Parse app id, partition id and app type from the replica dir name. + static bool + parse_replica_dir_name(const std::string &dir_name, gpid &pid, std::string &app_type); + + // Load an existing replica which is located in `dn` with `replica_dir`. Usually each + // different `dn` represents a unique disk. `replica_dir` is the absolute path of the + // directory for a replica. + virtual replica_ptr load_replica(dir_node *disk_node, const std::string &replica_dir); + + using replica_map_by_gpid = std::unordered_map; + + // The same as the above `load_replica` function, except that this function is to load + // each replica to `reps` with protection from `reps_lock`. + void load_replica(dir_node *disk_node, + const std::string &replica_dir, + size_t total_dir_count, + utils::ex_lock &reps_lock, + replica_map_by_gpid &reps, + std::atomic &finished_dir_count); + + // Load all replicas simultaneously from all disks to `reps`. + void load_replicas(replica_map_by_gpid &reps); + // Clean up the memory state and on disk data if creating replica failed. 
void clear_on_failure(replica *rep); task_ptr begin_close_replica(replica_ptr r); @@ -444,21 +497,26 @@ class replica_stub : public serverlet, public ref_counter friend class replica_follower; friend class replica_follower_test; friend class replica_http_service_test; + friend class mock_load_replica; + friend class GetReplicaDirNameTest; + friend class ParseReplicaDirNameTest; FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check); FRIEND_TEST(replica_test, test_auto_trash_of_corruption); FRIEND_TEST(replica_test, test_clear_on_failure); - typedef std::unordered_map opening_replicas; - typedef std::unordered_map> - closing_replicas; // > - typedef std::map> - closed_replicas; // > + using opening_replica_map_by_gpid = std::unordered_map; + + // `task_ptr` is the task closing a replica. + using closing_replica_map_by_gpid = + std::unordered_map>; + + using closed_replica_map_by_gpid = std::map>; mutable zrwlock_nr _replicas_lock; - replicas _replicas; - opening_replicas _opening_replicas; - closing_replicas _closing_replicas; - closed_replicas _closed_replicas; + replica_map_by_gpid _replicas; + opening_replica_map_by_gpid _opening_replicas; + closing_replica_map_by_gpid _closing_replicas; + closed_replica_map_by_gpid _closed_replicas; ::dsn::host_port _primary_host_port; // The stringify of '_primary_host_port', used by logging usually. @@ -564,5 +622,6 @@ class replica_stub : public serverlet, public ref_counter dsn::task_tracker _tracker; }; + } // namespace replication } // namespace dsn diff --git a/src/replica/test/config-test.ini b/src/replica/test/config-test.ini index 2d2c6f3974..b2e0e86c2f 100644 --- a/src/replica/test/config-test.ini +++ b/src/replica/test/config-test.ini @@ -34,7 +34,7 @@ type = replica run = true count = 1 ports = 54321 -pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION,THREAD_POOL_PLOG,THREAD_POOL_BLOCK_SERVICE +pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_LOCAL_APP,THREAD_POOL_REPLICATION,THREAD_POOL_PLOG,THREAD_POOL_BLOCK_SERVICE [core] ;tool = simulator @@ -76,6 +76,12 @@ partitioned = true worker_priority = THREAD_xPRIORITY_NORMAL worker_count = 3 +[threadpool.THREAD_POOL_LOCAL_APP] +name = local_app +partitioned = false +worker_priority = THREAD_xPRIORITY_NORMAL +worker_count = 4 + [threadpool.THREAD_POOL_REPLICATION_LONG] name = replica_long diff --git a/src/replica/test/load_replicas_test.cpp b/src/replica/test/load_replicas_test.cpp new file mode 100644 index 0000000000..3315629c93 --- /dev/null +++ b/src/replica/test/load_replicas_test.cpp @@ -0,0 +1,339 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
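[Editorial note, not part of the patch] The get_replica_dir_name()/parse_replica_dir_name() helpers declared in replica_stub.h above replace the old sscanf-based parsing and expect replica dir names of the form "<app_id>.<partition_index>.<app_type>", where everything after the second dot is the app type. The following is a rough standalone sketch of that naming contract only; it is not the patch's implementation (which relies on buf2uint32), and parse_replica_dir_name_sketch is a hypothetical name used here for illustration.

#include <cstddef>
#include <cstdint>
#include <string>

// Sketch: parse "<app_id>.<partition_index>.<app_type>" into its components.
static bool parse_replica_dir_name_sketch(const std::string &dir_name,
                                          uint32_t &app_id,
                                          uint32_t &partition_index,
                                          std::string &app_type)
{
    uint32_t ids[2] = {0, 0};
    std::size_t begin = 0;
    for (auto &id : ids) {
        const auto end = dir_name.find('.', begin);
        if (end == std::string::npos || end == begin) {
            return false; // fewer than two dot-separated fields, or an empty one
        }
        for (std::size_t i = begin; i < end; ++i) {
            if (dir_name[i] < '0' || dir_name[i] > '9') {
                return false; // the ids must be plain non-negative integers
            }
        }
        id = static_cast<uint32_t>(std::stoul(dir_name.substr(begin, end - begin)));
        begin = end + 1;
    }
    if (begin >= dir_name.size()) {
        return false; // missing app type
    }
    app_id = ids[0];
    partition_index = ids[1];
    // The app type may itself contain dots, e.g. "1.2.another.pegasus".
    app_type = dir_name.substr(begin);
    return true;
}

For example, "1.2.pegasus" yields app_id 1, partition_index 2 and app_type "pegasus", while "1.pegasus", "1.2a.pegasus" and "1.-2.pegasus" are rejected, matching the table-driven cases in replica_dir_test.cpp below.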
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/fs_manager.h" +#include "common/gpid.h" +#include "common/replication.codes.h" +#include "dsn.layer2_types.h" +#include "gtest/gtest.h" +#include "replica/replica.h" +#include "replica/replica_stub.h" +#include "replica/replication_app_base.h" +#include "replica/test/mock_utils.h" +#include "task/task.h" +#include "task/task_code.h" +#include "task/task_spec.h" +#include "test_util/test_util.h" +#include "utils/autoref_ptr.h" +#include "utils/filesystem.h" +#include "utils/flags.h" +#include "utils/ports.h" + +DSN_DECLARE_uint64(max_replicas_on_load_for_each_disk); + +namespace dsn::replication { + +class mock_load_replica : public replica_stub +{ +public: + mock_load_replica() = default; + + ~mock_load_replica() override = default; + + void initialize(const std::map &dirs_by_tag, + const std::map> &replicas_by_tag) + { + // Get dirs and tags to initialize fs_manager. + std::vector dirs; + std::vector tags; + for (const auto &[tag, dir] : dirs_by_tag) { + dirs.push_back(dir); + tags.push_back(tag); + } + + // Initialize fs_manager by the tag and dir of each disk. + _fs_manager.initialize(dirs, tags); + + // Generate the replicas which are expected after loading. + for (const auto &[tag, reps] : replicas_by_tag) { + for (const auto &pid : reps) { + ASSERT_TRUE(_expected_loaded_replica_pids.insert(pid).second); + } + } + + _disk_tags_for_order.clear(); + _disk_dirs_for_order.clear(); + _disk_replicas_for_order.clear(); + _disk_loaded_replicas_for_order.assign(replicas_by_tag.size(), 0); + + // Ensure that the disks are scanned in the order returned by `get_dir_node()`. + for (const auto &dn : _fs_manager.get_dir_nodes()) { + for (const auto &pid : replicas_by_tag.at(dn->tag)) { + _fs_manager.specify_dir_for_new_replica_for_test(dn.get(), "pegasus", pid); + } + + _disk_tags_for_order.push_back(dn->tag); + _disk_dirs_for_order.push_back(dn->full_dir); + _disk_replicas_for_order.push_back(replicas_by_tag.at(dn->tag).size()); + } + + ASSERT_EQ(_disk_tags_for_order.size(), _disk_dirs_for_order.size()); + } + + void test_load_replicas(bool test_load_order, uint64_t max_replicas_on_load_for_each_disk) + { + PRESERVE_VAR(allow_inline, dsn::task_spec::get(LPC_REPLICATION_INIT_LOAD)->allow_inline); + dsn::task_spec::get(LPC_REPLICATION_INIT_LOAD)->allow_inline = test_load_order; + + PRESERVE_FLAG(max_replicas_on_load_for_each_disk); + FLAGS_max_replicas_on_load_for_each_disk = max_replicas_on_load_for_each_disk; + + // Check if all loaded replicas are matched. + replica_stub::replica_map_by_gpid actual_loaded_replicas; + load_replicas(actual_loaded_replicas); + ASSERT_EQ(_expected_loaded_replicas, actual_loaded_replicas); + + // Check if all replicas have been loaded. 
+ std::set actual_loaded_replica_pids; + for (const auto &[pid, _] : actual_loaded_replicas) { + ASSERT_TRUE(actual_loaded_replica_pids.insert(pid).second); + } + ASSERT_EQ(_expected_loaded_replica_pids, actual_loaded_replica_pids); + } + + void remove_disk_dirs() + { + for (const auto &dn : _fs_manager.get_dir_nodes()) { + ASSERT_TRUE(utils::filesystem::remove_path(dn->full_dir)); + } + } + +private: + void load_replica_for_test(dir_node *dn, const std::string &replica_dir, replica_ptr &rep) + { + ASSERT_TRUE(utils::filesystem::directory_exists(replica_dir)); + + const auto &dir_name = get_replica_dir_name(replica_dir); + + gpid pid; + std::string app_type; + ASSERT_TRUE(parse_replica_dir_name(dir_name, pid, app_type)); + ASSERT_STREQ("pegasus", app_type.c_str()); + + ASSERT_EQ(LPC_REPLICATION_INIT_LOAD, task::get_current_task()->spec().code); + + if (task::get_current_task()->spec().allow_inline) { + // Once the task is `allow_inline`, it would be executed in place immediately rather + // than pushed into the queue. Thus we could test the expected order in which the + // tasks are pushed into the queue. + + // Find the first disk where there is still some replica that has not been loaded. + size_t finished_disks = 0; + while (_disk_loaded_replicas_for_order[_disk_index_for_order] >= + _disk_replicas_for_order[_disk_index_for_order]) { + // Since current task has not been executed, it is not possible that all disks + // are finished. + ++finished_disks; + ASSERT_GT(_disk_tags_for_order.size(), finished_disks); + + // Skip to next disk since all of the replicas of this disk have been loaded. + _disk_index_for_order = (_disk_index_for_order + 1) % _disk_tags_for_order.size(); + } + + // Only check if the processed order of the disk the replica belongs to, rather than + // the order of the replica itself, for the reason that the order of the dirs returned + // by the underlying call might vary. + ASSERT_EQ(_disk_tags_for_order[_disk_index_for_order], dn->tag); + ASSERT_EQ(_disk_dirs_for_order[_disk_index_for_order], dn->full_dir); + + // Current replica has been loaded, move forward to the next replica of this disk. + ++_disk_loaded_replicas_for_order[_disk_index_for_order]; + + // Turn to next disks if some of them still have some replicas that are not loaded. + _disk_index_for_order = (_disk_index_for_order + 1) % _disk_tags_for_order.size(); + } + + // Check the absolute dir of this replica. + ASSERT_EQ(dn->replica_dir("pegasus", pid), replica_dir); + + app_info ai; + ai.app_type = "pegasus"; + rep = new replica(this, pid, ai, dn, false); + rep->_app = std::make_unique(rep); + + std::lock_guard guard(_mtx); + + ASSERT_TRUE(_expected_loaded_replicas.find(pid) == _expected_loaded_replicas.end()); + + _expected_loaded_replicas[pid] = rep; + } + + // Mock the process of loading a replica. + replica_ptr load_replica(dir_node *dn, const std::string &replica_dir) override + { + replica_ptr rep; + load_replica_for_test(dn, replica_dir, rep); + return rep; + } + + std::set _expected_loaded_replica_pids; + + // The variables with postfix `_for_order` are only for testing the order of the loading + // tasks. 
+ size_t _disk_index_for_order{0}; + std::vector _disk_tags_for_order; + std::vector _disk_dirs_for_order; + std::vector _disk_replicas_for_order; + std::vector _disk_loaded_replicas_for_order; + + mutable std::mutex _mtx; + replica_stub::replica_map_by_gpid _expected_loaded_replicas; + + DISALLOW_COPY_AND_ASSIGN(mock_load_replica); + DISALLOW_MOVE_AND_ASSIGN(mock_load_replica); +}; + +struct load_replicas_case +{ + // Each disk tag => dir of this disk. + std::map dirs_by_tag; + + // Each disk tag => replicas (specified by ) on this disk. + std::map> replicas_by_tag; +}; + +class LoadReplicasTest : public testing::TestWithParam +{ +public: + LoadReplicasTest() + { + // Remove all dirs of all disks to prevent each test from being disturbed. + _stub.remove_disk_dirs(); + + // Use test cases to initialize the replica stub. + const auto &load_case = GetParam(); + _stub.initialize(load_case.dirs_by_tag, load_case.replicas_by_tag); + } + + ~LoadReplicasTest() override { _stub.remove_disk_dirs(); } + + void test_load_replicas(bool test_load_order, uint64_t max_replicas_on_load_for_each_disk) + { + _stub.test_load_replicas(test_load_order, max_replicas_on_load_for_each_disk); + } + +private: + mock_load_replica _stub; + + DISALLOW_COPY_AND_ASSIGN(LoadReplicasTest); + DISALLOW_MOVE_AND_ASSIGN(LoadReplicasTest); +}; + +TEST_P(LoadReplicasTest, LoadReplicas) { test_load_replicas(false, 256); } + +TEST_P(LoadReplicasTest, LoadOrder) { test_load_replicas(true, 256); } + +TEST_P(LoadReplicasTest, LoadThrottling) { test_load_replicas(false, 5); } + +// Generate a test case for loading replicas. Each element in `disk_replicas` is corresponding +// to the number of replicas on a disk. +load_replicas_case generate_load_replicas_case(const std::vector &disk_replicas) +{ + std::map dirs_by_tag; + for (size_t disk_index = 0; disk_index < disk_replicas.size(); ++disk_index) { + dirs_by_tag.emplace(fmt::format("data{}", disk_index), fmt::format("disk{}", disk_index)); + } + + static const uint32_t kNumBitsPartitions = 3; + static const uint32_t kNumPartitions = 1U << kNumBitsPartitions; + + uint32_t app_id = 1; + uint32_t partition_id = 0; + + std::map> replicas_by_tag; + + while (true) { + size_t finished_disks = 0; + + for (size_t disk_index = 0; disk_index < disk_replicas.size(); ++disk_index) { + auto &replica_list = replicas_by_tag[fmt::format("data{}", disk_index)]; + if (replica_list.size() >= disk_replicas[disk_index]) { + // All replicas on this disk have been generated, just skip to next disk. + ++finished_disks; + continue; + } + + // Generate a replica with current app id and partition index. + replica_list.emplace_back(static_cast(app_id), + static_cast(partition_id)); + + // Once next partition index is found 0, increment app id to turn to next table. + app_id += ((partition_id + 1) & kNumPartitions) >> kNumBitsPartitions; + + // Increment index to turn to next partition. + partition_id = (partition_id + 1) & (kNumPartitions - 1); + } + + if (finished_disks >= disk_replicas.size()) { + // All disks have been done. + break; + } + } + + return {dirs_by_tag, replicas_by_tag}; +} + +std::vector generate_load_replicas_cases() +{ + // At least 1 disk should be included (otherwise it would lead to core dump), thus do + // not generate the empty case (i.e. {}). + return std::vector({ + // There is only one disk which has none of replica. + generate_load_replicas_case({0}), + // There are two disks both of which have none of replica. 
+ generate_load_replicas_case({0, 0}), + // There is only one disk which has one replica. + generate_load_replicas_case({1}), + // There are two disks one of which has one replica, and another has none. + generate_load_replicas_case({1, 0}), + generate_load_replicas_case({0, 1}), + // There is only one disk which has two replicas. + generate_load_replicas_case({2}), + // There are two disks one of which has two replicas, and another has none. + generate_load_replicas_case({2, 0}), + generate_load_replicas_case({0, 2}), + // There are at least three disks. + generate_load_replicas_case({1, 0, 2}), + generate_load_replicas_case({8, 25, 16}), + generate_load_replicas_case({17, 96, 56, 127}), + generate_load_replicas_case({22, 38, 0, 16}), + generate_load_replicas_case({82, 75, 36, 118, 65}), + generate_load_replicas_case({0, 92, 17, 68, 25}), + // There are many replicas for some disks. + generate_load_replicas_case({156, 367, 309, 58, 404, 298, 512, 82}), + generate_load_replicas_case({167, 28, 898, 516, 389, 422, 682, 265, 596}), + }); +} + +INSTANTIATE_TEST_SUITE_P(ReplicaStubTest, + LoadReplicasTest, + testing::ValuesIn(generate_load_replicas_cases())); + +} // namespace dsn::replication diff --git a/src/replica/test/replica_dir_test.cpp b/src/replica/test/replica_dir_test.cpp new file mode 100644 index 0000000000..3bc64d74f8 --- /dev/null +++ b/src/replica/test/replica_dir_test.cpp @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "common/gpid.h" +#include "gtest/gtest.h" +#include "replica/replica_stub.h" + +namespace dsn::replication { + +struct get_replica_dir_name_case +{ + std::string path; + std::string expected_replica_dir_name; +}; + +class GetReplicaDirNameTest : public testing::TestWithParam +{ +public: + static void test_get_replica_dir_name() + { + const auto &test_case = GetParam(); + const auto &actual_replica_dir_name = replica_stub::get_replica_dir_name(test_case.path); + EXPECT_EQ(test_case.expected_replica_dir_name, actual_replica_dir_name); + } +}; + +TEST_P(GetReplicaDirNameTest, GetReplicaDirName) { test_get_replica_dir_name(); } + +const std::vector get_replica_dir_name_tests{ + // Linux absolute path and non-empty dir name. + {"/data/pegasus/1.2.pegasus", "1.2.pegasus"}, + // Linux absolute path and empty dir name. + {"/data/pegasus/1.2.pegasus/", ""}, + // Windows absolute path and non-empty dir name. + {R"(D:\data\pegasus\1.2.pegasus)", "1.2.pegasus"}, + // Windows absolute path and empty dir name. + {R"(D:\data\pegasus\1.2.pegasus\)", ""}, + // Linux relative path and non-empty dir name. + {"./1.2.pegasus", "1.2.pegasus"}, + // Linux relative path and empty dir name. + {"./1.2.pegasus/", ""}, + // Windows relative path and non-empty dir name. 
+ {R"(.\1.2.pegasus)", "1.2.pegasus"}, + // Windows relative path and empty dir name. + {R"(.\1.2.pegasus\)", ""}, +}; + +INSTANTIATE_TEST_SUITE_P(ReplicaDirTest, + GetReplicaDirNameTest, + testing::ValuesIn(get_replica_dir_name_tests)); + +struct parse_replica_dir_name_case +{ + std::string replica_dir_name; + bool ok; + gpid expected_pid; + std::string expected_app_type; +}; + +class ParseReplicaDirNameTest : public testing::TestWithParam +{ +public: + static void test_parse_replica_dir_name() + { + const auto &test_case = GetParam(); + + gpid actual_pid; + std::string actual_app_type; + ASSERT_EQ(test_case.ok, + replica_stub::parse_replica_dir_name( + test_case.replica_dir_name, actual_pid, actual_app_type)); + if (!test_case.ok) { + return; + } + + EXPECT_EQ(test_case.expected_pid, actual_pid); + EXPECT_EQ(test_case.expected_app_type, actual_app_type); + } +}; + +TEST_P(ParseReplicaDirNameTest, ParseReplicaDirName) { test_parse_replica_dir_name(); } + +const std::vector parse_replica_dir_name_tests{ + // Empty dir name. + {"", false, {}, ""}, + // Single-digit IDs. + {"1.2.pegasus", true, {1, 2}, "pegasus"}, + // Multi-digit IDs. + {"1234.56789.pegasus", true, {1234, 56789}, "pegasus"}, + // Custom app type other than "pegasus". + {"1.2.another", true, {1, 2}, "another"}, + // Custom app type with dot. + {"1.2.another.pegasus", true, {1, 2}, "another.pegasus"}, + // Custom app type with other specific symbol. + {"1.2.another_pegasus", true, {1, 2}, "another_pegasus"}, + // Missing one ID. + {"1.pegasus", false, {}, ""}, + // Missing both IDs. + {"pegasus", false, {}, ""}, + // ID with letter. + {"1.2a.pegasus", false, {}, ""}, + // ID with minus. + {"1.-2.pegasus", false, {}, ""}, +}; + +INSTANTIATE_TEST_SUITE_P(ReplicaDirTest, + ParseReplicaDirNameTest, + testing::ValuesIn(parse_replica_dir_name_tests)); + +} // namespace dsn::replication diff --git a/src/server/result_writer.cpp b/src/server/result_writer.cpp index df78172e9c..4d3a9160e0 100644 --- a/src/server/result_writer.cpp +++ b/src/server/result_writer.cpp @@ -21,7 +21,6 @@ #include #include -#include #include #include "pegasus/client.h" diff --git a/src/test_util/test_util.h b/src/test_util/test_util.h index 2e2b34bb0d..549d8fd948 100644 --- a/src/test_util/test_util.h +++ b/src/test_util/test_util.h @@ -44,11 +44,12 @@ class file_meta; } // namespace replication } // namespace dsn +#define PRESERVE_VAR(name, expr) \ + const auto PRESERVED_##name = expr; \ + auto PRESERVED_##name##_cleanup = dsn::defer([PRESERVED_##name]() { expr = PRESERVED_##name; }) + // Save the current value of a flag and restore it at the end of the function. 
-#define PRESERVE_FLAG(name) \ - const auto PRESERVED_FLAGS_##name = FLAGS_##name; \ - auto PRESERVED_FLAGS_##name##_cleanup = \ - dsn::defer([PRESERVED_FLAGS_##name]() { FLAGS_##name = PRESERVED_FLAGS_##name; }) +#define PRESERVE_FLAG(name) PRESERVE_VAR(FLAGS_##name, FLAGS_##name) namespace pegasus { diff --git a/src/utils/autoref_ptr.h b/src/utils/autoref_ptr.h index 501698f852..c3ba9d9595 100644 --- a/src/utils/autoref_ptr.h +++ b/src/utils/autoref_ptr.h @@ -160,6 +160,15 @@ class ref_ptr T *operator->() const { return _obj; } + bool operator==(const ref_ptr &r) const { return _obj == r._obj; } + + template ::value>::type> + bool operator==(const ref_ptr &r) const + { + return _obj == r._obj; + } + bool operator==(T *r) const { return _obj == r; } bool operator!=(T *r) const { return _obj != r; } diff --git a/src/utils/command_manager.h b/src/utils/command_manager.h index b971522d01..9f23d24bb7 100644 --- a/src/utils/command_manager.h +++ b/src/utils/command_manager.h @@ -36,6 +36,7 @@ #include #include #include +#include #include #include "utils/fmt_logging.h" @@ -65,14 +66,12 @@ class command_manager : public ::dsn::utils::singleton // 'validator' is used to validate the new value. // The value is reset to 'default_value' if passing "DEFAULT" argument. template - WARN_UNUSED_RESULT std::unique_ptr register_int_command( - T &value, - T default_value, - const std::string &command, - const std::string &help, - std::function validator = [](int64_t new_value) -> bool { - return new_value >= 0; - }) + WARN_UNUSED_RESULT std::unique_ptr + register_int_command(T &value, + T default_value, + const std::string &command, + const std::string &help, + std::function::type)> validator) { return register_single_command( command, @@ -83,6 +82,19 @@ class command_manager : public ::dsn::utils::singleton }); } + template + WARN_UNUSED_RESULT std::unique_ptr register_int_command( + T &value, T default_value, const std::string &command, const std::string &help) + { + return register_int_command(value, + default_value, + command, + help, + [](typename std::remove_reference::type new_value) -> bool { + return new_value >= 0; + }); + } + // Register a single 'command' with the 'help' description, its arguments are described in // 'args'. std::unique_ptr @@ -133,11 +145,12 @@ class command_manager : public ::dsn::utils::singleton set_bool(bool &value, const std::string &name, const std::vector &args); template - static std::string set_int(T &value, - T default_value, - const std::string &name, - const std::vector &args, - const std::function &validator) + static std::string + set_int(T &value, + T default_value, + const std::string &name, + const std::vector &args, + const std::function::type)> &validator) { nlohmann::json msg; msg["error"] = "ok"; @@ -164,8 +177,7 @@ class command_manager : public ::dsn::utils::singleton // Invalid argument. 
T new_value = 0; - if (!internal::buf2signed(args[0], new_value) || - !validator(static_cast(new_value))) { + if (!buf2numeric(args[0], new_value) || !validator(new_value)) { msg["error"] = fmt::format("ERR: invalid argument '{}', the value is not acceptable", args[0]); return msg.dump(2); diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h index d36aa98640..872ea4fb70 100644 --- a/src/utils/time_utils.h +++ b/src/utils/time_utils.h @@ -147,7 +147,7 @@ class chronograph inline void reset_start_time() { _start_time_ns = dsn_now_ns(); } - inline uint64_t duration_ns() const + [[nodiscard]] inline uint64_t duration_ns() const { auto now = dsn_now_ns(); CHECK_GE(now, _start_time_ns); @@ -155,6 +155,8 @@ class chronograph return now - _start_time_ns; } + [[nodiscard]] inline uint64_t duration_ms() const { return duration_ns() / 1'000'000; } + private: uint64_t _start_time_ns;