Skip to content

Commit

Permalink
Extend bidirectional API to use raft server id in the destination_t type
Browse files Browse the repository at this point in the history
  • Loading branch information
Ravi Nagarjun Akella authored and Ravi Nagarjun Akella committed Feb 2, 2024
1 parent 2f9d5d4 commit b031be8
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 10 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class NuRaftMesgConan(ConanFile):
name = "nuraft_mesg"
version = "2.3.2"
version = "2.3.3"

homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
Expand Down
4 changes: 2 additions & 2 deletions include/nuraft_mesg/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace nuraft_mesg {
using peer_id_t = boost::uuids::uuid;
using group_id_t = boost::uuids::uuid;
using group_type_t = std::string;

using svr_id_t = int32_t;
using io_blob_list_t = folly::small_vector< sisl::io_blob, 4 >;

template < typename T >
Expand All @@ -33,7 +33,7 @@ using NullResult = Result< folly::Unit >;
using NullAsyncResult = AsyncResult< folly::Unit >;

ENUM(role_regex, uint8_t, LEADER, FOLLOWER, ALL, ANY);
using destination_t = std::variant< peer_id_t, role_regex >;
using destination_t = std::variant< peer_id_t, role_regex, svr_id_t >;

} // namespace nuraft_mesg

Expand Down
17 changes: 13 additions & 4 deletions src/lib/repl_service_ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,20 @@ const std::string repl_service_ctx_grpc::id_to_str(int32_t const id) const {

const std::optional< Result< peer_id_t > > repl_service_ctx_grpc::get_peer_id(destination_t const& dest) const {
if (std::holds_alternative< peer_id_t >(dest)) return std::get< peer_id_t >(dest);
if (std::holds_alternative< role_regex >(dest)) {
if (!_server) {
LOGW("server not initialized");
return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND);

if (!_server) {
LOGW("server not initialized");
return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND);
}

if (std::holds_alternative< svr_id_t >(dest)) {
if (auto const id_str = id_to_str(std::get< svr_id_t >(dest)); !id_str.empty()) {
return boost::uuids::string_generator()(id_str);
}
return folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND);
}

if (std::holds_alternative< role_regex >(dest)) {
switch (std::get< role_regex >(dest)) {
case role_regex::LEADER: {
if (is_raft_leader()) return folly::makeUnexpected(nuraft::cmd_result_code::BAD_REQUEST);
Expand Down
25 changes: 22 additions & 3 deletions src/tests/data_service_tests.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "test_fixture.ipp"
#include <libnuraft/raft_server_handler.hxx>

class DataServiceFixture : public MessagingFixtureBase {
protected:
Expand Down Expand Up @@ -93,13 +94,23 @@ TEST_F(DataServiceFixture, BasicTest1) {
return folly::Unit();
}));

auto repl_ctx1 = sm1->get_repl_context();
for (auto svr : repl_ctx1->_server->get_config()->get_servers()) {
if (svr->get_endpoint() == to_string(app_1_->id_)) continue;
LOGINFO("Sending request to server [{}]", svr->get_id())
results.push_back(sm1->data_service_request_bidirectional(svr->get_id(), REQUEST_DATA, cli_buf)
.deferValue([](auto e) -> NullResult {
EXPECT_TRUE(e.hasValue());
return folly::Unit();
}));
}

folly::collectAll(results).via(folly::getGlobalCPUExecutor()).get();

// add a new member to data_service_test_group and check if repl_ctx4 sends data to newly added member
auto add_3 = app_4->instance_->add_member(data_group, app_3_->id_);
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_TRUE(std::move(add_3).get());
auto sm3 = app_3_->state_mgr_map_[data_group];
sm4->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, SEND_DATA, cli_buf)
.deferValue([](auto e) -> folly::Unit {
EXPECT_TRUE(e.hasValue());
Expand All @@ -108,9 +119,9 @@ TEST_F(DataServiceFixture, BasicTest1) {
.get();

// TODO REVIEW THIS
// test_group: 4 (1 SEND_DATA) + 1 (1 REQUEST_DATA) + 1 (SEND_DATA to a peer) = 6
// test_group: 4 (1 SEND_DATA) + 5 (1 REQUEST_DATA) + 1 (SEND_DATA to a peer) = 10
// data_service_test_group: 1 (1 REQUEST_DATA) + 4 (1 SEND_DATA) = 5
EXPECT_EQ(test_state_mgr::get_server_counter(), 11);
EXPECT_EQ(test_state_mgr::get_server_counter(), 15);
app_5->instance_->leave_group(data_group);
app_5->instance_->leave_group(group_id_);
app_4->instance_->leave_group(data_group);
Expand Down Expand Up @@ -215,6 +226,14 @@ TEST_F(DataServiceFixture, NegativeTests) {
return folly::Unit();
}));

// invalid svr id
results.push_back(
sm1->data_service_request_unidirectional(-1, REQUEST_DATA, cli_buf).deferValue([](auto e) -> NullResult {
EXPECT_TRUE(e.hasError());
EXPECT_EQ(nuraft::cmd_result_code::SERVER_NOT_FOUND, e.error());
return folly::Unit();
}));

// unimplemented methods
results.push_back(sm1->data_service_request_bidirectional(nuraft_mesg::role_regex::ALL, REQUEST_DATA, cli_buf)
.deferValue([](auto e) -> NullResult {
Expand Down

0 comments on commit b031be8

Please sign in to comment.