diff --git a/src/lib/manager_impl.cpp b/src/lib/manager_impl.cpp index c980376..53e1d1d 100644 --- a/src/lib/manager_impl.cpp +++ b/src/lib/manager_impl.cpp @@ -56,8 +56,7 @@ ManagerImpl::~ManagerImpl() { } } -ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< MessagingApplication > app, - bool and_data_svc) : +ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< MessagingApplication > app) : start_params_(start_params), _srv_id(to_server_id(start_params_.server_uuid_)), application_(app) { _g_factory = std::make_shared< engine_factory >(grpc_client_threads, start_params_, app); auto logger_name = fmt::format("nuraft_{}", start_params_.server_uuid_); @@ -76,19 +75,13 @@ ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< Mes nuraft::asio_service::options service_options; service_options.thread_pool_size_ = 1; _scheduler = std::make_shared< nuraft::asio_service >(service_options, logger); +} - // The function passed to msg_service will be called each time a new group is joined, - // allowing sharing of the Server and client amongst raft instances. - - _mesg_service = msg_service::create( - [this](int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type, nuraft::context*& ctx, - std::shared_ptr< group_metrics > metrics) mutable -> nuraft::cmd_result_code { - return this->group_init(srv_id, group_id, group_type, ctx, metrics); - }, - start_params_.server_uuid_, and_data_svc); - _mesg_service->setDefaultGroupType(start_params_.default_group_type_); - - // Start a gRPC server and create and associate nuraft_mesg services. +void ManagerImpl::start(bool and_data_svc) { + if (auto lg = std::lock_guard< std::mutex >(_manager_lock); !_mesg_service) { + _mesg_service = msg_service::create(shared_from_this(), start_params_.server_uuid_, and_data_svc); + _mesg_service->setDefaultGroupType(start_params_.default_group_type_); + } restart_server(); } @@ -97,6 +90,7 @@ void ManagerImpl::restart_server() { LOGI("Starting Messaging Service on http://{}", listen_address); std::lock_guard< std::mutex > lg(_manager_lock); + RELEASE_ASSERT(_mesg_service, "Need to call ::start() first!"); _grpc_server.reset(); _grpc_server = std::unique_ptr< sisl::GrpcServer >( sisl::GrpcServer::make(listen_address, start_params_.token_verifier_, grpc_server_threads, @@ -115,8 +109,7 @@ void ManagerImpl::register_mgr_type(group_type_t const& group_type, group_params if (_state_mgr_types.end() == it) { LOGE("Could not register [group_type={}]", group_type); } } -nuraft::cb_func::ReturnCode ManagerImpl::raft_event(group_id_t const& group_id, nuraft::cb_func::Type type, - nuraft::cb_func::Param* param) { +void ManagerImpl::raft_event(group_id_t const& group_id, nuraft::cb_func::Type type, nuraft::cb_func::Param* param) { switch (type) { case nuraft::cb_func::RemovedFromCluster: { LOGI("[srv_id={}] evicted from: [group={}]", start_params_.server_uuid_, group_id); @@ -154,7 +147,6 @@ nuraft::cb_func::ReturnCode ManagerImpl::raft_event(group_id_t const& group_id, default: break; }; - return nuraft::cb_func::Ok; } void ManagerImpl::exit_group(group_id_t const& group_id) { @@ -207,8 +199,10 @@ nuraft::cmd_result_code ManagerImpl::group_init(int32_t const srv_id, group_id_t nuraft::ptr< nuraft::logger > logger = std::make_shared< nuraft_mesg_logger >(group_id, _custom_logger); ctx = new nuraft::context(smgr, sm, listener, logger, rpc_cli_factory, _scheduler, params); - ctx->set_cb_func([this, group_id](nuraft::cb_func::Type type, nuraft::cb_func::Param* param) mutable { - return this->raft_event(group_id, type, param); + ctx->set_cb_func([wp = std::weak_ptr< ManagerImpl >(shared_from_this()), group_id](nuraft::cb_func::Type type, + nuraft::cb_func::Param* param) { + if (auto sp = wp.lock(); sp) sp->raft_event(group_id, type, param); + return nuraft::cb_func::Ok; }); return nuraft::cmd_result_code::OK; @@ -275,7 +269,7 @@ NullAsyncResult ManagerImpl::create_group(group_id_t const& group_id, std::strin std::lock_guard< std::mutex > lg(_manager_lock); _is_leader.insert(std::make_pair(group_id, false)); } - if (auto const err = _mesg_service->createRaftGroup(_srv_id, group_id, group_type_name); err) { + if (auto const err = _mesg_service->joinRaftGroup(_srv_id, group_id, group_type_name); err) { return folly::makeUnexpected(err); } @@ -354,6 +348,7 @@ void ManagerImpl::get_srv_config_all(group_id_t const& group_id, bool ManagerImpl::bind_data_service_request(std::string const& request_name, group_id_t const& group_id, data_service_request_handler_t const& request_handler) { + RELEASE_ASSERT(_mesg_service, "Need to call ::start() first!"); return _mesg_service->bind_data_service_request(request_name, group_id, request_handler); } @@ -364,7 +359,9 @@ void mesg_state_mgr::make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_fa std::shared_ptr< Manager > init_messaging(Manager::Params const& p, std::weak_ptr< MessagingApplication > w, bool with_data_svc) { RELEASE_ASSERT(w.lock(), "Could not acquire application!"); - return std::make_shared< ManagerImpl >(p, w, with_data_svc); + auto m = std::make_shared< ManagerImpl >(p, w); + m->start(with_data_svc); + return m; } } // namespace nuraft_mesg diff --git a/src/lib/manager_impl.hpp b/src/lib/manager_impl.hpp index a576156..5160754 100644 --- a/src/lib/manager_impl.hpp +++ b/src/lib/manager_impl.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -36,7 +37,7 @@ class group_factory; class msg_service; class group_metrics; -class ManagerImpl : public Manager { +class ManagerImpl : public Manager, public std::enable_shared_from_this< ManagerImpl > { Manager::Params start_params_; int32_t _srv_id; @@ -56,20 +57,20 @@ class ManagerImpl : public Manager { nuraft::ptr< nuraft::delayed_task_scheduler > _scheduler; std::shared_ptr< sisl::logging::logger_t > _custom_logger; - nuraft::cmd_result_code group_init(int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type, - nuraft::context*& ctx, std::shared_ptr< group_metrics > metrics); - nuraft::cb_func::ReturnCode raft_event(group_id_t const& group_id, nuraft::cb_func::Type type, - nuraft::cb_func::Param* param); + void raft_event(group_id_t const& group_id, nuraft::cb_func::Type type, nuraft::cb_func::Param* param); void exit_group(group_id_t const& group_id); public: - ManagerImpl(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool and_data_svc = false); + ManagerImpl(Manager::Params const&, std::weak_ptr< MessagingApplication >); ~ManagerImpl() override; int32_t server_id() const override { return _srv_id; } void register_mgr_type(group_type_t const& group_type, group_params const&) override; + nuraft::cmd_result_code group_init(int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type, + nuraft::context*& ctx, std::shared_ptr< group_metrics > metrics); + std::shared_ptr< mesg_state_mgr > lookup_state_manager(group_id_t const& group_id) const override; NullAsyncResult create_group(group_id_t const& group_id, group_type_t const& group_type) override; @@ -85,6 +86,7 @@ class ManagerImpl : public Manager { void leave_group(group_id_t const& group_id) override; uint32_t logstore_id(group_id_t const& group_id) const override; void append_peers(group_id_t const& group_id, std::list< peer_id_t >&) const override; + void start(bool and_data_svc); void restart_server() override; // data service APIs diff --git a/src/lib/proto/proto_service.cpp b/src/lib/proto/proto_service.cpp index 50ec70a..b3eea25 100644 --- a/src/lib/proto/proto_service.cpp +++ b/src/lib/proto/proto_service.cpp @@ -151,9 +151,9 @@ bool proto_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMs return true; } -std::shared_ptr< msg_service > msg_service::create(get_server_ctx_cb get_server_ctx, group_id_t const& service_address, - bool const enable_data_service) { - return std::make_shared< proto_service >(get_server_ctx, service_address, enable_data_service); +std::shared_ptr< msg_service > msg_service::create(std::shared_ptr< ManagerImpl > const& manager, + group_id_t const& service_address, bool const enable_data_service) { + return std::make_shared< proto_service >(manager, service_address, enable_data_service); } } // namespace nuraft_mesg diff --git a/src/lib/service.cpp b/src/lib/service.cpp index f85c47f..6659256 100644 --- a/src/lib/service.cpp +++ b/src/lib/service.cpp @@ -38,11 +38,9 @@ grpc_server_wrapper::grpc_server_wrapper(group_id_t const& group_id) { if (0 < SISL_OPTIONS.count("msg_metrics")) m_metrics = std::make_shared< group_metrics >(group_id); } -msg_service::msg_service(get_server_ctx_cb get_server_ctx, group_id_t const& service_address, +msg_service::msg_service(std::shared_ptr< ManagerImpl > const& manager, group_id_t const& service_address, bool const enable_data_service) : - _data_service_enabled(enable_data_service), - _get_server_ctx(get_server_ctx), - _service_address(service_address) {} + _data_service_enabled(enable_data_service), _manager(manager), _service_address(service_address) {} msg_service::~msg_service() { std::unique_lock< lock_type > lck(_raft_servers_lock); @@ -179,9 +177,14 @@ nuraft::cmd_result_code msg_service::joinRaftGroup(int32_t const srv_id, group_i if (g_type.empty()) { g_type = _default_group_type; } std::tie(it, happened) = _raft_servers.emplace(std::make_pair(group_id, group_id)); if (_raft_servers.end() != it && happened) { - if (auto err = _get_server_ctx(srv_id, group_id, g_type, ctx, it->second.m_metrics); err) { - LOGE("Error during RAFT server creation [group={}]: {}", group_id, err); - return err; + if (auto mgr = _manager.lock(); !mgr) { + LOGW("Got join after shutdown...skipping [group={}]", group_id); + return nuraft::cmd_result_code::CANCELLED; + } else { + if (auto err = mgr->group_init(srv_id, group_id, g_type, ctx, it->second.m_metrics); err) { + LOGE("Error during RAFT server creation [group={}]: {}", group_id, err); + return err; + } } DEBUG_ASSERT(!ctx->rpc_listener_, "RPC listner should not be set!"); auto new_listner = std::make_shared< msg_group_listner >(shared_from_this(), group_id); diff --git a/src/lib/service.hpp b/src/lib/service.hpp index a948038..dcacb5f 100644 --- a/src/lib/service.hpp +++ b/src/lib/service.hpp @@ -33,10 +33,6 @@ class group_metrics : public sisl::MetricsGroupWrapper { ~group_metrics() { deregister_me_from_farm(); } }; -using get_server_ctx_cb = - std::function< nuraft::cmd_result_code(int32_t srv_id, group_id_t const&, group_type_t const&, - nuraft::context*& ctx_out, std::shared_ptr< group_metrics > metrics) >; - // pluggable type for data service using data_service_t = data_service_grpc; @@ -51,18 +47,19 @@ class msg_service : public std::enable_shared_from_this< msg_service >, public n bool _data_service_enabled; data_service_t _data_service; std::string _default_group_type; + std::weak_ptr< ManagerImpl > _manager; protected: - get_server_ctx_cb _get_server_ctx; lock_type _raft_servers_lock; std::map< group_id_t, grpc_server_wrapper > _raft_servers; peer_id_t const _service_address; public: - msg_service(get_server_ctx_cb get_server_ctx, peer_id_t const& service_address, bool const enable_data_service); + msg_service(std::shared_ptr< ManagerImpl > const& manager, peer_id_t const& service_address, + bool const enable_data_service); virtual ~msg_service(); - static std::shared_ptr< msg_service > create(get_server_ctx_cb get_server_ctx, peer_id_t const& service_address, - bool const enable_data_service); + static std::shared_ptr< msg_service > create(std::shared_ptr< ManagerImpl > const& manager, + peer_id_t const& service_address, bool const enable_data_service); msg_service(msg_service const&) = delete; msg_service& operator=(msg_service const&) = delete; @@ -85,11 +82,6 @@ class msg_service : public std::enable_shared_from_this< msg_service >, public n void setDefaultGroupType(std::string const& _type); - nuraft::cmd_result_code createRaftGroup(int const srv_id, group_id_t const& group_id, - group_type_t const& group_type) { - return joinRaftGroup(srv_id, group_id, group_type); - } - nuraft::cmd_result_code joinRaftGroup(int32_t srv_id, group_id_t const& group_id, group_type_t const&); void partRaftGroup(group_id_t const& group_id);