Skip to content

Commit

Permalink
Merge pull request #40 from raakella1/data_api_impl
Browse files Browse the repository at this point in the history
Data api impl
  • Loading branch information
raakella1 authored Oct 23, 2023
2 parents c0b436e + 8f4cff5 commit 8066124
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 90 deletions.
6 changes: 5 additions & 1 deletion src/include/nuraft_mesg/mesg_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ class mesg_factory final : public grpc_factory {
nuraft::cmd_result_code reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;

AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name, io_blob_list_t const& cli_buf);
NullAsyncResult data_service_request_unidirectional(std::optional< Result< peer_id_t > > const& dest,
std::string const& request_name, io_blob_list_t const& cli_buf);

AsyncResult< sisl::io_blob > data_service_request_bidirectional(std::optional< Result< peer_id_t > > const&,
std::string const&, io_blob_list_t const&);
};

} // namespace nuraft_mesg
30 changes: 18 additions & 12 deletions src/include/nuraft_mesg/mesg_state_mgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,31 @@ namespace nuraft_mesg {
class mesg_factory;
class grpc_server;

// config for a replica with after the int32_t id is transformed to a peer_id_t
struct replica_config {
std::string peer_id;
std::string aux;
};

class repl_service_ctx {
public:
repl_service_ctx(grpc_server* server);
repl_service_ctx(nuraft::raft_server* server);
virtual ~repl_service_ctx() = default;

// we do not own this pointer. Use this only if the lyfe cycle of the pointer is well known
grpc_server* m_server;
// we do not own this pointer. Use this only if the life cycle of the pointer is well known
nuraft::raft_server* _server;
bool is_raft_leader() const;

// return a list of replica configs for the peers of the raft group
void get_cluster_config(std::list< replica_config >& cluster_config) const;

// data service api client calls
virtual NullAsyncResult data_service_request_unidirectional(destination_t to, std::string const& request_name,
virtual NullAsyncResult data_service_request_unidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) = 0;
virtual AsyncResult< sisl::io_blob > data_service_request_bidrectional(destination_t to,
std::string const& request_name,
io_blob_list_t const& cli_buf) = 0;

// Will be removed after the above two APIs are implemented
virtual AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name,
io_blob_list_t const& cli_buf) = 0;
virtual AsyncResult< sisl::io_blob > data_service_request_bidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) = 0;

// Send response to a data service request and finish the async call.
virtual void send_data_service_response(io_blob_list_t const& outgoing_buf,
Expand All @@ -49,7 +55,7 @@ class mesg_state_mgr : public nuraft::state_mgr {
public:
using nuraft::state_mgr::state_mgr;
virtual ~mesg_state_mgr() = default;
void make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_factory >& cli_factory);
void make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory);

virtual void become_ready() {}
virtual uint32_t get_logstore_id() const = 0;
Expand Down
69 changes: 53 additions & 16 deletions src/lib/manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,39 +354,76 @@ bool ManagerImpl::bind_data_service_request(std::string const& request_name, gro
return _mesg_service->bind_data_service_request(request_name, group_id, request_handler);
}

NullAsyncResult repl_service_ctx_grpc::data_service_request_unidirectional(destination_t, std::string const&,
io_blob_list_t const&) {
LOGW("method not implemented");
return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED);
// The endpoint field of the raft_server config is the uuid of the server.
const std::string repl_service_ctx_grpc::id_to_str(int32_t const id) const {
auto const& srv_config = _server->get_config()->get_server(id);
return (srv_config) ? srv_config->get_endpoint() : std::string();
}

AsyncResult< sisl::io_blob > repl_service_ctx_grpc::data_service_request_bidrectional(destination_t, std::string const&,
io_blob_list_t const&) {
LOGW("method not implemented");
return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED);
const std::optional< Result< peer_id_t > > repl_service_ctx_grpc::get_peer_id(destination_t const& dest) const {
if (std::holds_alternative< peer_id_t >(dest)) return std::get< peer_id_t >(dest);
if (std::holds_alternative< role_regex >(dest)) {
if (!_server) {
LOGW("server not initialized");
return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND);
}
switch (std::get< role_regex >(dest)) {
case role_regex::LEADER: {
if (is_raft_leader()) return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST);
auto const leader = _server->get_leader();
if (leader == -1) return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND);
return boost::uuids::string_generator()(id_to_str(leader));
} break;
case role_regex::ALL: {
return std::nullopt;
} break;
default: {
LOGE("Method not implemented");
return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST);
} break;
}
}
DEBUG_ASSERT(false, "Unknown destination type");
return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST);
}

