Skip to content

Commit

Permalink
feat: java client support FQDN
Browse files Browse the repository at this point in the history
  • Loading branch information
yujingwei committed Oct 18, 2023
1 parent 5f5cd80 commit d8c6b25
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 1 deletion.
250 changes: 250 additions & 0 deletions java-client/src/main/java/org/apache/pegasus/base/host_port.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package org.apache.pegasus.base;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;

import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.meta_data.FieldMetaData;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TStruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* TODO
*
* @author yujingwei
* @version 1.0.0
* @since 2023/10/09 16:30
*/
public final class host_port
implements TBase<host_port, host_port._Fields>, java.io.Serializable, Cloneable{
private static final TStruct STRUCT_DESC = new TStruct("host_port");

private static final Logger LOG = LoggerFactory.getLogger(host_port.class);

public String hostAddress;

public short port;

/**
* The set of fields this struct contains, along with convenience methods for finding and
* manipulating them.
*/
public enum _Fields implements TFieldIdEnum {
;

private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();

static {
for (_Fields field : EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field);
}
}

/** Find the _Fields constant that matches fieldId, or null if its not found. */
public static _Fields findByThriftId(int fieldId) {
switch (fieldId) {
default:
return null;
}
}

/** Find the _Fields constant that matches fieldId, throwing an exception if it is not found. */
public static _Fields findByThriftIdOrThrow(int fieldId) {
_Fields fields = findByThriftId(fieldId);
if (fields == null)
throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
}

/** Find the _Fields constant that matches name, or null if its not found. */
public static _Fields findByName(String name) {
return byName.get(name);
}

private final short _thriftId;
private final String _fieldName;

_Fields(short thriftId, String fieldName) {
_thriftId = thriftId;
_fieldName = fieldName;
}

public short getThriftFieldId() {
return _thriftId;
}

public String getFieldName() {
return _fieldName;
}
}

public static final Map<_Fields, FieldMetaData> metaDataMap;

static {
Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class);
metaDataMap = Collections.unmodifiableMap(tmpMap);
FieldMetaData.addStructMetaDataMap(host_port.class, metaDataMap);
}

public host_port() {
this.hostAddress = null;
this.port = 0;
}


public boolean isInvalid() {
return this.hostAddress == null || this.port == 0 ;
}


public String get_ip() throws UnknownHostException{
return getInetAddress(hostAddress).toString();
}

public String getIpAndPort(){
return get_ip() + get_port();
}

public short get_port() {
return port;
}

public static InetAddress getInetAddress(final String host) {
InetAddress[] addrs = getAllInetAddresses(host);
if (addrs != null && addrs.length > 0) {
return addrs[0];
}
return null;
}

public static InetAddress[] getAllInetAddresses(final String host) {
final long start = System.nanoTime();
try {
InetAddress[] ipAddrs = InetAddress.getAllByName(host);
long latency = System.nanoTime() - start;
if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) {
LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ipAddrs, latency);
} else if (latency >= 3000000/*ns*/) {
LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ipAddrs, latency);
}
return ipAddrs;
} catch (UnknownHostException e) {
LOG.error("Failed to resolve the IP of `{}' in {}ns", host, (System.nanoTime() - start));
return null;
}
}

/** Performs a deep copy on <i>other</i>. */
public host_port(host_port other) {
this.hostAddress = other.hostAddress;
this.port = other.port;
}

public host_port deepCopy() {
return new host_port(this);
}

@Override
public void clear() {
this.hostAddress = null;
this.port = 0;
}

public void setFieldValue(_Fields field, Object value) {
switch (field) {
}
}

public Object getFieldValue(_Fields field) {
switch (field) {
}
throw new IllegalStateException();
}

/**
* Returns true if field corresponding to fieldID is set (has been asigned a value) and false
* otherwise
*/
public boolean isSet(_Fields field) {
if (field == null) {
throw new IllegalArgumentException();
}

switch (field) {
}
throw new IllegalStateException();
}

@Override
public boolean equals(Object that) {
if (that == null) return false;
if (that instanceof host_port) return this.equals((host_port) that);
return false;
}

