Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unneeded indirection in code path. #46

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class NuRaftMesgConan(ConanFile):
name = "nuraft_mesg"
version = "2.0.2"
version = "2.0.3"

homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
Expand Down
3 changes: 1 addition & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ target_sources(${PROJECT_NAME} PRIVATE
lib/grpc_client.cpp
lib/grpc_factory.cpp
lib/grpc_server.cpp
lib/mesg_client.cpp
lib/service.cpp
lib/mesg_factory.cpp
lib/data_service_grpc.cpp
lib/manager_impl.cpp
$<TARGET_OBJECTS:${PROJECT_NAME}-proto>
Expand Down
15 changes: 15 additions & 0 deletions src/lib/common_lib.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <sisl/logging/logging.h>
#include <sisl/metrics/metrics.hpp>

#include "nuraft_mesg/common.hpp"

Expand All @@ -10,3 +11,17 @@
#define LOGW(...) LOGWARNMOD(nuraft_mesg, ##__VA_ARGS__)
#define LOGE(...) LOGERRORMOD(nuraft_mesg, ##__VA_ARGS__)
#define LOGC(...) LOGCRITICALMOD(nuraft_mesg, ##__VA_ARGS__)

namespace nuraft_mesg {
class group_metrics : public sisl::MetricsGroupWrapper {
public:
explicit group_metrics(group_id_t const& group_id) :
sisl::MetricsGroupWrapper("RAFTGroup", to_string(group_id).c_str()) {
REGISTER_COUNTER(group_steps, "Total group messages received", "raft_group", {"op", "step"});
REGISTER_COUNTER(group_sends, "Total group messages sent", "raft_group", {"op", "send"});
register_me_to_farm();
}

~group_metrics() { deregister_me_from_farm(); }
};
} // namespace nuraft_mesg
2 changes: 1 addition & 1 deletion src/lib/grpc_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(peer_id_t const& c
if (!happened) {
LOGD("Re-creating client for {}", client);
if (auto err = reinit_client(client, it->second); nuraft::OK != err) {
LOGE("Failed to re-initialize client {}: {}", client, err);
LOGW("Failed to re-initialize client {}: {}", client, err);
new_client = std::make_shared< grpc_error_client >();
} else {
new_client = it->second;
Expand Down
372 changes: 306 additions & 66 deletions src/lib/manager_impl.cpp

Large diffs are not rendered by default.

35 changes: 28 additions & 7 deletions src/lib/manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#include <mutex>
#include <string>

#include <folly/SharedMutex.h>
#include <sisl/logging/logging.h>

#include "data_service_grpc.hpp"
#include "nuraft_mesg/mesg_factory.hpp"
#include "common_lib.hpp"

Expand All @@ -32,19 +34,29 @@ class GrpcServer;

namespace nuraft_mesg {
class group_factory;
class msg_service;
class group_metrics;
class Messaging;
class RaftGroupMsg;

class ManagerImpl : public Manager {
struct grpc_server_wrapper {
explicit grpc_server_wrapper(group_id_t const& group_id);

std::shared_ptr< grpc_server > m_server;
std::shared_ptr< group_metrics > m_metrics;
};

class ManagerImpl : public Manager, public std::enable_shared_from_this< ManagerImpl > {
Manager::Params start_params_;
int32_t _srv_id;

std::map< group_type_t, Manager::group_params > _state_mgr_types;

std::weak_ptr< MessagingApplication > application_;
std::shared_ptr< group_factory > _g_factory;
std::shared_ptr< msg_service > _mesg_service;
std::unique_ptr<::sisl::GrpcServer > _grpc_server;

folly::SharedMutex _raft_servers_lock;
std::map< group_id_t, grpc_server_wrapper > _raft_servers;

std::unique_ptr< ::sisl::GrpcServer > _grpc_server;

std::mutex mutable _manager_lock;
std::map< group_id_t, std::shared_ptr< mesg_state_mgr > > _state_managers;
Expand All @@ -55,12 +67,19 @@ class ManagerImpl : public Manager {
nuraft::ptr< nuraft::delayed_task_scheduler > _scheduler;
std::shared_ptr< sisl::logging::logger_t > _custom_logger;

data_service_grpc _data_service;
bool _data_service_enabled;

nuraft::cmd_result_code group_init(int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type,
nuraft::context*& ctx, std::shared_ptr< group_metrics > metrics);
nuraft::cb_func::ReturnCode callback_handler(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param);
nuraft::cb_func::ReturnCode raft_event(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param);
void exit_group(group_id_t const& group_id);

nuraft::cmd_result_code joinRaftGroup(int32_t srv_id, group_id_t const& group_id, group_type_t const&);

bool raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, RaftGroupMsg >& rpc_data);

public:
ManagerImpl(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool and_data_svc = false);
~ManagerImpl() override;
Expand Down Expand Up @@ -92,6 +111,8 @@ class ManagerImpl : public Manager {

void get_srv_config_all(group_id_t const& group_id,
std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) override;

void shutdown_for(group_id_t const&);
};

class repl_service_ctx_grpc : public repl_service_ctx {
Expand Down
3 changes: 1 addition & 2 deletions src/lib/mesg_client.cpp → src/lib/mesg_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <folly/futures/Future.h>

#include "nuraft_mesg/mesg_factory.hpp"
#include "service.hpp"
#include "proto/messaging_service.grpc.pb.h"
#include "utils.hpp"
#include "nuraft_mesg_config.hpp"
Expand Down Expand Up @@ -197,7 +196,7 @@ NullAsyncResult mesg_factory::data_service_request_unidirectional(std::optional<
// We ignore the vector of future response from collect all and st the value as folly::unit.
// This is because we do not have a use case to handle the errors that happen during the unidirectional call to all
// the peers.
return folly::collectAll(calls).deferValue([](auto &&) -> NullResult { return folly::unit; });
return folly::collectAll(calls).deferValue([](auto&&) -> NullResult { return folly::unit; });
}

AsyncResult< sisl::io_blob >
Expand Down
Loading