Skip to content

Commit

Permalink
Merge mesg_service into manager_impl
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Nov 10, 2023
1 parent 8066124 commit 092b1f3
Show file tree
Hide file tree
Showing 13 changed files with 364 additions and 544 deletions.
1 change: 0 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ target_sources(${PROJECT_NAME} PRIVATE
lib/grpc_factory.cpp
lib/grpc_server.cpp
lib/mesg_client.cpp
lib/service.cpp
lib/data_service_grpc.cpp
lib/manager_impl.cpp
$<TARGET_OBJECTS:${PROJECT_NAME}-proto>
Expand Down
15 changes: 15 additions & 0 deletions src/lib/common_lib.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <sisl/logging/logging.h>
#include <sisl/metrics/metrics.hpp>

#include "nuraft_mesg/common.hpp"

Expand All @@ -10,3 +11,17 @@
#define LOGW(...) LOGWARNMOD(nuraft_mesg, ##__VA_ARGS__)
#define LOGE(...) LOGERRORMOD(nuraft_mesg, ##__VA_ARGS__)
#define LOGC(...) LOGCRITICALMOD(nuraft_mesg, ##__VA_ARGS__)

namespace nuraft_mesg {
class group_metrics : public sisl::MetricsGroupWrapper {
public:
explicit group_metrics(group_id_t const& group_id) :
sisl::MetricsGroupWrapper("RAFTGroup", to_string(group_id).c_str()) {
REGISTER_COUNTER(group_steps, "Total group messages received", "raft_group", {"op", "step"});
REGISTER_COUNTER(group_sends, "Total group messages sent", "raft_group", {"op", "send"});
register_me_to_farm();
}

~group_metrics() { deregister_me_from_farm(); }
};
} // namespace nuraft_mesg
2 changes: 1 addition & 1 deletion src/lib/grpc_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(peer_id_t const& c
if (!happened) {
LOGD("Re-creating client for {}", client);
if (auto err = reinit_client(client, it->second); nuraft::OK != err) {
LOGE("Failed to re-initialize client {}: {}", client, err);
LOGW("Failed to re-initialize client {}: {}", client, err);
new_client = std::make_shared< grpc_error_client >();
} else {
new_client = it->second;
Expand Down
372 changes: 306 additions & 66 deletions src/lib/manager_impl.cpp

Large diffs are not rendered by default.

35 changes: 28 additions & 7 deletions src/lib/manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#include <mutex>
#include <string>

#include <folly/SharedMutex.h>
#include <sisl/logging/logging.h>

#include "data_service_grpc.hpp"
#include "nuraft_mesg/mesg_factory.hpp"
#include "common_lib.hpp"

Expand All @@ -32,19 +34,29 @@ class GrpcServer;

namespace nuraft_mesg {
class group_factory;
class msg_service;
class group_metrics;
class Messaging;
class RaftGroupMsg;

class ManagerImpl : public Manager {
struct grpc_server_wrapper {
explicit grpc_server_wrapper(group_id_t const& group_id);

std::shared_ptr< grpc_server > m_server;
std::shared_ptr< group_metrics > m_metrics;
};

class ManagerImpl : public Manager, public std::enable_shared_from_this< ManagerImpl > {
Manager::Params start_params_;
int32_t _srv_id;

std::map< group_type_t, Manager::group_params > _state_mgr_types;

std::weak_ptr< MessagingApplication > application_;
std::shared_ptr< group_factory > _g_factory;
std::shared_ptr< msg_service > _mesg_service;
std::unique_ptr<::sisl::GrpcServer > _grpc_server;

folly::SharedMutex _raft_servers_lock;
std::map< group_id_t, grpc_server_wrapper > _raft_servers;

std::unique_ptr< ::sisl::GrpcServer > _grpc_server;

std::mutex mutable _manager_lock;
std::map< group_id_t, std::shared_ptr< mesg_state_mgr > > _state_managers;
Expand All @@ -55,12 +67,19 @@ class ManagerImpl : public Manager {
nuraft::ptr< nuraft::delayed_task_scheduler > _scheduler;
std::shared_ptr< sisl::logging::logger_t > _custom_logger;

data_service_grpc _data_service;
bool _data_service_enabled;

nuraft::cmd_result_code group_init(int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type,
nuraft::context*& ctx, std::shared_ptr< group_metrics > metrics);
nuraft::cb_func::ReturnCode callback_handler(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param);
nuraft::cb_func::ReturnCode raft_event(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param);
void exit_group(group_id_t const& group_id);

nuraft::cmd_result_code joinRaftGroup(int32_t srv_id, group_id_t const& group_id, group_type_t const&);

bool raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, RaftGroupMsg >& rpc_data);

public:
ManagerImpl(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool and_data_svc = false);
~ManagerImpl() override;
Expand Down Expand Up @@ -92,6 +111,8 @@ class ManagerImpl : public Manager {

void get_srv_config_all(group_id_t const& group_id,
std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) override;

void shutdown_for(group_id_t const&);
};

class repl_service_ctx_grpc : public repl_service_ctx {
Expand Down
3 changes: 1 addition & 2 deletions src/lib/mesg_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <folly/futures/Future.h>

#include "nuraft_mesg/mesg_factory.hpp"
#include "service.hpp"
#include "proto/messaging_service.grpc.pb.h"
#include "utils.hpp"
#include "nuraft_mesg_config.hpp"
Expand Down Expand Up @@ -197,7 +196,7 @@ NullAsyncResult mesg_factory::data_service_request_unidirectional(std::optional<
// We ignore the vector of future response from collect all and st the value as folly::unit.
// This is because we do not have a use case to handle the errors that happen during the unidirectional call to all
// the peers.
return folly::collectAll(calls).deferValue([](auto &&) -> NullResult { return folly::unit; });
return folly::collectAll(calls).deferValue([](auto&&) -> NullResult { return folly::unit; });
}

AsyncResult< sisl::io_blob >
Expand Down
Loading

0 comments on commit 092b1f3

Please sign in to comment.