AsyncResult< sisl::io_blob > repl_service_ctx_grpc::data_service_request(std::string const& request_name,
io_blob_list_t const& cli_buf) {
NullAsyncResult repl_service_ctx_grpc::data_service_request_unidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) {
return (!m_mesg_factory)
? folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND)
: m_mesg_factory->data_service_request_unidirectional(get_peer_id(dest), request_name, cli_buf);
}

return (m_mesg_factory) ? m_mesg_factory->data_service_request(request_name, cli_buf)
: folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND);
AsyncResult< sisl::io_blob > repl_service_ctx_grpc::data_service_request_bidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) {
return (!m_mesg_factory)
? folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND)
: m_mesg_factory->data_service_request_bidirectional(get_peer_id(dest), request_name, cli_buf);
}

repl_service_ctx::repl_service_ctx(grpc_server* server) : m_server(server) {}
repl_service_ctx::repl_service_ctx(nuraft::raft_server* server) : _server(server) {}

bool repl_service_ctx::is_raft_leader() const { return m_server->raft_server()->is_leader(); }
bool repl_service_ctx::is_raft_leader() const { return _server->is_leader(); }

void repl_service_ctx::get_cluster_config(std::list< replica_config >& cluster_config) const {
auto const& srv_configs = _server->get_config()->get_servers();
for (auto const& srv_config : srv_configs) {
cluster_config.emplace_back(replica_config{srv_config->get_endpoint(), srv_config->get_aux()});
}
}

repl_service_ctx_grpc::repl_service_ctx_grpc(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory) :
repl_service_ctx(server), m_mesg_factory(cli_factory) {}
repl_service_ctx(server ? server->raft_server().get() : nullptr), m_mesg_factory(cli_factory) {}

void repl_service_ctx_grpc::send_data_service_response(io_blob_list_t const& outgoing_buf,
boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) {
serialize_to_byte_buffer(rpc_data->response(), outgoing_buf);
rpc_data->send_response();
}

void mesg_state_mgr::make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_factory >& cli_factory) {
void mesg_state_mgr::make_repl_ctx(grpc_server* server, std::shared_ptr< mesg_factory > const& cli_factory) {
m_repl_svc_ctx = std::make_unique< repl_service_ctx_grpc >(server, cli_factory);
}

Expand Down
14 changes: 8 additions & 6 deletions src/lib/manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,17 @@ class repl_service_ctx_grpc : public repl_service_ctx {
~repl_service_ctx_grpc() override = default;
std::shared_ptr< mesg_factory > m_mesg_factory;

NullAsyncResult data_service_request_unidirectional(destination_t to, std::string const& request_name,
NullAsyncResult data_service_request_unidirectional(destination_t const& dest, std::string const& request_name,
io_blob_list_t const& cli_buf) override;
AsyncResult< sisl::io_blob > data_service_request_bidrectional(destination_t to, std::string const& request_name,
io_blob_list_t const& cli_buf) override;

AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name,
io_blob_list_t const& cli_buf) override;
AsyncResult< sisl::io_blob > data_service_request_bidirectional(destination_t const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) override;
void send_data_service_response(io_blob_list_t const& outgoing_buf,
boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) override;

private:
const std::optional< Result< peer_id_t > > get_peer_id(destination_t const& dest) const;
const std::string id_to_str(int32_t const id) const;
};

} // namespace nuraft_mesg
104 changes: 80 additions & 24 deletions src/lib/mesg_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,26 +66,38 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha
NURAFT_MESG_CONFIG(mesg_factory_config->raft_request_deadline_secs));
}

AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name, io_blob_list_t const& cli_buf) {
NullAsyncResult data_service_request_unidirectional(std::string const& request_name,
io_blob_list_t const& cli_buf) {
grpc::ByteBuffer cli_byte_buf;
serialize_to_byte_buffer(cli_byte_buf, cli_buf);
auto [p, sf] = folly::makePromiseContract< Result< sisl::io_blob > >();
/// FIXME we shouldn't need a copy-constructible here should we?
auto copyable_promise = std::make_shared< decltype(p) >(std::move(p));
_generic_stub->call_unary(
cli_byte_buf, request_name,
[c = copyable_promise](grpc::ByteBuffer& resp, ::grpc::Status& status) mutable {
if (!status.ok()) {
LOGE("Failed to send data_service_request, error: {}", status.error_message());
c->setValue(folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED));
} else {
sisl::io_blob svr_buf;
deserialize_from_byte_buffer(resp, svr_buf);
c->setValue(std::move(svr_buf));
return _generic_stub
->call_unary(cli_byte_buf, request_name,
NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs))
.deferValue([](auto&& response) -> NullResult {
if (response.hasError()) {
LOGE("Failed to send data_service_request, error: {}", response.error().error_message());
return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED);
}
},
NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs));
return std::move(sf);
return folly::unit;
});
}

