Skip to content

Commit

Permalink
java client support FQDN
Browse files Browse the repository at this point in the history
  • Loading branch information
yujingwei committed Oct 18, 2023
1 parent 525c516 commit 5f5cd80
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 15 deletions.
2 changes: 1 addition & 1 deletion scripts/recompile_thrift.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ rm -rf $TMP_DIR
mkdir -p $TMP_DIR
$THIRDPARTY_ROOT/output/bin/thrift --gen cpp:moveable_types -out $TMP_DIR ../idl/rrdb.thrift

sed 's/#include "dsn_types.h"/#include "runtime\/rpc\/rpc_address.h"\n#include "runtime\/task\/task_code.h"\n#include "utils\/blob.h"/' $TMP_DIR/rrdb_types.h > ../src/include/rrdb/rrdb_types.h
sed 's/#include "dsn_types.h"/#include "runtime\/rpc\/rpc_address.h"\n#include "runtime\/rpc\/rpc_host_port.h"\n#include "runtime\/task\/task_code.h"\n#include "utils\/blob.h"/' $TMP_DIR/rrdb_types.h > ../src/include/rrdb/rrdb_types.h
sed 's/#include "rrdb_types.h"/#include <rrdb\/rrdb_types.h>/' $TMP_DIR/rrdb_types.cpp > ../src/base/rrdb_types.cpp

rm -rf $TMP_DIR
Expand Down
4 changes: 4 additions & 0 deletions src/common/consensus.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ struct learn_request
// be duplicated (ie. max_gced_decree < confirmed_decree), if not,
// learnee will copy the missing logs.
7:optional i64 max_gced_decree;
8:optional dsn.host_port hp_learner;
}

struct learn_response
Expand All @@ -156,6 +157,7 @@ struct learn_response
7:dsn.rpc_address address; // learnee's address
8:string base_local_dir; // base dir of files on learnee
9:optional string replica_disk_tag; // the disk tag of learnee located
10:optional dsn.host_port hp_address; // learnee's address
}

struct learn_notify_response
Expand All @@ -180,6 +182,7 @@ struct group_check_request
// Used to deliver child gpid and meta_split_status during partition split
6:optional dsn.gpid child_gpid;
7:optional metadata.split_status meta_split_status;
8:optional dsn.host_port hp_node;
}

struct group_check_response
Expand All @@ -195,5 +198,6 @@ struct group_check_response
// if secondary pause or cancel split succeed, is_split_stopped = true
8:optional bool is_split_stopped;
9:optional metadata.disk_status disk_status = metadata.disk_status.NORMAL;
10:optional dsn.host_port hp_node;
}

27 changes: 16 additions & 11 deletions src/failure_detector/fd.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,28 @@ namespace cpp dsn.fd

struct beacon_msg
{
1: i64 time;
2: dsn.rpc_address from_addr;
3: dsn.rpc_address to_addr;
4: optional i64 start_time;
1: i64 time;
2: dsn.rpc_address from_addr;
3: dsn.rpc_address to_addr;
4: optional i64 start_time;
5: optional dsn.host_port hp_from_addr;
6: optional dsn.host_port hp_to_addr;
}

struct beacon_ack
{
1: i64 time;
2: dsn.rpc_address this_node;
3: dsn.rpc_address primary_node;
4: bool is_master;
5: bool allowed;
1: i64 time;
2: dsn.rpc_address this_node;
3: dsn.rpc_address primary_node;
4: bool is_master;
5: bool allowed;
6: optional dsn.host_port hp_this_node;
7: optional dsn.host_port hp_primary_node;
}

struct config_master_message
{
1: dsn.rpc_address master;
2: bool is_register;
1: dsn.rpc_address master;
2: bool is_register;
3: optional dsn.host_port hp_master;
}
2 changes: 2 additions & 0 deletions src/nfs/nfs.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct copy_request
8: bool overwrite;
9: optional string source_disk_tag;
10: optional dsn.gpid pid;
11: optional dsn.host_port hp_source;
}

struct copy_response
Expand All @@ -60,6 +61,7 @@ struct get_file_size_request
6: optional string source_disk_tag;
7: optional string dest_disk_tag;
8: optional dsn.gpid pid;
9: optional dsn.host_port hp_source;
}

struct get_file_size_response
Expand Down
6 changes: 4 additions & 2 deletions src/runtime/rpc/group_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <vector>

#include "runtime/rpc/group_address.h"
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/autoref_ptr.h"
#include "utils/fmt_logging.h"
Expand Down Expand Up @@ -131,7 +130,10 @@ inline rpc_group_host_port::rpc_group_host_port(const rpc_group_address *g_addr)
CHECK_TRUE(add(host_port(addr)));
}
_update_leader_automatically = g_addr->is_update_leader_automatically();
set_leader(host_port(g_addr->leader()));
auto leader_addr = g_addr->leader();
if (rpc_address::s_invalid_address != leader_addr) {
set_leader(host_port(leader_addr));
}
}

inline rpc_group_host_port &rpc_group_host_port::operator=(const rpc_group_host_port &other)
Expand Down
40 changes: 39 additions & 1 deletion src/runtime/rpc/rpc_host_port.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "runtime/rpc/rpc_host_port.h"
#include "utils/error_code.h"
#include "utils/safe_strerror_posix.h"
#include "utils/string_conv.h"
#include "utils/utils.h"