public boolean equals(host_port that) {
if (that == null) return false;
return this.hostAddress.equals(that.hostAddress) && this.port == that.port;
}

@Override
public int hashCode() {
int result = 17;
result = 31 * result + (hostAddress != null ? hostAddress.hashCode() : 0);
result = 31 * result + port;
return result;
}

public int compareTo(host_port other) {
if(other == null){
return 1;
}

int comparsion = this.hostAddress.compareTo(other.hostAddress);
if(comparsion == 0){
return comparsion;
}

return Integer.compare(this.port, other.port);
}

public _Fields fieldForId(int fieldId) {
return _Fields.findByThriftId(fieldId);
}

public void read(TProtocol iprot) throws TException {
port = iprot.readI16();
hostAddress = iprot.readString();
// check for required fields of primitive type, which can't be checked in the validate method
validate();
}

public void write(TProtocol oprot) throws TException {
validate();
oprot.writeString(hostAddress);
oprot.writeI16(port);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder("host_port(");
try {
sb.append(get_ip());
} catch (UnknownHostException e) {
sb.append("invalid_addr");
}
sb.append(":");
sb.append(get_port());
sb.append(")");
return sb.toString();
}

public void validate() throws TException {
// check for required fields
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.thrift.meta_data.FieldMetaData;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TStruct;
import org.apache.pegasus.base.host_port;

public final class rpc_address
implements TBase<rpc_address, rpc_address._Fields>, java.io.Serializable, Cloneable {
Expand Down Expand Up @@ -106,6 +107,10 @@ public rpc_address() {
this.address = 0;
}

public rpc_address(host_port hp){
this.address = hp.getIp();
}

public boolean isInvalid() {
return this.address == 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.pegasus.base.error_code;
import org.apache.pegasus.base.rpc_address;
import org.apache.pegasus.base.host_port;
import org.apache.pegasus.operator.client_operator;
import org.apache.pegasus.operator.create_app_operator;
import org.apache.pegasus.operator.drop_app_operator;
Expand Down Expand Up @@ -76,6 +77,7 @@ public MetaSession(
this.eachQueryTimeoutInMills = eachQueryTimeoutInMills;
this.defaultMaxQueryCount = defaultMaxQueryCount;
this.group = g;
this.hostPort = null;
}

public static error_code.error_types getMetaServiceError(client_operator metaQueryOp) {
Expand Down Expand Up @@ -106,13 +108,24 @@ public static rpc_address getMetaServiceForwardAddress(client_operator metaQuery
return null;
java.util.List<partition_configuration> partitions = op.get_response().getPartitions();
if (partitions == null || partitions.isEmpty()) return null;
addr = tygGetHostPort(partitions);
addr = partitions.get(0).getPrimary();
if (addr == null || addr.isInvalid()) return null;
}

return addr;
}

public rpc_address tygGetHostPort(java.util.List<partition_configuration> partitions){
hostPort = partitions.get(0).getHp_primary();
if(hostPort != null){
rpc_address addr = new rpc_address(hostPort);
}
else{
return partitions.get(0).getPrimary();
}
}

public final void asyncExecute(client_operator op, Runnable callbackFunc, int maxExecuteCount) {
if (maxExecuteCount == 0) {
maxExecuteCount = defaultMaxQueryCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void initTableConfiguration(query_cfg_response resp) {
continue;
}

replicaConfig.primaryAddress = pc.getPrimary();
replicaConfig.primaryAddress = pc.getHp_primary();
// If the primary address is invalid, we don't create secondary session either.
// Because all of these sessions will be recreated later.
if (replicaConfig.primaryAddress.isInvalid()) {
Expand All @@ -192,6 +192,7 @@ void initTableConfiguration(query_cfg_response resp) {
// backup request is enabled, get all secondary sessions
if (isBackupRequestEnabled()) {
// secondary sessions
pc.hp_secondaries.forEach();
pc.secondaries.forEach(
secondary -> {
ReplicaSession session = tryConnect(secondary, futureGroup);
Expand Down

0 comments on commit d8c6b25

Please sign in to comment.