From 1885144bbd97fc938e7600b7b25267025b5eaa39 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 26 Sep 2024 21:13:17 +0800 Subject: [PATCH] fix load replicas --- src/replica/replica_stub.cpp | 35 +++++++++++++++-------------------- src/replica/replica_stub.h | 5 +---- 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index a6b46196ab..e889908640 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -493,8 +493,8 @@ void replica_stub::load_replica(dir_node *dn, const auto *const worker = task::get_current_worker2(); if (worker != nullptr) { CHECK(!(worker->pool()->spec().partitioned), - "The thread pool for loading replicas must not be partitioned since load balancing " - "is required among multiple threads"); + "The thread pool LPC_REPLICATION_INIT_LOAD for loading replicas must not be " + "partitioned since load balancing is required among multiple threads"); } auto rep = load_replica(dn, dir.c_str()); @@ -513,12 +513,13 @@ void replica_stub::load_replica(dir_node *dn, rep->last_prepared_decree()); utils::auto_lock l(reps_lock); - CHECK(reps.find(rep->get_gpid()) == reps.end(), + const auto rep_iter = reps.find(rep->get_gpid()); + CHECK(rep_iter == reps.end(), "conflict replica dir: {} <--> {}", rep->dir(), - reps[rep->get_gpid()]->dir()); + rep_iter->second->dir()); - reps[rep->get_gpid()] = rep; + reps.emplace(rep->get_gpid(), rep); } void replica_stub::load_replicas(replicas &reps) @@ -556,13 +557,10 @@ void replica_stub::load_replicas(replicas &reps) const auto &dn = disks[disk_index].first; auto &load_disk_queue = load_disk_queues[disk_index]; - if (!load_disk_queue.empty() && - load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) { + if (load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) { // Loading replicas should be throttled in case that disk IO is saturated. - if (load_disk_queue.front().second->wait( + if (!load_disk_queue.front().second->wait( static_cast(FLAGS_load_replica_max_wait_time_ms))) { - load_disk_queue.pop(); - } else { // There might be too many replicas that are being loaded which lead to // slow disk IO. LOG_WARNING("after {} ms, loading dir({}) is still not finished, there are " @@ -580,10 +578,13 @@ void replica_stub::load_replicas(replicas &reps) } // Continue to load a replica since we are within the limit now. - if (dsn_unlikely(load_disk_queue.size() >= - FLAGS_max_replicas_on_load_for_each_disk)) { - continue; - } + load_disk_queue.pop(); + } + + const auto &dir = dirs[dir_index++]; + if (dsn::replication::is_data_dir_invalid(dir)) { + LOG_WARNING("ignore dir {}", dir); + continue; } LOG_DEBUG("ready to load dir(index={}, path={}) for disk(index={}, tag={}, path={})", @@ -593,12 +594,6 @@ void replica_stub::load_replicas(replicas &reps) dn->tag, dn->full_dir); - const auto &dir = dirs[dir_index++]; - if (dsn::replication::is_data_dir_invalid(dir)) { - LOG_WARNING("ignore dir {}", dir); - continue; - } - load_disk_queue.emplace( dir, tasking::create_task( diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index fdf5d81977..cc15987aca 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -395,10 +395,7 @@ class replica_stub : public serverlet, public ref_counter void load_replica(dir_node *dn, const std::string &dir, utils::ex_lock &reps_lock, replicas &reps); - // Load all replicas synchronously from all disks to `reps`. This function would ensure - // that data on each disk is loaded more evenly, rather than that a disk would begin to - // be loaded only after another has been finished, in case that there are too many replicas - // on a disk and other disks cannot start loading until this disk is finished. + // Load all replicas simultaneously from all disks to `reps`. void load_replicas(replicas &reps); // Clean up the memory state and on disk data if creating replica failed.