namespace dsn {
Expand Down Expand Up @@ -68,8 +69,35 @@ host_port::host_port(std::string host, uint16_t port)
CHECK_NE_MSG(rpc_address::ipv4_from_host(_host.c_str()), 0, "invalid hostname: {}", _host);
}

bool host_port::from_string(const std::string s)
{
std::string ip_port = s;
auto pos = ip_port.find_last_of(':');
if (pos == std::string::npos) {
return false;
}
std::string host = ip_port.substr(0, pos);
std::string port = ip_port.substr(pos + 1);

// check port
unsigned int port_num;
if (!internal::buf2unsigned(port, port_num) || port_num > UINT16_MAX) {
return false;
}

if (rpc_address::ipv4_from_host(host.c_str()) == 0) {
return false;
}

_type = HOST_TYPE_IPV4;
_host = host;
_port = port_num;
return true;
}

host_port::host_port(rpc_address addr)
{
reset();
switch (addr.type()) {
case HOST_TYPE_IPV4: {
CHECK(utils::hostname_from_ip(htonl(addr.ip()), &_host),
Expand All @@ -79,6 +107,7 @@ host_port::host_port(rpc_address addr)
} break;
case HOST_TYPE_GROUP: {
_group_host_port = new rpc_group_host_port(addr.group_address());
_group_host_port->add_ref();
} break;
default:
break;
Expand Down Expand Up @@ -133,7 +162,7 @@ std::string host_port::to_string() const
case HOST_TYPE_GROUP:
return fmt::format("address group {}", group_host_port()->name());
default:
return "invalid address";
return "invalid host_port";
}
}

Expand Down Expand Up @@ -198,4 +227,13 @@ error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
return error_s::ok();
}

void host_port::fill_host_ports_from_addresses(const std::vector<rpc_address> &addr_v,
std::vector<host_port> &hp_v)
{
CHECK(hp_v.empty(), "optional host_port should be empty!");
for (const auto &addr : addr_v) {
hp_v.emplace_back(host_port(addr));
}
}

} // namespace dsn
49 changes: 49 additions & 0 deletions src/runtime/rpc/rpc_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,31 @@ class TProtocol;
} // namespace thrift
} // namespace apache

#define FILL_HP_OPTIONAL_SECTION(OBJ_NAME, SECTION_NAME) \
do { \
host_port hp(OBJ_NAME.SECTION_NAME); \
OBJ_NAME.__set_hp_##SECTION_NAME(hp); \
} while (false)

#define FILL_HP_LIST_OPTIONAL_SECTION(OBJ_NAME, SECTION_NAME) \
do { \
std::vector<host_port> hps; \
dsn::host_port::fill_host_ports_from_addresses(OBJ_NAME.SECTION_NAME, hps); \
OBJ_NAME.__set_hp_##SECTION_NAME(hps); \
} while (false)

#define FILL_OPTIONAL_HP_IF_NEEDED(OBJ_NAME, SECTION_NAME) \
do { \
if (!OBJ_NAME.__isset.hp_##SECTION_NAME) \
FILL_HP_OPTIONAL_SECTION(OBJ_NAME, SECTION_NAME); \
} while (false)

#define FILL_OPTIONAL_HP_LIST_IF_NEEDED(OBJ_NAME, SECTION_NAME) \
do { \
if (!OBJ_NAME.__isset.hp_##SECTION_NAME) \
FILL_HP_LIST_OPTIONAL_SECTION(OBJ_NAME, SECTION_NAME); \
} while (false)

namespace dsn {

class rpc_group_host_port;
Expand Down Expand Up @@ -83,10 +108,17 @@ class host_port
// Trere may be multiple rpc_addresses for one host_port.
error_s resolve_addresses(std::vector<rpc_address> &addresses) const;

// This function is used for validating the format of string like "test_host:1234".
bool from_string(const std::string s);

// for serialization in thrift format
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;

static void fill_host_ports_from_addresses(const std::vector<rpc_address> &addr_v,
/*output*/ std::vector<host_port> &hp_v);


private:
std::string _host;
uint16_t _port = 0;
Expand Down Expand Up @@ -116,6 +148,23 @@ inline bool operator==(const host_port &hp1, const host_port &hp2)

inline bool operator!=(const host_port &hp1, const host_port &hp2) { return !(hp1 == hp2); }

inline bool operator<(const host_port &hp1, const host_port &hp2)
{
if (hp1.type() != hp2.type()) {
return hp1.type() < hp2.type();
}

switch (hp1.type()) {
case HOST_TYPE_IPV4:
return hp1.host() < hp2.host() || (hp1.host() == hp2.host() && hp1.port() < hp2.port());
case HOST_TYPE_GROUP:
return reinterpret_cast<uint64_t>(hp1.group_host_port()) <
reinterpret_cast<uint64_t>(hp2.group_host_port());
default:
return true;
}
}

} // namespace dsn

USER_DEFINED_STRUCTURE_FORMATTER(::dsn::host_port);
Expand Down

0 comments on commit 5f5cd80

Please sign in to comment.