Skip to content

Commit

Permalink
Some renames
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Oct 2, 2023
1 parent 3763ecf commit 251fdae
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 129 deletions.
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ target_sources(${PROJECT_NAME} PRIVATE
lib/mesg_client.cpp
lib/service.cpp
lib/data_service_grpc.cpp
lib/messaging.cpp
lib/manager_impl.cpp
$<TARGET_OBJECTS:${PROJECT_NAME}-proto>
)
settings_gen_cpp($<TARGET_FILE:flatbuffers::flatc> ${CMAKE_CURRENT_BINARY_DIR}/generated/ ${PROJECT_NAME} lib/nuraft_mesg_config.fbs)
Expand Down
55 changes: 28 additions & 27 deletions src/lib/messaging.cpp → src/lib/manager_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/// Copyright 2018 (c) eBay Corporation
//
#include "messaging.hpp"
#include "manager_impl.hpp"

#include <chrono>

Expand Down Expand Up @@ -83,14 +83,15 @@ class engine_factory : public group_factory {
}
};

service::~service() {
ManagerImpl::~ManagerImpl() {
if (_mesg_service) {
_grpc_server->shutdown();
_mesg_service->shutdown();
}
}

service::service(Manager::Params const& start_params, std::weak_ptr< MessagingApplication > app, bool and_data_svc) :
ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< MessagingApplication > app,
bool and_data_svc) :
start_params_(start_params), _srv_id(to_server_id(start_params_.server_uuid_)), application_(app) {
_g_factory = std::make_shared< engine_factory >(grpc_client_threads, start_params_, app);
auto logger_name = fmt::format("nuraft_{}", start_params_.server_uuid_);
Expand Down Expand Up @@ -124,7 +125,7 @@ service::service(Manager::Params const& start_params, std::weak_ptr< MessagingAp
restart_server();
}

void service::restart_server() {
void ManagerImpl::restart_server() {
auto listen_address = fmt::format(FMT_STRING("0.0.0.0:{}"), start_params_.mesg_port_);
LOGINFO("Starting Messaging Service on http://{}", listen_address);

Expand All @@ -139,16 +140,16 @@ void service::restart_server() {
_mesg_service->bind(_grpc_server.get());
}

void service::register_mgr_type(std::string const& group_type, group_params const& params) {
void ManagerImpl::register_mgr_type(std::string const& group_type, group_params const& params) {
std::lock_guard< std::mutex > lg(_manager_lock);
auto [it, happened] = _state_mgr_types.emplace(std::make_pair(group_type, params));
DEBUG_ASSERT(_state_mgr_types.end() != it, "Out of memory?");
DEBUG_ASSERT(!!happened, "Re-register?");
if (_state_mgr_types.end() == it) { LOGERROR("Could not register group type: {}", group_type); }
}

nuraft::cb_func::ReturnCode service::callback_handler(std::string const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param) {
nuraft::cb_func::ReturnCode ManagerImpl::callback_handler(std::string const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param) {
switch (type) {
case nuraft::cb_func::RemovedFromCluster: {
LOGINFO("Removed from cluster {}", group_id);
Expand Down Expand Up @@ -188,7 +189,7 @@ nuraft::cb_func::ReturnCode service::callback_handler(std::string const& group_i
return nuraft::cb_func::Ok;
}

void service::exit_group(std::string const& group_id) {
void ManagerImpl::exit_group(std::string const& group_id) {
std::shared_ptr< mesg_state_mgr > mgr;
{
std::lock_guard< std::mutex > lg(_manager_lock);
Expand All @@ -197,9 +198,9 @@ void service::exit_group(std::string const& group_id) {
if (mgr) mgr->leave();
}

std::error_condition service::group_init(int32_t const srv_id, std::string const& group_id,
std::string const& group_type, nuraft::context*& ctx,
std::shared_ptr< nuraft_mesg::group_metrics > metrics) {
std::error_condition ManagerImpl::group_init(int32_t const srv_id, std::string const& group_id,
std::string const& group_type, nuraft::context*& ctx,
std::shared_ptr< nuraft_mesg::group_metrics > metrics) {
LOGDEBUGMOD(nuraft_mesg, "Creating context for Group: {} as Member: {}", group_id, srv_id);

// State manager (RAFT log store, config)
Expand Down Expand Up @@ -245,7 +246,7 @@ std::error_condition service::group_init(int32_t const srv_id, std::string const
return std::error_condition();
}

NullAsyncResult service::add_member(std::string const& group_id, std::string const& new_id) {
NullAsyncResult ManagerImpl::add_member(std::string const& group_id, std::string const& new_id) {
return _mesg_service->add_srv(group_id, nuraft::srv_config(to_server_id(new_id), new_id))
.deferValue([this, g_id = group_id, n_id = new_id](auto cmd_result) mutable -> NullResult {
auto result = cmd_result.value();
Expand All @@ -267,7 +268,7 @@ NullAsyncResult service::add_member(std::string const& group_id, std::string con
});
}

NullAsyncResult service::rem_member(std::string const& group_id, std::string const& old_id) {
NullAsyncResult ManagerImpl::rem_member(std::string const& group_id, std::string const& old_id) {
return _mesg_service->rm_srv(group_id, to_server_id(old_id))
.deferValue([this, group_id](auto cmd_result) mutable -> NullResult {
auto result = cmd_result.value();
Expand All @@ -282,13 +283,13 @@ NullAsyncResult service::rem_member(std::string const& group_id, std::string con
});
}

std::shared_ptr< mesg_state_mgr > service::lookup_state_manager(std::string const& group_id) const {
std::shared_ptr< mesg_state_mgr > ManagerImpl::lookup_state_manager(std::string const& group_id) const {
std::lock_guard< std::mutex > lg(_manager_lock);
if (auto it = _state_managers.find(group_id); _state_managers.end() != it) return it->second;
return nullptr;
}

NullAsyncResult service::create_group(std::string const& group_id, std::string const& group_type_name) {
NullAsyncResult ManagerImpl::create_group(std::string const& group_id, std::string const& group_type_name) {
{
std::lock_guard< std::mutex > lg(_manager_lock);
_is_leader.insert(std::make_pair(group_id, false));
Expand All @@ -309,8 +310,8 @@ NullAsyncResult service::create_group(std::string const& group_id, std::string c
});
}

NullResult service::join_group(std::string const& group_id, std::string const& group_type,
std::shared_ptr< mesg_state_mgr > smgr) {
NullResult ManagerImpl::join_group(std::string const& group_id, std::string const& group_type,
std::shared_ptr< mesg_state_mgr > smgr) {
{
std::lock_guard< std::mutex > lg(_manager_lock);
auto [it, happened] = _state_managers.emplace(group_id, smgr);
Expand All @@ -326,7 +327,7 @@ NullResult service::join_group(std::string const& group_id, std::string const& g
return folly::Unit();
}

void service::append_peers(std::string const& group_id, std::list< std::string >& servers) const {
void ManagerImpl::append_peers(std::string const& group_id, std::list< std::string >& servers) const {
std::lock_guard< std::mutex > lg(_manager_lock);
if (auto it = _state_managers.find(group_id); _state_managers.end() != it) {
if (auto config = it->second->load_config(); config) {
Expand All @@ -337,7 +338,7 @@ void service::append_peers(std::string const& group_id, std::list< std::string >
}
}

NullAsyncResult service::become_leader(std::string const& group_id) {
NullAsyncResult ManagerImpl::become_leader(std::string const& group_id) {
{
auto lk = std::unique_lock< std::mutex >(_manager_lock);
if (_is_leader[group_id]) { return folly::Unit(); }
Expand All @@ -363,7 +364,7 @@ NullAsyncResult service::become_leader(std::string const& group_id) {
});
}

void service::leave_group(std::string const& group_id) {
void ManagerImpl::leave_group(std::string const& group_id) {
LOGINFO("Leaving group [vol={}]", group_id);
{
std::lock_guard< std::mutex > lg(_manager_lock);
Expand All @@ -385,26 +386,26 @@ void service::leave_group(std::string const& group_id) {
LOGINFO("Finished leaving: [vol={}]", group_id);
}

NullAsyncResult service::client_request(std::string const& group_id, std::shared_ptr< nuraft::buffer >& buf) {
NullAsyncResult ManagerImpl::client_request(std::string const& group_id, std::shared_ptr< nuraft::buffer >& buf) {
return _mesg_service->append_entries(group_id, {buf}).deferValue([](auto cmd_result) -> NullResult {
auto result = cmd_result.value();
if (nuraft::OK != result) return folly::makeUnexpected(convertToError(result));
return folly::Unit();
});
}
uint32_t service::logstore_id(std::string const& group_id) const {
uint32_t ManagerImpl::logstore_id(std::string const& group_id) const {
std::lock_guard< std::mutex > lg(_manager_lock);
if (auto it = _state_managers.find(group_id); _state_managers.end() != it) { return it->second->get_logstore_id(); }
return UINT32_MAX;
}

void service::get_srv_config_all(std::string const& group_name,
std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) {
void ManagerImpl::get_srv_config_all(std::string const& group_name,
std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) {
_mesg_service->get_srv_config_all(group_name, configs_out);
}

bool service::bind_data_service_request(std::string const& request_name, std::string const& group_id,
data_service_request_handler_t const& request_handler) {
bool ManagerImpl::bind_data_service_request(std::string const& request_name, std::string const& group_id,
data_service_request_handler_t const& request_handler) {
return _mesg_service->bind_data_service_request(request_name, group_id, request_handler);
}

Expand Down Expand Up @@ -435,7 +436,7 @@ void mesg_state_mgr::make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_fa
std::shared_ptr< Manager > init_messaging(Manager::Params const& p, std::weak_ptr< MessagingApplication > w,
bool with_data_svc) {
RELEASE_ASSERT(w.lock(), "Could not acquire application!");
return std::make_shared< service >(p, w, with_data_svc);
return std::make_shared< ManagerImpl >(p, w, with_data_svc);
}

} // namespace nuraft_mesg
6 changes: 3 additions & 3 deletions src/lib/messaging.hpp → src/lib/manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class group_factory;
class msg_service;
class group_metrics;

class service : public Manager {
class ManagerImpl : public Manager {
Manager::Params start_params_;
int32_t _srv_id;

Expand All @@ -62,8 +62,8 @@ class service : public Manager {
void exit_group(std::string const& group_id);

public:
service(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool and_data_svc = false);
~service() override;
ManagerImpl(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool and_data_svc = false);
~ManagerImpl() override;

int32_t server_id() const override { return _srv_id; }

Expand Down
2 changes: 1 addition & 1 deletion src/lib/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include <sisl/metrics/metrics.hpp>

#include "proto/messaging_service.grpc.pb.h"
#include "messaging.hpp"
#include "manager_impl.hpp"
#include "data_service_grpc.hpp"

namespace nuraft_mesg {
Expand Down
Loading

0 comments on commit 251fdae

Please sign in to comment.