Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Dec 20, 2024
1 parent b140d54 commit dd324c9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 15 deletions.
40 changes: 26 additions & 14 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,15 @@

namespace {

const char *kMaxConcurrentBulkLoadDownloadingCountDesc =
"The maximum concurrent bulk load downloading replica count.";

const char *kMaxReplicasOnLoadForEachDiskDesc =
"The max number of replicas that are allowed to be loaded simultaneously for each disk dir.";

const char *kLoadReplicaMaxWaitTimeMsDesc = "The max waiting time for replica loading to complete.";

} // anonymous namespace
const char *kMaxConcurrentBulkLoadDownloadingCountDesc =
"The maximum concurrent bulk load downloading replica count.";

DSN_DEFINE_int32(replication,
max_concurrent_bulk_load_downloading_count,
5,
kMaxConcurrentBulkLoadDownloadingCountDesc);
DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count,
[](int32_t value) -> bool { return value >= 0; });
} // anonymous namespace

METRIC_DEFINE_gauge_int64(server,
total_replicas,
Expand Down Expand Up @@ -290,6 +283,13 @@ DSN_TAG_VARIABLE(load_replica_max_wait_time_ms, FT_MUTABLE);
DSN_DEFINE_validator(load_replica_max_wait_time_ms,
[](uint64_t value) -> bool { return value > 0; });

DSN_DEFINE_int32(replication,
max_concurrent_bulk_load_downloading_count,
5,
kMaxConcurrentBulkLoadDownloadingCountDesc);
DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count,
[](int32_t value) -> bool { return value >= 0; });

DSN_DEFINE_bool(replication,
deny_client_on_start,
false,
Expand Down Expand Up @@ -540,7 +540,11 @@ void replica_stub::load_replica(dir_node *disk_node,
replica_map_by_gpid &reps,
std::atomic<size_t> &finished_dir_count)
{
SCOPED_LOG_TIMING(INFO, "on loading {}:{}", disk_node->tag, replica_dir);
// Measure execution time for loading a replica dir.
//
// TODO(wangdan): support decimal milliseconds or microseconds, since loading a small
// replica tends to spend less than 1 milliseconds and show "0ms" in logging.
SCOPED_LOG_TIMING(INFO, "on loading replica dir {}:{}", disk_node->tag, replica_dir);

LOG_INFO("loading replica: replica_dir={}:{}", disk_node->tag, replica_dir);

Expand Down Expand Up @@ -588,14 +592,19 @@ void replica_stub::load_replica(dir_node *disk_node,

void replica_stub::load_replicas(replica_map_by_gpid &reps)
{
// Measure execution time for loading all replicas from all healthy disks without IO errors.
//
// TODO(wangdan): show both the size of output replicas and execution time on just one
// logging line.
SCOPED_LOG_TIMING(INFO, "on loading replicas");

const auto &disks = get_all_disk_dirs();

// The max index of dirs that are currently being loaded for each disk. The dirs with
// higher indexes have not begun to be loaded (namely pushed into the queue).
// The max index of dirs that are currently being loaded for each disk, which means the dirs
// with higher indexes have not begun to be loaded (namely pushed into the queue).
std::vector<size_t> replica_dir_indexes(disks.size(), 0);

// Each loader is for a replica dir, including its path and loading task.
struct replica_dir_loader
{
size_t replica_dir_index;
Expand All @@ -608,6 +617,9 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps)
// the queue.
std::vector<std::queue<replica_dir_loader>> load_disk_queues(disks.size());

// The number of loading replica dirs that have been finished for each disk, used to show
// current progress.
//
// TODO(wangdan): calculate the number of successful or failed loading of replica dirs,
// and the number for each reason if failed.
std::vector<std::atomic<size_t>> finished_replica_dirs(disks.size());
Expand Down Expand Up @@ -639,7 +651,7 @@ void replica_stub::load_replicas(replica_map_by_gpid &reps)
auto &replica_dir_index = replica_dir_indexes[disk_index];
if (replica_dir_index >= replica_dirs.size()) {
// All of the replicas for the disk `disks[disk_index]` have begun to be loaded,
// thus just skip.
// thus just skip to next disk.
++finished_disks;
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
std::vector<std::string> replica_dirs;
};

// Get the absolute dirs of all replicas for all disks.
// Get the absolute dirs of all replicas for all healthy disks without IO errors.
std::vector<disk_replicas_info> get_all_disk_dirs() const;

// Get the replica dir name from a potentially longer path (`dir` could be an absolute
Expand Down

0 comments on commit dd324c9

Please sign in to comment.