Skip to content

Commit

Permalink
refactor and add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Dec 20, 2024
1 parent f0835d6 commit b140d54
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 26 deletions.
45 changes: 25 additions & 20 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <iterator>
#include <mutex>
Expand Down Expand Up @@ -2228,7 +2227,7 @@ replica *replica_stub::new_replica(gpid gpid,
bool restore_if_necessary,
bool is_duplication_follower)
{
return new_replica(gpid, app, restore_if_necessary, is_duplication_follower, "");
return new_replica(gpid, app, restore_if_necessary, is_duplication_follower, {});
}

/*static*/ std::string replica_stub::get_replica_dir_name(const std::string &dir)
Expand Down Expand Up @@ -3017,27 +3016,33 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid,
});

zauto_write_lock l(_replicas_lock);
auto it = _replicas.find(child_pid);

const auto it = _replicas.find(child_pid);
if (it != _replicas.end()) {
return it->second;
} else {
if (_opening_replicas.find(child_pid) != _opening_replicas.end()) {
LOG_WARNING("failed create child replica({}) because it is under open", child_pid);
return nullptr;
} else if (_closing_replicas.find(child_pid) != _closing_replicas.end()) {
LOG_WARNING("failed create child replica({}) because it is under close", child_pid);
return nullptr;
} else {
replica *rep = new_replica(child_pid, *app, false, false, parent_dir);
if (rep != nullptr) {
auto pr = _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep));
CHECK(pr.second, "child replica {} has been existed", rep->name());
METRIC_VAR_INCREMENT(total_replicas);
_closed_replicas.erase(child_pid);
}
return rep;
}
}

if (_opening_replicas.find(child_pid) != _opening_replicas.end()) {
LOG_WARNING("failed create child replica({}) because it is under open", child_pid);
return nullptr;
}

if (_closing_replicas.find(child_pid) != _closing_replicas.end()) {
LOG_WARNING("failed create child replica({}) because it is under close", child_pid);
return nullptr;
}

replica *rep = new_replica(child_pid, *app, false, false, parent_dir);
if (rep == nullptr) {
return nullptr;
}

const auto pr = _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep));
CHECK(pr.second, "child replica {} has been existed", rep->name());
METRIC_VAR_INCREMENT(total_replicas);
_closed_replicas.erase(child_pid);

return rep;
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down
15 changes: 9 additions & 6 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,19 +359,22 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
gpid id,
const std::shared_ptr<group_check_request> &req,
const std::shared_ptr<configuration_update_request> &req2);
// Create a new replica according to the parameters.
// 'parent_dir' is used in partition split for create_child_replica_dir().

// Create a child replica for partition split, with 'parent_dir' specified as the parent
// replica dir used for `create_child_replica_dir()`.
replica *new_replica(gpid gpid,
const app_info &app,
bool restore_if_necessary,
bool is_duplication_follower,
const std::string &parent_dir);

// Create a new replica, choosing and assigning the best dir for it.
replica *new_replica(gpid gpid,
const app_info &app,
bool restore_if_necessary,
bool is_duplication_follower);

// Each disk with its candidate replica dirs, used to load replicas while initializing.
struct disk_replicas_info
{
// `dir_node` for each disk.
Expand All @@ -384,11 +387,11 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// Get the absolute dirs of all replicas for all disks.
std::vector<disk_replicas_info> get_all_disk_dirs() const;

// Get the dir name for a replica from a potentially longer path (both absolute and
// relative paths are possible).
// Get the replica dir name from a potentially longer path (`dir` could be an absolute
// or relative path).
static std::string get_replica_dir_name(const std::string &dir);

// Parse app id, partition id and app type from the dir name of a replica.
// Parse app id, partition id and app type from the replica dir name.
static bool
parse_replica_dir_name(const std::string &dir_name, gpid &pid, std::string &app_type);

Expand Down Expand Up @@ -503,7 +506,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter

using opening_replica_map_by_gpid = std::unordered_map<gpid, task_ptr>;

// `task_ptr` is the task being closed.
// `task_ptr` is the task closing a replica.
using closing_replica_map_by_gpid =
std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>;

Expand Down

0 comments on commit b140d54

Please sign in to comment.