Skip to content

Commit

Permalink
fix load replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan authored and 王聃 committed Dec 16, 2024
1 parent 1116e0f commit 1885144
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 24 deletions.
35 changes: 15 additions & 20 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,8 @@ void replica_stub::load_replica(dir_node *dn,
const auto *const worker = task::get_current_worker2();
if (worker != nullptr) {
CHECK(!(worker->pool()->spec().partitioned),
"The thread pool for loading replicas must not be partitioned since load balancing "
"is required among multiple threads");
"The thread pool LPC_REPLICATION_INIT_LOAD for loading replicas must not be "
"partitioned since load balancing is required among multiple threads");
}

auto rep = load_replica(dn, dir.c_str());
Expand All @@ -513,12 +513,13 @@ void replica_stub::load_replica(dir_node *dn,
rep->last_prepared_decree());

utils::auto_lock<utils::ex_lock> l(reps_lock);
CHECK(reps.find(rep->get_gpid()) == reps.end(),
const auto rep_iter = reps.find(rep->get_gpid());
CHECK(rep_iter == reps.end(),
"conflict replica dir: {} <--> {}",
rep->dir(),
reps[rep->get_gpid()]->dir());
rep_iter->second->dir());

reps[rep->get_gpid()] = rep;
reps.emplace(rep->get_gpid(), rep);
}

void replica_stub::load_replicas(replicas &reps)
Expand Down Expand Up @@ -556,13 +557,10 @@ void replica_stub::load_replicas(replicas &reps)

const auto &dn = disks[disk_index].first;
auto &load_disk_queue = load_disk_queues[disk_index];
if (!load_disk_queue.empty() &&
load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {
if (load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {
// Loading replicas should be throttled in case that disk IO is saturated.
if (load_disk_queue.front().second->wait(
if (!load_disk_queue.front().second->wait(
static_cast<int>(FLAGS_load_replica_max_wait_time_ms))) {
load_disk_queue.pop();
} else {
// There might be too many replicas that are being loaded which lead to
// slow disk IO.
LOG_WARNING("after {} ms, loading dir({}) is still not finished, there are "
Expand All @@ -580,10 +578,13 @@ void replica_stub::load_replicas(replicas &reps)
}

// Continue to load a replica since we are within the limit now.
if (dsn_unlikely(load_disk_queue.size() >=
FLAGS_max_replicas_on_load_for_each_disk)) {
continue;
}
load_disk_queue.pop();
}

const auto &dir = dirs[dir_index++];
if (dsn::replication::is_data_dir_invalid(dir)) {
LOG_WARNING("ignore dir {}", dir);
continue;
}

LOG_DEBUG("ready to load dir(index={}, path={}) for disk(index={}, tag={}, path={})",
Expand All @@ -593,12 +594,6 @@ void replica_stub::load_replicas(replicas &reps)
dn->tag,
dn->full_dir);

const auto &dir = dirs[dir_index++];
if (dsn::replication::is_data_dir_invalid(dir)) {
LOG_WARNING("ignore dir {}", dir);
continue;
}

load_disk_queue.emplace(
dir,
tasking::create_task(
Expand Down
5 changes: 1 addition & 4 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
void
load_replica(dir_node *dn, const std::string &dir, utils::ex_lock &reps_lock, replicas &reps);

// Load all replicas synchronously from all disks to `reps`. This function would ensure
// that data on each disk is loaded more evenly, rather than that a disk would begin to
// be loaded only after another has been finished, in case that there are too many replicas
// on a disk and other disks cannot start loading until this disk is finished.
// Load all replicas simultaneously from all disks to `reps`.
void load_replicas(replicas &reps);

// Clean up the memory state and on disk data if creating replica failed.
Expand Down

0 comments on commit 1885144

Please sign in to comment.