AsyncResult< sisl::io_blob > data_service_request_bidirectional(std::string const& request_name,
io_blob_list_t const& cli_buf) {
grpc::ByteBuffer cli_byte_buf;
serialize_to_byte_buffer(cli_byte_buf, cli_buf);
return _generic_stub
->call_unary(cli_byte_buf, request_name,
NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs))
.deferValue([](auto&& response) -> Result< sisl::io_blob > {
if (response.hasError()) {
LOGE("Failed to send data_service_request, error: {}", response.error().error_message());
return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED);
}
sisl::io_blob svr_buf;
deserialize_from_byte_buffer(response.value(), svr_buf);
return svr_buf;
});
}

protected:
Expand Down Expand Up @@ -129,8 +141,14 @@ class group_client : public grpc_base_client {
_client->send(group_msg, complete);
}

AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name, io_blob_list_t const& cli_buf) {
return _client->data_service_request(request_name, cli_buf);
NullAsyncResult data_service_request_unidirectional(std::string const& request_name,
io_blob_list_t const& cli_buf) {
return _client->data_service_request_unidirectional(request_name, cli_buf);
}

AsyncResult< sisl::io_blob > data_service_request_bidirectional(std::string const& request_name,
io_blob_list_t const& cli_buf) {
return _client->data_service_request_bidirectional(request_name, cli_buf);
}
};

Expand All @@ -154,15 +172,53 @@ nuraft::cmd_result_code mesg_factory::reinit_client(peer_id_t const& client,
return nuraft::OK;
}

AsyncResult< sisl::io_blob > mesg_factory::data_service_request(std::string const& request_name,
io_blob_list_t const& cli_buf) {
NullAsyncResult mesg_factory::data_service_request_unidirectional(std::optional< Result< peer_id_t > > const& dest,
std::string const& request_name,
io_blob_list_t const& cli_buf) {
std::shared_lock< client_factory_lock_type > rl(_client_lock);
auto calls = std::list< AsyncResult< sisl::io_blob > >();
auto calls = std::vector< NullAsyncResult >();
if (dest) {
if (dest->hasError()) return folly::makeUnexpected(dest->error());
if (auto it = _clients.find(dest->value()); _clients.end() != it) {
auto g_client = std::dynamic_pointer_cast< nuraft_mesg::group_client >(it->second);
return g_client->data_service_request_unidirectional(get_generic_method_name(request_name, _group_id),
cli_buf);
} else {
LOGE("Failed to find client for [{}], request name [{}]", dest->value(), request_name);
return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND);
}
}
// else
for (auto& nuraft_client : _clients) {
auto g_client = std::dynamic_pointer_cast< nuraft_mesg::group_client >(nuraft_client.second);
calls.push_back(g_client->data_service_request(get_generic_method_name(request_name, _group_id), cli_buf));
calls.push_back(
g_client->data_service_request_unidirectional(get_generic_method_name(request_name, _group_id), cli_buf));
}
// We ignore the vector of future response from collect all and st the value as folly::unit.
// This is because we do not have a use case to handle the errors that happen during the unidirectional call to all
// the peers.
return folly::collectAll(calls).deferValue([](auto &&) -> NullResult { return folly::unit; });
}

AsyncResult< sisl::io_blob >
mesg_factory::data_service_request_bidirectional(std::optional< Result< peer_id_t > > const& dest,
std::string const& request_name, io_blob_list_t const& cli_buf) {
std::shared_lock< client_factory_lock_type > rl(_client_lock);
auto calls = std::vector< AsyncResult< sisl::io_blob > >();
if (dest) {
if (dest->hasError()) return folly::makeUnexpected(dest->error());
if (auto it = _clients.find(dest->value()); _clients.end() != it) {
auto g_client = std::dynamic_pointer_cast< nuraft_mesg::group_client >(it->second);
return g_client->data_service_request_bidirectional(get_generic_method_name(request_name, _group_id),
cli_buf);
} else {
LOGE("Failed to find client for [{}], request name [{}]", dest->value(), request_name);
return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND);
}
}
return folly::collectAnyWithoutException(calls).deferValue([](auto&& p) { return p.second; });
// else
LOGE("Cannot send request to all the peers, not implemented yet!. Request name [{}]", request_name);
return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST);
}

group_factory::group_factory(int const cli_thread_count, group_id_t const& name,
Expand Down
Loading

0 comments on commit 8066124

Please sign in to comment.