diff --git a/.gitignore b/.gitignore index 0fb95ff1..981450ec 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,4 @@ pegasus-client.iml log.txt rolling_log/ .vscode/ - +.history/ diff --git a/configuration/pegasus.properties b/configuration/pegasus.properties index 0a6519cf..167ad465 100644 --- a/configuration/pegasus.properties +++ b/configuration/pegasus.properties @@ -4,3 +4,7 @@ async_workers = 4 enable_perf_counter = false perf_counter_tags = cluster=onebox,app=unit_test push_counter_interval_secs = 10 +open_auth = false +jaas_conf = configuration/pegasus_jaas.conf +service_name = xxx +service_fqdn = xxx diff --git a/configuration/pegasus_jaas.conf b/configuration/pegasus_jaas.conf new file mode 100644 index 00000000..1e2a5214 --- /dev/null +++ b/configuration/pegasus_jaas.conf @@ -0,0 +1,10 @@ +client { + com.sun.security.auth.module.Krb5LoginModule required + debug=true + useTicketCache=true + useKeyTab=true + keyTab="/etc/xxx.keytab" + renewTGT=true + principal="xx@xxx.xxx" + storeKey=true; +}; diff --git a/idl/recompile_thrift.sh b/idl/recompile_thrift.sh index d3dfe5a1..63365f86 100755 --- a/idl/recompile_thrift.sh +++ b/idl/recompile_thrift.sh @@ -23,7 +23,8 @@ rm -rf $TMP_DIR mkdir -p $TMP_DIR $thrift --gen java rrdb.thrift -$thrift --gen java replication.thrift +$thrift --gen java replication.thrift +$thrift --gen java security.thrift # as we pack the thrift source in our project, so we need to replace the package name find $TMP_DIR -name "*.java" | xargs sed -i -e "s/org.apache.thrift/com.xiaomi.infra.pegasus.thrift/g" diff --git a/idl/replication.thrift b/idl/replication.thrift index 9af4436e..29be735e 100644 --- a/idl/replication.thrift +++ b/idl/replication.thrift @@ -21,6 +21,8 @@ struct query_cfg_request 2:list partition_indices; } +// for server version > 1.11.2, if err == ERR_FORWARD_TO_OTHERS, +// then the forward address will be put in partitions[0].primary if exist. struct query_cfg_response { 1:base.error_code err; diff --git a/idl/security.thrift b/idl/security.thrift new file mode 100644 index 00000000..864fc60b --- /dev/null +++ b/idl/security.thrift @@ -0,0 +1,55 @@ +include "base.thrift" + +namespace cpp dsn.apps +namespace java com.xiaomi.infra.pegasus.apps +namespace py pypegasus.rrdb + +// negotiation process: +// +// client server +// | --- SASL_MECH --> | +// | <-- SASL_MECH --- | +// | - SASL_SEL_MECH ->| +// | <- SASL_SEL_OK ---| +// | | +// | --- SASL_INIT --> | +// | | +// | <-- SASL_CHAL --- | +// | --- SASL_RESP --> | +// | | +// | ..... | +// | | +// | <-- SASL_CHAL --- | +// | --- SASL_RESP --> | +// | | (authentication will succeed +// | | if all chanllenges passed) +// | <-- SASL_SUCC --- | +// (client won't response | | +// if servers says ok) | | +// | --- RPC_CALL ---> | +// | <-- RPC_RESP ---- | + +enum negotiation_status +{ + INVALID = 0, + SASL_LIST_MECHANISMS, + SASL_LIST_MECHANISMS_RESP, + SASL_SELECT_MECHANISMS, + SASL_SELECT_MECHANISMS_OK, + SASL_INITIATE, + SASL_CHALLENGE, + SASL_RESPONSE, + SASL_SUCC, + SASL_AUTH_FAIL +} + +struct negotiation_message +{ + 1: negotiation_status status; + 2: base.blob msg; +} + +service security +{ + negotiation_message negotiate(1:negotiation_message nego_msg); +} diff --git a/pom.xml b/pom.xml index 01edfc0e..124e5f3d 100755 --- a/pom.xml +++ b/pom.xml @@ -59,6 +59,11 @@ json 20160810 + + com.github.luben + zstd-jni + 1.3.7-1 + diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_mutate_request.java b/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_mutate_request.java index e7bc310e..a1c84126 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_mutate_request.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_mutate_request.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class check_and_mutate_request implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("check_and_mutate_request"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_mutate_response.java b/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_mutate_response.java index 09f62f34..8346d20b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_mutate_response.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_mutate_response.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class check_and_mutate_response implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("check_and_mutate_response"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_set_request.java b/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_set_request.java index bd852e8c..7d902c85 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_set_request.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_set_request.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class check_and_set_request implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("check_and_set_request"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_set_response.java b/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_set_response.java index 5ef1416d..196f9437 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_set_response.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/check_and_set_response.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class check_and_set_response implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("check_and_set_response"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/count_response.java b/src/main/java/com/xiaomi/infra/pegasus/apps/count_response.java index 5d189990..6540a764 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/count_response.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/count_response.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class count_response implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("count_response"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/get_scanner_request.java b/src/main/java/com/xiaomi/infra/pegasus/apps/get_scanner_request.java index 905cfe8d..edcaf2e9 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/get_scanner_request.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/get_scanner_request.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class get_scanner_request implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("get_scanner_request"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/incr_request.java b/src/main/java/com/xiaomi/infra/pegasus/apps/incr_request.java index 349a4c8a..e757f636 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/incr_request.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/incr_request.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class incr_request implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("incr_request"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/incr_response.java b/src/main/java/com/xiaomi/infra/pegasus/apps/incr_response.java index 54b96219..a9b45fa4 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/incr_response.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/incr_response.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class incr_response implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("incr_response"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/key_value.java b/src/main/java/com/xiaomi/infra/pegasus/apps/key_value.java index ab2ace98..95aa9b1b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/key_value.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/key_value.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class key_value implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("key_value"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/meta.java b/src/main/java/com/xiaomi/infra/pegasus/apps/meta.java index 6ba7e688..bbf5dc9e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/meta.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/meta.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class meta { public interface Iface { diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/multi_get_request.java b/src/main/java/com/xiaomi/infra/pegasus/apps/multi_get_request.java index b0d7243f..7c7207a5 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/multi_get_request.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/multi_get_request.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class multi_get_request implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("multi_get_request"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/multi_get_response.java b/src/main/java/com/xiaomi/infra/pegasus/apps/multi_get_response.java index 460b47c3..bd14389a 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/multi_get_response.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/multi_get_response.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class multi_get_response implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("multi_get_response"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/multi_put_request.java b/src/main/java/com/xiaomi/infra/pegasus/apps/multi_put_request.java index d4b61ecb..300b0ff9 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/multi_put_request.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/multi_put_request.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class multi_put_request implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("multi_put_request"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/multi_remove_request.java b/src/main/java/com/xiaomi/infra/pegasus/apps/multi_remove_request.java index 69ddb489..85ed70e7 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/multi_remove_request.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/multi_remove_request.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class multi_remove_request implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("multi_remove_request"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/multi_remove_response.java b/src/main/java/com/xiaomi/infra/pegasus/apps/multi_remove_response.java index 00345657..ad372f9d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/multi_remove_response.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/multi_remove_response.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class multi_remove_response implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("multi_remove_response"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/mutate.java b/src/main/java/com/xiaomi/infra/pegasus/apps/mutate.java index 66ac2e03..b12de56f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/mutate.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/mutate.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class mutate implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("mutate"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_message.java b/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_message.java new file mode 100644 index 00000000..ddcf86b2 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_message.java @@ -0,0 +1,500 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +/** + * Autogenerated by Thrift Compiler (0.11.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package com.xiaomi.infra.pegasus.apps; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-19") +public class negotiation_message implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("negotiation_message"); + + private static final com.xiaomi.infra.pegasus.thrift.protocol.TField STATUS_FIELD_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TField("status", com.xiaomi.infra.pegasus.thrift.protocol.TType.I32, (short)1); + private static final com.xiaomi.infra.pegasus.thrift.protocol.TField MSG_FIELD_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TField("msg", com.xiaomi.infra.pegasus.thrift.protocol.TType.STRUCT, (short)2); + + private static final com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new negotiation_messageStandardSchemeFactory(); + private static final com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new negotiation_messageTupleSchemeFactory(); + + /** + * + * @see negotiation_status + */ + public negotiation_status status; // required + public com.xiaomi.infra.pegasus.base.blob msg; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements com.xiaomi.infra.pegasus.thrift.TFieldIdEnum { + /** + * + * @see negotiation_status + */ + STATUS((short)1, "status"), + MSG((short)2, "msg"); + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // STATUS + return STATUS; + case 2: // MSG + return MSG; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.STATUS, new com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData("status", com.xiaomi.infra.pegasus.thrift.TFieldRequirementType.DEFAULT, + new com.xiaomi.infra.pegasus.thrift.meta_data.EnumMetaData(com.xiaomi.infra.pegasus.thrift.protocol.TType.ENUM, negotiation_status.class))); + tmpMap.put(_Fields.MSG, new com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData("msg", com.xiaomi.infra.pegasus.thrift.TFieldRequirementType.DEFAULT, + new com.xiaomi.infra.pegasus.thrift.meta_data.StructMetaData(com.xiaomi.infra.pegasus.thrift.protocol.TType.STRUCT, com.xiaomi.infra.pegasus.base.blob.class))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData.addStructMetaDataMap(negotiation_message.class, metaDataMap); + } + + public negotiation_message() { + } + + public negotiation_message( + negotiation_status status, + com.xiaomi.infra.pegasus.base.blob msg) + { + this(); + this.status = status; + this.msg = msg; + } + + /** + * Performs a deep copy on other. + */ + public negotiation_message(negotiation_message other) { + if (other.isSetStatus()) { + this.status = other.status; + } + if (other.isSetMsg()) { + this.msg = new com.xiaomi.infra.pegasus.base.blob(other.msg); + } + } + + public negotiation_message deepCopy() { + return new negotiation_message(this); + } + + @Override + public void clear() { + this.status = null; + this.msg = null; + } + + /** + * + * @see negotiation_status + */ + public negotiation_status getStatus() { + return this.status; + } + + /** + * + * @see negotiation_status + */ + public negotiation_message setStatus(negotiation_status status) { + this.status = status; + return this; + } + + public void unsetStatus() { + this.status = null; + } + + /** Returns true if field status is set (has been assigned a value) and false otherwise */ + public boolean isSetStatus() { + return this.status != null; + } + + public void setStatusIsSet(boolean value) { + if (!value) { + this.status = null; + } + } + + public com.xiaomi.infra.pegasus.base.blob getMsg() { + return this.msg; + } + + public negotiation_message setMsg(com.xiaomi.infra.pegasus.base.blob msg) { + this.msg = msg; + return this; + } + + public void unsetMsg() { + this.msg = null; + } + + /** Returns true if field msg is set (has been assigned a value) and false otherwise */ + public boolean isSetMsg() { + return this.msg != null; + } + + public void setMsgIsSet(boolean value) { + if (!value) { + this.msg = null; + } + } + + public void setFieldValue(_Fields field, java.lang.Object value) { + switch (field) { + case STATUS: + if (value == null) { + unsetStatus(); + } else { + setStatus((negotiation_status)value); + } + break; + + case MSG: + if (value == null) { + unsetMsg(); + } else { + setMsg((com.xiaomi.infra.pegasus.base.blob)value); + } + break; + + } + } + + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case STATUS: + return getStatus(); + + case MSG: + return getMsg(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case STATUS: + return isSetStatus(); + case MSG: + return isSetMsg(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof negotiation_message) + return this.equals((negotiation_message)that); + return false; + } + + public boolean equals(negotiation_message that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_status = true && this.isSetStatus(); + boolean that_present_status = true && that.isSetStatus(); + if (this_present_status || that_present_status) { + if (!(this_present_status && that_present_status)) + return false; + if (!this.status.equals(that.status)) + return false; + } + + boolean this_present_msg = true && this.isSetMsg(); + boolean that_present_msg = true && that.isSetMsg(); + if (this_present_msg || that_present_msg) { + if (!(this_present_msg && that_present_msg)) + return false; + if (!this.msg.equals(that.msg)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetStatus()) ? 131071 : 524287); + if (isSetStatus()) + hashCode = hashCode * 8191 + status.getValue(); + + hashCode = hashCode * 8191 + ((isSetMsg()) ? 131071 : 524287); + if (isSetMsg()) + hashCode = hashCode * 8191 + msg.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(negotiation_message other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetStatus()).compareTo(other.isSetStatus()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetStatus()) { + lastComparison = com.xiaomi.infra.pegasus.thrift.TBaseHelper.compareTo(this.status, other.status); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(isSetMsg()).compareTo(other.isSetMsg()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMsg()) { + lastComparison = com.xiaomi.infra.pegasus.thrift.TBaseHelper.compareTo(this.msg, other.msg); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol iprot) throws com.xiaomi.infra.pegasus.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot) throws com.xiaomi.infra.pegasus.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("negotiation_message("); + boolean first = true; + + sb.append("status:"); + if (this.status == null) { + sb.append("null"); + } else { + sb.append(this.status); + } + first = false; + if (!first) sb.append(", "); + sb.append("msg:"); + if (this.msg == null) { + sb.append("null"); + } else { + sb.append(this.msg); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws com.xiaomi.infra.pegasus.thrift.TException { + // check for required fields + // check for sub-struct validity + if (msg != null) { + msg.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new com.xiaomi.infra.pegasus.thrift.protocol.TCompactProtocol(new com.xiaomi.infra.pegasus.thrift.transport.TIOStreamTransport(out))); + } catch (com.xiaomi.infra.pegasus.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new com.xiaomi.infra.pegasus.thrift.protocol.TCompactProtocol(new com.xiaomi.infra.pegasus.thrift.transport.TIOStreamTransport(in))); + } catch (com.xiaomi.infra.pegasus.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class negotiation_messageStandardSchemeFactory implements com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory { + public negotiation_messageStandardScheme getScheme() { + return new negotiation_messageStandardScheme(); + } + } + + private static class negotiation_messageStandardScheme extends com.xiaomi.infra.pegasus.thrift.scheme.StandardScheme { + + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol iprot, negotiation_message struct) throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // STATUS + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.I32) { + struct.status = com.xiaomi.infra.pegasus.apps.negotiation_status.findByValue(iprot.readI32()); + struct.setStatusIsSet(true); + } else { + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // MSG + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.STRUCT) { + struct.msg = new com.xiaomi.infra.pegasus.base.blob(); + struct.msg.read(iprot); + struct.setMsgIsSet(true); + } else { + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot, negotiation_message struct) throws com.xiaomi.infra.pegasus.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.status != null) { + oprot.writeFieldBegin(STATUS_FIELD_DESC); + oprot.writeI32(struct.status.getValue()); + oprot.writeFieldEnd(); + } + if (struct.msg != null) { + oprot.writeFieldBegin(MSG_FIELD_DESC); + struct.msg.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class negotiation_messageTupleSchemeFactory implements com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory { + public negotiation_messageTupleScheme getScheme() { + return new negotiation_messageTupleScheme(); + } + } + + private static class negotiation_messageTupleScheme extends com.xiaomi.infra.pegasus.thrift.scheme.TupleScheme { + + @Override + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot, negotiation_message struct) throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol oprot = (com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetStatus()) { + optionals.set(0); + } + if (struct.isSetMsg()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetStatus()) { + oprot.writeI32(struct.status.getValue()); + } + if (struct.isSetMsg()) { + struct.msg.write(oprot); + } + } + + @Override + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot, negotiation_message struct) throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol iprot = (com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.status = com.xiaomi.infra.pegasus.apps.negotiation_status.findByValue(iprot.readI32()); + struct.setStatusIsSet(true); + } + if (incoming.get(1)) { + struct.msg = new com.xiaomi.infra.pegasus.base.blob(); + struct.msg.read(iprot); + struct.setMsgIsSet(true); + } + } + } + + private static S scheme(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol proto) { + return (com.xiaomi.infra.pegasus.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } +} + diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_status.java b/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_status.java new file mode 100644 index 00000000..306061e6 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/negotiation_status.java @@ -0,0 +1,68 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +/** + * Autogenerated by Thrift Compiler (0.11.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package com.xiaomi.infra.pegasus.apps; + + +public enum negotiation_status implements com.xiaomi.infra.pegasus.thrift.TEnum { + INVALID(0), + SASL_LIST_MECHANISMS(1), + SASL_LIST_MECHANISMS_RESP(2), + SASL_SELECT_MECHANISMS(3), + SASL_SELECT_MECHANISMS_OK(4), + SASL_INITIATE(5), + SASL_CHALLENGE(6), + SASL_RESPONSE(7), + SASL_SUCC(8), + SASL_AUTH_FAIL(9); + + private final int value; + + private negotiation_status(int value) { + this.value = value; + } + + /** + * Get the integer value of this enum value, as defined in the Thrift IDL. + */ + public int getValue() { + return value; + } + + /** + * Find a the enum type by its integer value, as defined in the Thrift IDL. + * @return null if the value is not found. + */ + public static negotiation_status findByValue(int value) { + switch (value) { + case 0: + return INVALID; + case 1: + return SASL_LIST_MECHANISMS; + case 2: + return SASL_LIST_MECHANISMS_RESP; + case 3: + return SASL_SELECT_MECHANISMS; + case 4: + return SASL_SELECT_MECHANISMS_OK; + case 5: + return SASL_INITIATE; + case 6: + return SASL_CHALLENGE; + case 7: + return SASL_RESPONSE; + case 8: + return SASL_SUCC; + case 9: + return SASL_AUTH_FAIL; + default: + return null; + } + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/read_response.java b/src/main/java/com/xiaomi/infra/pegasus/apps/read_response.java index 4dbfd9b6..2dcbddd3 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/read_response.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/read_response.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class read_response implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("read_response"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/rrdb.java b/src/main/java/com/xiaomi/infra/pegasus/apps/rrdb.java index 487002ec..4efcf8d4 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/rrdb.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/rrdb.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class rrdb { public interface Iface { diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/scan_request.java b/src/main/java/com/xiaomi/infra/pegasus/apps/scan_request.java index f7e184b2..56f58f3a 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/scan_request.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/scan_request.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class scan_request implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("scan_request"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/scan_response.java b/src/main/java/com/xiaomi/infra/pegasus/apps/scan_response.java index f60ae939..b99a7eec 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/scan_response.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/scan_response.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class scan_response implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("scan_response"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/security.java b/src/main/java/com/xiaomi/infra/pegasus/apps/security.java new file mode 100644 index 00000000..1bc48e15 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/security.java @@ -0,0 +1,977 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +/** + * Autogenerated by Thrift Compiler (0.11.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package com.xiaomi.infra.pegasus.apps; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-19") +public class security { + + public interface Iface { + + public negotiation_message negotiate(negotiation_message nego_msg) throws com.xiaomi.infra.pegasus.thrift.TException; + + } + + public interface AsyncIface { + + public void negotiate(negotiation_message nego_msg, com.xiaomi.infra.pegasus.thrift.async.AsyncMethodCallback resultHandler) throws com.xiaomi.infra.pegasus.thrift.TException; + + } + + public static class Client extends com.xiaomi.infra.pegasus.thrift.TServiceClient implements Iface { + public static class Factory implements com.xiaomi.infra.pegasus.thrift.TServiceClientFactory { + public Factory() {} + public Client getClient(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot) { + return new Client(prot); + } + public Client getClient(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol iprot, com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot) { + return new Client(iprot, oprot); + } + } + + public Client(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot) + { + super(prot, prot); + } + + public Client(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol iprot, com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot) { + super(iprot, oprot); + } + + public negotiation_message negotiate(negotiation_message nego_msg) throws com.xiaomi.infra.pegasus.thrift.TException + { + send_negotiate(nego_msg); + return recv_negotiate(); + } + + public void send_negotiate(negotiation_message nego_msg) throws com.xiaomi.infra.pegasus.thrift.TException + { + negotiate_args args = new negotiate_args(); + args.setNego_msg(nego_msg); + sendBase("negotiate", args); + } + + public negotiation_message recv_negotiate() throws com.xiaomi.infra.pegasus.thrift.TException + { + negotiate_result result = new negotiate_result(); + receiveBase(result, "negotiate"); + if (result.isSetSuccess()) { + return result.success; + } + throw new com.xiaomi.infra.pegasus.thrift.TApplicationException(com.xiaomi.infra.pegasus.thrift.TApplicationException.MISSING_RESULT, "negotiate failed: unknown result"); + } + + } + public static class AsyncClient extends com.xiaomi.infra.pegasus.thrift.async.TAsyncClient implements AsyncIface { + public static class Factory implements com.xiaomi.infra.pegasus.thrift.async.TAsyncClientFactory { + private com.xiaomi.infra.pegasus.thrift.async.TAsyncClientManager clientManager; + private com.xiaomi.infra.pegasus.thrift.protocol.TProtocolFactory protocolFactory; + public Factory(com.xiaomi.infra.pegasus.thrift.async.TAsyncClientManager clientManager, com.xiaomi.infra.pegasus.thrift.protocol.TProtocolFactory protocolFactory) { + this.clientManager = clientManager; + this.protocolFactory = protocolFactory; + } + public AsyncClient getAsyncClient(com.xiaomi.infra.pegasus.thrift.transport.TNonblockingTransport transport) { + return new AsyncClient(protocolFactory, clientManager, transport); + } + } + + public AsyncClient(com.xiaomi.infra.pegasus.thrift.protocol.TProtocolFactory protocolFactory, com.xiaomi.infra.pegasus.thrift.async.TAsyncClientManager clientManager, com.xiaomi.infra.pegasus.thrift.transport.TNonblockingTransport transport) { + super(protocolFactory, clientManager, transport); + } + + public void negotiate(negotiation_message nego_msg, com.xiaomi.infra.pegasus.thrift.async.AsyncMethodCallback resultHandler) throws com.xiaomi.infra.pegasus.thrift.TException { + checkReady(); + negotiate_call method_call = new negotiate_call(nego_msg, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class negotiate_call extends com.xiaomi.infra.pegasus.thrift.async.TAsyncMethodCall { + private negotiation_message nego_msg; + public negotiate_call(negotiation_message nego_msg, com.xiaomi.infra.pegasus.thrift.async.AsyncMethodCallback resultHandler, com.xiaomi.infra.pegasus.thrift.async.TAsyncClient client, com.xiaomi.infra.pegasus.thrift.protocol.TProtocolFactory protocolFactory, com.xiaomi.infra.pegasus.thrift.transport.TNonblockingTransport transport) throws com.xiaomi.infra.pegasus.thrift.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.nego_msg = nego_msg; + } + + public void write_args(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot) throws com.xiaomi.infra.pegasus.thrift.TException { + prot.writeMessageBegin(new com.xiaomi.infra.pegasus.thrift.protocol.TMessage("negotiate", com.xiaomi.infra.pegasus.thrift.protocol.TMessageType.CALL, 0)); + negotiate_args args = new negotiate_args(); + args.setNego_msg(nego_msg); + args.write(prot); + prot.writeMessageEnd(); + } + + public negotiation_message getResult() throws com.xiaomi.infra.pegasus.thrift.TException { + if (getState() != com.xiaomi.infra.pegasus.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new java.lang.IllegalStateException("Method call not finished!"); + } + com.xiaomi.infra.pegasus.thrift.transport.TMemoryInputTransport memoryTransport = new com.xiaomi.infra.pegasus.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + return (new Client(prot)).recv_negotiate(); + } + } + + } + + public static class Processor extends com.xiaomi.infra.pegasus.thrift.TBaseProcessor implements com.xiaomi.infra.pegasus.thrift.TProcessor { + private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName()); + public Processor(I iface) { + super(iface, getProcessMap(new java.util.HashMap>())); + } + + protected Processor(I iface, java.util.Map> processMap) { + super(iface, getProcessMap(processMap)); + } + + private static java.util.Map> getProcessMap(java.util.Map> processMap) { + processMap.put("negotiate", new negotiate()); + return processMap; + } + + public static class negotiate extends com.xiaomi.infra.pegasus.thrift.ProcessFunction { + public negotiate() { + super("negotiate"); + } + + public negotiate_args getEmptyArgsInstance() { + return new negotiate_args(); + } + + protected boolean isOneway() { + return false; + } + + @Override + protected boolean handleRuntimeExceptions() { + return false; + } + + public negotiate_result getResult(I iface, negotiate_args args) throws com.xiaomi.infra.pegasus.thrift.TException { + negotiate_result result = new negotiate_result(); + result.success = iface.negotiate(args.nego_msg); + return result; + } + } + + } + + public static class AsyncProcessor extends com.xiaomi.infra.pegasus.thrift.TBaseAsyncProcessor { + private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(AsyncProcessor.class.getName()); + public AsyncProcessor(I iface) { + super(iface, getProcessMap(new java.util.HashMap>())); + } + + protected AsyncProcessor(I iface, java.util.Map> processMap) { + super(iface, getProcessMap(processMap)); + } + + private static java.util.Map> getProcessMap(java.util.Map> processMap) { + processMap.put("negotiate", new negotiate()); + return processMap; + } + + public static class negotiate extends com.xiaomi.infra.pegasus.thrift.AsyncProcessFunction { + public negotiate() { + super("negotiate"); + } + + public negotiate_args getEmptyArgsInstance() { + return new negotiate_args(); + } + + public com.xiaomi.infra.pegasus.thrift.async.AsyncMethodCallback getResultHandler(final com.xiaomi.infra.pegasus.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { + final com.xiaomi.infra.pegasus.thrift.AsyncProcessFunction fcall = this; + return new com.xiaomi.infra.pegasus.thrift.async.AsyncMethodCallback() { + public void onComplete(negotiation_message o) { + negotiate_result result = new negotiate_result(); + result.success = o; + try { + fcall.sendResponse(fb, result, com.xiaomi.infra.pegasus.thrift.protocol.TMessageType.REPLY,seqid); + } catch (com.xiaomi.infra.pegasus.thrift.transport.TTransportException e) { + _LOGGER.error("TTransportException writing to internal frame buffer", e); + fb.close(); + } catch (java.lang.Exception e) { + _LOGGER.error("Exception writing to internal frame buffer", e); + onError(e); + } + } + public void onError(java.lang.Exception e) { + byte msgType = com.xiaomi.infra.pegasus.thrift.protocol.TMessageType.REPLY; + com.xiaomi.infra.pegasus.thrift.TSerializable msg; + negotiate_result result = new negotiate_result(); + if (e instanceof com.xiaomi.infra.pegasus.thrift.transport.TTransportException) { + _LOGGER.error("TTransportException inside handler", e); + fb.close(); + return; + } else if (e instanceof com.xiaomi.infra.pegasus.thrift.TApplicationException) { + _LOGGER.error("TApplicationException inside handler", e); + msgType = com.xiaomi.infra.pegasus.thrift.protocol.TMessageType.EXCEPTION; + msg = (com.xiaomi.infra.pegasus.thrift.TApplicationException)e; + } else { + _LOGGER.error("Exception inside handler", e); + msgType = com.xiaomi.infra.pegasus.thrift.protocol.TMessageType.EXCEPTION; + msg = new com.xiaomi.infra.pegasus.thrift.TApplicationException(com.xiaomi.infra.pegasus.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); + } + try { + fcall.sendResponse(fb,msg,msgType,seqid); + } catch (java.lang.Exception ex) { + _LOGGER.error("Exception writing to internal frame buffer", ex); + fb.close(); + } + } + }; + } + + protected boolean isOneway() { + return false; + } + + public void start(I iface, negotiate_args args, com.xiaomi.infra.pegasus.thrift.async.AsyncMethodCallback resultHandler) throws com.xiaomi.infra.pegasus.thrift.TException { + iface.negotiate(args.nego_msg,resultHandler); + } + } + + } + + public static class negotiate_args implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("negotiate_args"); + + private static final com.xiaomi.infra.pegasus.thrift.protocol.TField NEGO_MSG_FIELD_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TField("nego_msg", com.xiaomi.infra.pegasus.thrift.protocol.TType.STRUCT, (short)1); + + private static final com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new negotiate_argsStandardSchemeFactory(); + private static final com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new negotiate_argsTupleSchemeFactory(); + + public negotiation_message nego_msg; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements com.xiaomi.infra.pegasus.thrift.TFieldIdEnum { + NEGO_MSG((short)1, "nego_msg"); + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // NEGO_MSG + return NEGO_MSG; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.NEGO_MSG, new com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData("nego_msg", com.xiaomi.infra.pegasus.thrift.TFieldRequirementType.DEFAULT, + new com.xiaomi.infra.pegasus.thrift.meta_data.StructMetaData(com.xiaomi.infra.pegasus.thrift.protocol.TType.STRUCT, negotiation_message.class))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData.addStructMetaDataMap(negotiate_args.class, metaDataMap); + } + + public negotiate_args() { + } + + public negotiate_args( + negotiation_message nego_msg) + { + this(); + this.nego_msg = nego_msg; + } + + /** + * Performs a deep copy on other. + */ + public negotiate_args(negotiate_args other) { + if (other.isSetNego_msg()) { + this.nego_msg = new negotiation_message(other.nego_msg); + } + } + + public negotiate_args deepCopy() { + return new negotiate_args(this); + } + + @Override + public void clear() { + this.nego_msg = null; + } + + public negotiation_message getNego_msg() { + return this.nego_msg; + } + + public negotiate_args setNego_msg(negotiation_message nego_msg) { + this.nego_msg = nego_msg; + return this; + } + + public void unsetNego_msg() { + this.nego_msg = null; + } + + /** Returns true if field nego_msg is set (has been assigned a value) and false otherwise */ + public boolean isSetNego_msg() { + return this.nego_msg != null; + } + + public void setNego_msgIsSet(boolean value) { + if (!value) { + this.nego_msg = null; + } + } + + public void setFieldValue(_Fields field, java.lang.Object value) { + switch (field) { + case NEGO_MSG: + if (value == null) { + unsetNego_msg(); + } else { + setNego_msg((negotiation_message)value); + } + break; + + } + } + + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case NEGO_MSG: + return getNego_msg(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case NEGO_MSG: + return isSetNego_msg(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof negotiate_args) + return this.equals((negotiate_args)that); + return false; + } + + public boolean equals(negotiate_args that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_nego_msg = true && this.isSetNego_msg(); + boolean that_present_nego_msg = true && that.isSetNego_msg(); + if (this_present_nego_msg || that_present_nego_msg) { + if (!(this_present_nego_msg && that_present_nego_msg)) + return false; + if (!this.nego_msg.equals(that.nego_msg)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetNego_msg()) ? 131071 : 524287); + if (isSetNego_msg()) + hashCode = hashCode * 8191 + nego_msg.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(negotiate_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetNego_msg()).compareTo(other.isSetNego_msg()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetNego_msg()) { + lastComparison = com.xiaomi.infra.pegasus.thrift.TBaseHelper.compareTo(this.nego_msg, other.nego_msg); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol iprot) throws com.xiaomi.infra.pegasus.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot) throws com.xiaomi.infra.pegasus.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("negotiate_args("); + boolean first = true; + + sb.append("nego_msg:"); + if (this.nego_msg == null) { + sb.append("null"); + } else { + sb.append(this.nego_msg); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws com.xiaomi.infra.pegasus.thrift.TException { + // check for required fields + // check for sub-struct validity + if (nego_msg != null) { + nego_msg.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new com.xiaomi.infra.pegasus.thrift.protocol.TCompactProtocol(new com.xiaomi.infra.pegasus.thrift.transport.TIOStreamTransport(out))); + } catch (com.xiaomi.infra.pegasus.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new com.xiaomi.infra.pegasus.thrift.protocol.TCompactProtocol(new com.xiaomi.infra.pegasus.thrift.transport.TIOStreamTransport(in))); + } catch (com.xiaomi.infra.pegasus.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class negotiate_argsStandardSchemeFactory implements com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory { + public negotiate_argsStandardScheme getScheme() { + return new negotiate_argsStandardScheme(); + } + } + + private static class negotiate_argsStandardScheme extends com.xiaomi.infra.pegasus.thrift.scheme.StandardScheme { + + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol iprot, negotiate_args struct) throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // NEGO_MSG + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.STRUCT) { + struct.nego_msg = new negotiation_message(); + struct.nego_msg.read(iprot); + struct.setNego_msgIsSet(true); + } else { + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot, negotiate_args struct) throws com.xiaomi.infra.pegasus.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.nego_msg != null) { + oprot.writeFieldBegin(NEGO_MSG_FIELD_DESC); + struct.nego_msg.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class negotiate_argsTupleSchemeFactory implements com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory { + public negotiate_argsTupleScheme getScheme() { + return new negotiate_argsTupleScheme(); + } + } + + private static class negotiate_argsTupleScheme extends com.xiaomi.infra.pegasus.thrift.scheme.TupleScheme { + + @Override + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot, negotiate_args struct) throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol oprot = (com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetNego_msg()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetNego_msg()) { + struct.nego_msg.write(oprot); + } + } + + @Override + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot, negotiate_args struct) throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol iprot = (com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.nego_msg = new negotiation_message(); + struct.nego_msg.read(iprot); + struct.setNego_msgIsSet(true); + } + } + } + + private static S scheme(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol proto) { + return (com.xiaomi.infra.pegasus.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + + public static class negotiate_result implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("negotiate_result"); + + private static final com.xiaomi.infra.pegasus.thrift.protocol.TField SUCCESS_FIELD_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TField("success", com.xiaomi.infra.pegasus.thrift.protocol.TType.STRUCT, (short)0); + + private static final com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new negotiate_resultStandardSchemeFactory(); + private static final com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new negotiate_resultTupleSchemeFactory(); + + public negotiation_message success; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements com.xiaomi.infra.pegasus.thrift.TFieldIdEnum { + SUCCESS((short)0, "success"); + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 0: // SUCCESS + return SUCCESS; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.SUCCESS, new com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData("success", com.xiaomi.infra.pegasus.thrift.TFieldRequirementType.DEFAULT, + new com.xiaomi.infra.pegasus.thrift.meta_data.StructMetaData(com.xiaomi.infra.pegasus.thrift.protocol.TType.STRUCT, negotiation_message.class))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData.addStructMetaDataMap(negotiate_result.class, metaDataMap); + } + + public negotiate_result() { + } + + public negotiate_result( + negotiation_message success) + { + this(); + this.success = success; + } + + /** + * Performs a deep copy on other. + */ + public negotiate_result(negotiate_result other) { + if (other.isSetSuccess()) { + this.success = new negotiation_message(other.success); + } + } + + public negotiate_result deepCopy() { + return new negotiate_result(this); + } + + @Override + public void clear() { + this.success = null; + } + + public negotiation_message getSuccess() { + return this.success; + } + + public negotiate_result setSuccess(negotiation_message success) { + this.success = success; + return this; + } + + public void unsetSuccess() { + this.success = null; + } + + /** Returns true if field success is set (has been assigned a value) and false otherwise */ + public boolean isSetSuccess() { + return this.success != null; + } + + public void setSuccessIsSet(boolean value) { + if (!value) { + this.success = null; + } + } + + public void setFieldValue(_Fields field, java.lang.Object value) { + switch (field) { + case SUCCESS: + if (value == null) { + unsetSuccess(); + } else { + setSuccess((negotiation_message)value); + } + break; + + } + } + + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case SUCCESS: + return getSuccess(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case SUCCESS: + return isSetSuccess(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof negotiate_result) + return this.equals((negotiate_result)that); + return false; + } + + public boolean equals(negotiate_result that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_success = true && this.isSetSuccess(); + boolean that_present_success = true && that.isSetSuccess(); + if (this_present_success || that_present_success) { + if (!(this_present_success && that_present_success)) + return false; + if (!this.success.equals(that.success)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetSuccess()) ? 131071 : 524287); + if (isSetSuccess()) + hashCode = hashCode * 8191 + success.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(negotiate_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSuccess()) { + lastComparison = com.xiaomi.infra.pegasus.thrift.TBaseHelper.compareTo(this.success, other.success); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol iprot) throws com.xiaomi.infra.pegasus.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot) throws com.xiaomi.infra.pegasus.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("negotiate_result("); + boolean first = true; + + sb.append("success:"); + if (this.success == null) { + sb.append("null"); + } else { + sb.append(this.success); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws com.xiaomi.infra.pegasus.thrift.TException { + // check for required fields + // check for sub-struct validity + if (success != null) { + success.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new com.xiaomi.infra.pegasus.thrift.protocol.TCompactProtocol(new com.xiaomi.infra.pegasus.thrift.transport.TIOStreamTransport(out))); + } catch (com.xiaomi.infra.pegasus.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new com.xiaomi.infra.pegasus.thrift.protocol.TCompactProtocol(new com.xiaomi.infra.pegasus.thrift.transport.TIOStreamTransport(in))); + } catch (com.xiaomi.infra.pegasus.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class negotiate_resultStandardSchemeFactory implements com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory { + public negotiate_resultStandardScheme getScheme() { + return new negotiate_resultStandardScheme(); + } + } + + private static class negotiate_resultStandardScheme extends com.xiaomi.infra.pegasus.thrift.scheme.StandardScheme { + + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol iprot, negotiate_result struct) throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 0: // SUCCESS + if (schemeField.type == com.xiaomi.infra.pegasus.thrift.protocol.TType.STRUCT) { + struct.success = new negotiation_message(); + struct.success.read(iprot); + struct.setSuccessIsSet(true); + } else { + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + com.xiaomi.infra.pegasus.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot, negotiate_result struct) throws com.xiaomi.infra.pegasus.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.success != null) { + oprot.writeFieldBegin(SUCCESS_FIELD_DESC); + struct.success.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class negotiate_resultTupleSchemeFactory implements com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory { + public negotiate_resultTupleScheme getScheme() { + return new negotiate_resultTupleScheme(); + } + } + + private static class negotiate_resultTupleScheme extends com.xiaomi.infra.pegasus.thrift.scheme.TupleScheme { + + @Override + public void write(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot, negotiate_result struct) throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol oprot = (com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetSuccess()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetSuccess()) { + struct.success.write(oprot); + } + } + + @Override + public void read(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol prot, negotiate_result struct) throws com.xiaomi.infra.pegasus.thrift.TException { + com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol iprot = (com.xiaomi.infra.pegasus.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.success = new negotiation_message(); + struct.success.read(iprot); + struct.setSuccessIsSet(true); + } + } + } + + private static S scheme(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol proto) { + return (com.xiaomi.infra.pegasus.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/ttl_response.java b/src/main/java/com/xiaomi/infra/pegasus/apps/ttl_response.java index 8c2e3364..1c694be1 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/ttl_response.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/ttl_response.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class ttl_response implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("ttl_response"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/update_request.java b/src/main/java/com/xiaomi/infra/pegasus/apps/update_request.java index ffa121cf..f87f84ad 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/update_request.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/update_request.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class update_request implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("update_request"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/apps/update_response.java b/src/main/java/com/xiaomi/infra/pegasus/apps/update_response.java index d5909717..b19438fd 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/apps/update_response.java +++ b/src/main/java/com/xiaomi/infra/pegasus/apps/update_response.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.apps; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class update_response implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("update_response"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/base/error_code.java b/src/main/java/com/xiaomi/infra/pegasus/base/error_code.java index 684b3516..880efc48 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/base/error_code.java +++ b/src/main/java/com/xiaomi/infra/pegasus/base/error_code.java @@ -108,6 +108,10 @@ public enum error_types { ERR_MOCK_INTERNAL, ERR_ZOOKEEPER_OPERATION, + ERR_AUTH_NEGO_FAILED, + + ERR_UNAUTHENTICATED, + ERR_ACL_DENY, //ERROR_CODE defined by client ERR_SESSION_RESET, }; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusCli.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusCli.java index c0d3dce7..b9dd8f4c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusCli.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusCli.java @@ -3,6 +3,7 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.tools.ZstdWrapper; import org.apache.commons.lang3.tuple.Pair; import java.util.ArrayList; @@ -15,6 +16,11 @@ * This class provides a client tool to access pegasus data. */ public class PegasusCli { + public enum CompressionType { + none, + zstd, + } + public static void usage() { System.out.println(); System.out.println("USAGE: PegasusCli ..."); @@ -35,6 +41,8 @@ public static void usage() { System.out.println(" - incr [increment]"); System.out.println(" - scan [start_sort_key] [stop_sort_key] [max_count]"); System.out.println(" - scan_all [max_count]"); + System.out.println(" - copy_data " + + "[read_uncompress_type(none|zstd)] [write_compress_type(none|zstd)] [max_count]"); System.out.println(); System.out.println(" For example:"); System.out.println(" PegasusCli file://./pegasus.properties temp get hash_key sort_key"); @@ -62,6 +70,10 @@ public static void main(String args[]) { byte[] startSortKey = null; byte[] stopSortKey = null; int maxCount = Integer.MAX_VALUE; + String targetClusterConfigPath = null; + String targetTableName = null; + CompressionType readUncompressType = CompressionType.none; + CompressionType writeCompressType = CompressionType.none; if (opName.equals("get") || opName.equals("del")) { if (args.length != 2) { System.out.println("ERROR: invalid parameter count"); @@ -164,6 +176,24 @@ else if (opName.equals("scan_all")) { maxCount = Integer.parseInt(args[0]); } } + else if (opName.equals("copy_data")) { + if (args.length < 2) { + System.out.println("ERROR: invalid parameter count"); + usage(); + return; + } + targetClusterConfigPath = args[0]; + targetTableName = args[1]; + if (args.length > 2) { + readUncompressType = CompressionType.valueOf(args[2]); + } + if (args.length > 3) { + writeCompressType = CompressionType.valueOf(args[3]); + } + if (args.length > 4) { + maxCount = Integer.parseInt(args[4]); + } + } else { System.out.println("ERROR: invalid op-name: " + opName); usage(); @@ -235,6 +265,7 @@ else if (opName.equals("scan")) { new String(p.getKey().getValue()), new String(p.getValue())); count++; } + scanner.close(); if (count > 0) System.out.println(); System.out.printf("%d key-value pairs got.\n", count); @@ -250,10 +281,48 @@ else if (opName.equals("scan_all")) { new String(p.getKey().getValue()), new String(p.getValue())); count++; } + scanner.close(); } if (count > 0) System.out.println(); System.out.printf("%d key-value pairs got.\n", count); + } else if (opName.equals("copy_data")) { + PegasusClientInterface targetClient = PegasusClientFactory.createClient(targetClusterConfigPath); + try { + PegasusTableInterface targetTable = targetClient.openTable(targetTableName); + List scanners = client.getUnorderedScanners(appName, 1, new ScanOptions()); + int count = 0; + if (scanners.size() > 0) { + PegasusScannerInterface scanner = scanners.get(0); + Pair, byte[]> p; + while (count < maxCount && (p = scanner.next()) != null) { + byte[] newValue = p.getValue(); + switch (readUncompressType) { + case none: + break; + case zstd: + newValue = ZstdWrapper.decompress(newValue); + break; + } + switch (writeCompressType) { + case none: + break; + case zstd: + newValue = ZstdWrapper.compress(newValue); + break; + } + targetTable.set(p.getKey().getKey(), p.getKey().getValue(), newValue, 0); + count++; + if (count % 10000 == 0) { + System.out.printf("Copied %d key-value pairs.\n", count); + } + } + scanner.close(); + } + System.out.printf("Done, copied %d key-value pairs.\n", count); + } finally { + targetClient.close(); + } } } catch (PException e) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index fc8cb627..9e3f578d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -20,7 +20,7 @@ /** * @author qinzuoyan - * + * * Implementation of {@link PegasusClientInterface}. */ public class PegasusClient implements PegasusClientInterface { @@ -62,12 +62,16 @@ private PegasusTable getTable(String tableName) throws PException { } // pegasus client configuration keys - public static final String[] PEGASUS_CLIENT_CONFIG_KEYS = new String[] { + public static final String[] PEGASUS_CLIENT_CONFIG_KEYS = new String[]{ Cluster.PEGASUS_META_SERVERS_KEY, Cluster.PEGASUS_OPERATION_TIMEOUT_KEY, Cluster.PEGASUS_ASYNC_WORKERS_KEY, Cluster.PEGASUS_ENABLE_PERF_COUNTER_KEY, - Cluster.PEGASUS_PERF_COUNTER_TAGS_KEY + Cluster.PEGASUS_PERF_COUNTER_TAGS_KEY, + Cluster.PEGASUS_OPEN_AUTH_KEY, + Cluster.PEGASUS_SERVICE_NAME_KEY, + Cluster.PEGASUS_SERVICE_FQDN_KEY, + Cluster.PEGASUS_JAAS_CONF_KEY }; // configPath could be: @@ -94,12 +98,12 @@ public void finalize() { // generate rocksdb key. public static byte[] generateKey(byte[] hashKey, byte[] sortKey) { int hashKeyLen = (hashKey == null ? 0 : hashKey.length); - Validate.isTrue(hashKeyLen < 0xFFFF, - "length of hash key must be less than UINT16_MAX"); + Validate.isTrue(hashKeyLen < 0xFFFF, + "length of hash key must be less than UINT16_MAX"); int sortKeyLen = (sortKey == null ? 0 : sortKey.length); // default byte order of ByteBuffer is BIG_ENDIAN ByteBuffer buf = ByteBuffer.allocate(2 + hashKeyLen + sortKeyLen); - buf.putShort((short)hashKeyLen); + buf.putShort((short) hashKeyLen); if (hashKeyLen > 0) { buf.put(hashKey); } @@ -113,9 +117,9 @@ public static byte[] generateKey(byte[] hashKey, byte[] sortKey) { public static byte[] generateNextBytes(byte[] hashKey) { int hashKeyLen = hashKey == null ? 0 : hashKey.length; Validate.isTrue(hashKeyLen < 0xFFFF, - "length of hash key must be less than UINT16_MAX"); + "length of hash key must be less than UINT16_MAX"); ByteBuffer buf = ByteBuffer.allocate(2 + hashKeyLen); - buf.putShort((short)hashKeyLen); + buf.putShort((short) hashKeyLen); if (hashKeyLen > 0) { buf.put(hashKey); } @@ -144,18 +148,18 @@ public static byte[] generateNextBytes(byte[] hashKey, byte[] sortKey) { } return Arrays.copyOf(array, i + 1); } - + public static Pair restoreKey(byte[] key) { Validate.isTrue(key != null && key.length >= 2); ByteBuffer buf = ByteBuffer.wrap(key); int hashKeyLen = 0xFFFF & buf.getShort(); Validate.isTrue(hashKeyLen != 0xFFFF && (2 + hashKeyLen <= key.length)); return new ImmutablePair( - Arrays.copyOfRange(key, 2, 2 + hashKeyLen), - Arrays.copyOfRange(key, 2 + hashKeyLen, key.length) + Arrays.copyOfRange(key, 2, 2 + hashKeyLen), + Arrays.copyOfRange(key, 2 + hashKeyLen, key.length) ); } - + public static int bytesCompare(byte[] left, byte[] right) { int len = Math.min(left.length, right.length); for (int i = 0; i < len; i++) { @@ -298,7 +302,7 @@ public boolean multiGetSortKeys(String tableName, byte[] hashKey, int maxFetchCo } PegasusTable table = getTable(tableName); PegasusTableInterface.MultiGetSortKeysResult result = table.multiGetSortKeys(hashKey, maxFetchCount, maxFetchSize, 0); - for (byte[] key: result.keys) { + for (byte[] key : result.keys) { sortKeys.add(key); } return result.allFetched; @@ -428,9 +432,9 @@ public PegasusTableInterface.CheckAndSetResult checkAndSet(String tableName, byt @Override public PegasusTableInterface.CheckAndMutateResult checkAndMutate(String tableName, byte[] hashKey, byte[] checkSortKey, - CheckType checkType, byte[] checkOperand, - Mutations mutations, - CheckAndMutateOptions options) throws PException { + CheckType checkType, byte[] checkOperand, + Mutations mutations, + CheckAndMutateOptions options) throws PException { PegasusTable tb = getTable(tableName); return tb.checkAndMutate(hashKey, checkSortKey, checkType, checkOperand, mutations, options, 0); } @@ -451,7 +455,7 @@ public int ttl(String tableName, byte[] hashKey, byte[] sortKey) throws PExcepti @Override public PegasusScannerInterface getScanner(String tableName, byte[] hashKey, - byte[] startSortKey, byte[] stopSortKey, ScanOptions options) + byte[] startSortKey, byte[] stopSortKey, ScanOptions options) throws PException { PegasusTable tb = getTable(tableName); return tb.getScanner(hashKey, startSortKey, stopSortKey, options); @@ -459,7 +463,7 @@ public PegasusScannerInterface getScanner(String tableName, byte[] hashKey, @Override public List getUnorderedScanners(String tableName, - int maxSplitCount, ScanOptions options) throws PException { + int maxSplitCount, ScanOptions options) throws PException { PegasusTable tb = getTable(tableName); return tb.getUnorderedScanners(maxSplitCount, options); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java index bd2dc033..4fd2c435 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java @@ -19,9 +19,10 @@ public client_operator(gpid gpid, String tableName) { this.rpc_error = new error_code(); } - public final byte[] prepare_thrift_header(int body_length) { + public final byte[] prepare_thrift_header(int body_length, int client_timeout) { header.body_length = body_length; header.header_length = ThriftHeader.HEADER_LENGTH; + header.client_timeout = client_timeout; header.thread_hash = Tools.dsn_gpid_to_thread_hash(header.app_id, header.partition_index); header.partition_hash = 0; return header.toByteArray(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/negotiate_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/negotiate_operator.java new file mode 100644 index 00000000..2f50123c --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/negotiate_operator.java @@ -0,0 +1,49 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +package com.xiaomi.infra.pegasus.operator; + +import com.xiaomi.infra.pegasus.apps.negotiation_message; +import com.xiaomi.infra.pegasus.apps.security; +import com.xiaomi.infra.pegasus.base.gpid; +import com.xiaomi.infra.pegasus.thrift.TException; +import com.xiaomi.infra.pegasus.thrift.protocol.TMessage; +import com.xiaomi.infra.pegasus.thrift.protocol.TMessageType; + +public class negotiate_operator extends client_operator { + public negotiate_operator(negotiation_message request) { + super(new gpid(), ""); + this.request = request; + } + + public String name() { + return "negotiate"; + } + + public void send_data(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot, int seqid) + throws TException { + TMessage msg = new TMessage("RPC_NEGOTIATION", TMessageType.CALL, seqid); + oprot.writeMessageBegin(msg); + security.negotiate_args get_args = new security.negotiate_args(request); + get_args.write(oprot); + oprot.writeMessageEnd(); + } + + public void recv_data(com.xiaomi.infra.pegasus.thrift.protocol.TProtocol iprot) + throws TException { + security.negotiate_result result = new security.negotiate_result(); + result.read(iprot); + if (result.isSetSuccess()) resp = result.success; + else + throw new com.xiaomi.infra.pegasus.thrift.TApplicationException( + com.xiaomi.infra.pegasus.thrift.TApplicationException.MISSING_RESULT, + "get failed: unknown result"); + } + + public negotiation_message get_response() { + return resp; + } + + private negotiation_message request; + private negotiation_message resp; +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/replication/partition_configuration.java b/src/main/java/com/xiaomi/infra/pegasus/replication/partition_configuration.java index 3f6f639e..80f0880b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/replication/partition_configuration.java +++ b/src/main/java/com/xiaomi/infra/pegasus/replication/partition_configuration.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.replication; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-19") public class partition_configuration implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("partition_configuration"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/replication/query_cfg_request.java b/src/main/java/com/xiaomi/infra/pegasus/replication/query_cfg_request.java index d4a37ecc..7242c88c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/replication/query_cfg_request.java +++ b/src/main/java/com/xiaomi/infra/pegasus/replication/query_cfg_request.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.replication; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class query_cfg_request implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("query_cfg_request"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/replication/query_cfg_response.java b/src/main/java/com/xiaomi/infra/pegasus/replication/query_cfg_response.java index 0fd60429..4b3f7716 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/replication/query_cfg_response.java +++ b/src/main/java/com/xiaomi/infra/pegasus/replication/query_cfg_response.java @@ -10,7 +10,7 @@ package com.xiaomi.infra.pegasus.replication; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2018-09-11") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)", date = "2019-01-13") public class query_cfg_response implements com.xiaomi.infra.pegasus.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final com.xiaomi.infra.pegasus.thrift.protocol.TStruct STRUCT_DESC = new com.xiaomi.infra.pegasus.thrift.protocol.TStruct("query_cfg_response"); diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java index a290b0e6..a0c14e33 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java @@ -5,6 +5,7 @@ import com.xiaomi.infra.pegasus.thrift.TException; import com.xiaomi.infra.pegasus.rpc.async.ClusterManager; + import java.util.Properties; public abstract class Cluster { @@ -28,6 +29,18 @@ public abstract class Cluster { public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs"; public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF = "60"; + public static final String PEGASUS_SERVICE_NAME_KEY = "service_name"; + public static final String PEGASUS_SERVICE_NAME_DEF = "test-server"; + + public static final String PEGASUS_SERVICE_FQDN_KEY = "service_fqdn"; + public static final String PEGASUS_SERVICE_FQDN_DEF = "myhost"; + + public static final String PEGASUS_OPEN_AUTH_KEY = "open_auth"; + public static final String PEGASUS_OPEN_AUTH_DEF = "false"; + + public static final String PEGASUS_JAAS_CONF_KEY = "jaas_conf"; + public static final String PEGASUS_JAAS_CONF_DEF = "configuration/pegasus_jaas.conf"; + public static Cluster createCluster(Properties config) throws IllegalArgumentException { int operatorTimeout = Integer.parseInt(config.getProperty( PEGASUS_OPERATION_TIMEOUT_KEY, PEGASUS_OPERATION_TIMEOUT_DEF)); @@ -43,24 +56,37 @@ public static Cluster createCluster(Properties config) throws IllegalArgumentExc int asyncWorkers = Integer.parseInt(config.getProperty( PEGASUS_ASYNC_WORKERS_KEY, PEGASUS_ASYNC_WORKERS_DEF)); + ClusterManager.Builder builder = new ClusterManager.Builder(operatorTimeout, asyncWorkers, address); + boolean enablePerfCounter = Boolean.parseBoolean(config.getProperty( PEGASUS_ENABLE_PERF_COUNTER_KEY, PEGASUS_ENABLE_PERF_COUNTER_VALUE)); - String perfCounterTags = enablePerfCounter ? config.getProperty( - PEGASUS_PERF_COUNTER_TAGS_KEY, PEGASUS_PERF_COUNTER_TAGS_DEF) : null; - int pushIntervalSecs = Integer.parseInt(config.getProperty( - PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, - PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF - )); - return new ClusterManager( - operatorTimeout, - asyncWorkers, - enablePerfCounter, - perfCounterTags, - pushIntervalSecs, - address); + if (enablePerfCounter) { + String perfCounterTags = config.getProperty( + PEGASUS_PERF_COUNTER_TAGS_KEY, PEGASUS_PERF_COUNTER_TAGS_DEF); + int pushIntervalSecs = Integer.parseInt(config.getProperty( + PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, + PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF + )); + builder.enableCounter(perfCounterTags, pushIntervalSecs); + } + + boolean needAuth = Boolean.parseBoolean(config.getProperty(PEGASUS_OPEN_AUTH_KEY, PEGASUS_OPEN_AUTH_DEF)); + + if (needAuth) { + String serviceName = config.getProperty(PEGASUS_SERVICE_NAME_KEY, PEGASUS_SERVICE_NAME_DEF); + String serviceFqdn = config.getProperty(PEGASUS_SERVICE_FQDN_KEY, PEGASUS_SERVICE_FQDN_DEF); + String jaasConfPath = config.getProperty(PEGASUS_JAAS_CONF_KEY, PEGASUS_JAAS_CONF_DEF); + if (jaasConfPath != null) { + System.setProperty("java.security.auth.login.config", jaasConfPath); + } + builder.openAuth(serviceName, serviceFqdn); + } + return builder.build(); } public abstract String[] getMetaList(); + public abstract Table openTable(String name, KeyHasher function) throws ReplicationException, TException; + public abstract void close(); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java index 1585c4ed..96a3ecaf 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java @@ -3,6 +3,7 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.rpc.async; +import com.sun.security.auth.callback.TextCallbackHandler; import com.xiaomi.infra.pegasus.metrics.MetricsManager; import com.xiaomi.infra.pegasus.rpc.Cluster; import com.xiaomi.infra.pegasus.rpc.KeyHasher; @@ -15,6 +16,9 @@ import io.netty.util.concurrent.Future; import org.slf4j.Logger; +import javax.security.auth.Subject; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -29,6 +33,12 @@ public class ClusterManager extends Cluster { private int retryDelay; private boolean enableCounter; + private boolean openAuth; + private String serviceName; + private String serviceFqdn; + private Subject subject; + private LoginContext loginContext = null; + private ConcurrentHashMap replicaSessions; private EventLoopGroup metaGroup; // group used for handle meta logic private EventLoopGroup replicaGroup; // group used for handle io with replica servers @@ -38,34 +48,100 @@ public class ClusterManager extends Cluster { private static final String osName; private static final String Linux = "Linux"; + static { Properties p = System.getProperties(); osName = p.getProperty("os.name"); logger.info("operating system name: {}", osName); } - public ClusterManager( - int timeout, - int io_threads, - boolean enableCounter, - String perfCounterTags, - int pushIntervalSecs, - String[] address_list) throws IllegalArgumentException { - setTimeout(timeout); - this.enableCounter = enableCounter; + public static class Builder { + private int timeout; + private int io_threads; + private boolean enableCounter; + private String perfCounterTags; + private int pushIntervalSecs; + private String[] address_list; + private boolean openAuth; + private String serviceName; + private String serviceFqdn; + + public Builder(int timeout, int io_threads, String[] address_list + ) { + this.timeout = timeout; + this.io_threads = io_threads; + this.address_list = address_list; + } + + public Builder enableCounter(String perfCounterTags, int pushIntervalSecs) { + this.enableCounter = true; + this.perfCounterTags = perfCounterTags; + this.pushIntervalSecs = pushIntervalSecs; + return this; + } + + public Builder openAuth(String serviceName, String serviceFqdn) { + this.openAuth = true; + this.serviceName = serviceName; + this.serviceFqdn = serviceFqdn; + return this; + } + + public ClusterManager build() { + return new ClusterManager(this); + } + } + + public ClusterManager(Builder builder) { + setTimeout(builder.timeout); + this.enableCounter = builder.enableCounter; if (enableCounter) { - MetricsManager.detectHostAndInit(perfCounterTags, pushIntervalSecs); + MetricsManager.detectHostAndInit(builder.perfCounterTags, builder.pushIntervalSecs); + } + + if (builder.openAuth) { + this.openAuth = true; + this.serviceName = builder.serviceName; + this.serviceFqdn = builder.serviceFqdn; + + String jaasConf = System.getProperties().getProperty("java.security.auth.login.config"); + if (jaasConf == null) { + System.setProperty("java.security.auth.login.config", "configuration/pegasus_jaas.conf"); + jaasConf = System.getProperties().getProperty("java.security.auth.login.config"); + } + logger.info("open authentication, jaas config path: {}, login now", jaasConf); + + try { + loginContext = new LoginContext( + "client", + new TextCallbackHandler()); + } catch (LoginException le) { + logger.error("cannot create LoginContext. LoginException: {}", le.getMessage()); + System.exit(-1); + } catch (SecurityException se) { + logger.error("cannot create LoginContext. SecurityException: {}", se.getMessage()); + System.exit(-1); + } + try { + loginContext.login(); + } catch (LoginException le) { + logger.error("authentication failed: {}", le.getMessage()); + System.exit(-1); + } + + subject = loginContext.getSubject(); + logger.info("login succeed, as user {}", subject.getPrincipals().toString()); } replicaSessions = new ConcurrentHashMap(); - replicaGroup = getEventLoopGroupInstance(io_threads); + replicaGroup = getEventLoopGroupInstance(builder.io_threads); metaGroup = getEventLoopGroupInstance(1); tableGroup = getEventLoopGroupInstance(1); - metaList = address_list; + metaList = builder.address_list; // the constructor of meta session is depend on the replicaSessions, // so the replicaSessions should be initialized earlier - metaSession = new MetaSession(this, address_list, timeout, 10, metaGroup); + metaSession = new MetaSession(this, builder.address_list, builder.timeout, 10, metaGroup); } public EventExecutor getExecutor(String name, int threadCount) { @@ -87,19 +163,43 @@ public ReplicaSession getReplicaSession(rpc_address address) { ss = replicaSessions.get(address); if (ss != null) return ss; - ss = new ReplicaSession(address, replicaGroup, Cluster.SOCK_TIMEOUT); + if (openAuth()) { + logger.info("Create a replicaSession {} which is open-auth", address); + ss = new ReplicaSession( + address, + replicaGroup, + Cluster.SOCK_TIMEOUT, + true, + subject, + serviceName, + serviceFqdn); + } else { + ss = new ReplicaSession(address, replicaGroup, Cluster.SOCK_TIMEOUT); + } replicaSessions.put(address, ss); return ss; } } - public int getTimeout() { return operationTimeout; } + public int getTimeout() { + return operationTimeout; + } + + public long getRetryDelay(long timeoutMs) { + return (timeoutMs < 3 ? 1 : timeoutMs / 3); + } - public long getRetryDelay(long timeoutMs) { return (timeoutMs < 3 ? 1: timeoutMs/3); } + public int getRetryDelay() { + return retryDelay; + } - public int getRetryDelay() { return retryDelay; } + public boolean counterEnabled() { + return enableCounter; + } - public boolean counterEnabled() { return enableCounter; } + public boolean openAuth() { + return openAuth; + } public void setTimeout(int t) { operationTimeout = t; @@ -118,7 +218,9 @@ public static Class getSocketChannelClass() { } @Override - public String[] getMetaList() { return metaList; } + public String[] getMetaList() { + return metaList; + } @Override public TableHandler openTable(String name, KeyHasher h) throws ReplicationException { @@ -132,7 +234,7 @@ public void close() { } metaSession.closeSession(); - for (Map.Entry entry: replicaSessions.entrySet()) { + for (Map.Entry entry : replicaSessions.entrySet()) { entry.getValue().closeSession(); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java index c0d5e65a..ec51ec73 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java @@ -7,6 +7,7 @@ import com.xiaomi.infra.pegasus.base.rpc_address; import com.xiaomi.infra.pegasus.operator.client_operator; import com.xiaomi.infra.pegasus.operator.query_cfg_operator; +import com.xiaomi.infra.pegasus.replication.partition_configuration; import io.netty.channel.EventLoopGroup; import java.util.ArrayList; @@ -22,19 +23,19 @@ public class MetaSession { public MetaSession(ClusterManager manager, String addrList[], int eachQueryTimeoutInMills, int defaultMaxQueryCount, EventLoopGroup g) throws IllegalArgumentException { + clusterManager = manager; metaList = new ArrayList(); for (String addr: addrList) { rpc_address rpc_addr = new rpc_address(); if (rpc_addr.fromString(addr)) { logger.info("add {} as meta server", addr); - metaList.add(manager.getReplicaSession(rpc_addr)); - } - else { + metaList.add(clusterManager.getReplicaSession(rpc_addr)); + } else { logger.error("invalid address {}", addr); } } if (metaList.isEmpty()) { - throw new IllegalArgumentException("can't find valid meta server address " + addrList.toString()); + throw new IllegalArgumentException("no valid meta server address"); } curLeader = 0; @@ -50,6 +51,21 @@ static public final error_types getMetaServiceError(client_operator metaQueryOp) return op.get_response().getErr().errno; } + static public final rpc_address getMetaServiceForwardAddress(client_operator metaQueryOp) { + if (metaQueryOp.rpc_error.errno != error_types.ERR_OK) + return null; + query_cfg_operator op = (query_cfg_operator) metaQueryOp; + if (op.get_response().getErr().errno != error_types.ERR_FORWARD_TO_OTHERS) + return null; + java.util.List partitions = op.get_response().getPartitions(); + if (partitions == null || partitions.isEmpty()) + return null; + rpc_address addr = partitions.get(0).getPrimary(); + if (addr == null || addr.isInvalid()) + return null; + return addr; + } + public final void asyncQuery(client_operator op, Runnable callbackFunc, int maxQueryCount) { if (maxQueryCount == 0) { maxQueryCount = defaultMaxQueryCount; @@ -102,12 +118,9 @@ private final void onFinishQueryMeta(final MetaRequestRound round) { boolean needDelay = false; boolean needSwitchLeader = false; + rpc_address forwardAddress = null; --round.maxQueryCount; - if (round.maxQueryCount == 0) { - round.callbackFunc.run(); - return; - } error_types metaError = error_types.ERR_UNKNOWN; if (op.rpc_error.errno == error_types.ERR_OK) { @@ -119,38 +132,59 @@ private final void onFinishQueryMeta(final MetaRequestRound round) { else if (metaError == error_types.ERR_FORWARD_TO_OTHERS) { needDelay = false; needSwitchLeader = true; - } - else { + forwardAddress = getMetaServiceForwardAddress(op); + } else { round.callbackFunc.run(); return; } } else if (op.rpc_error.errno == error_types.ERR_SESSION_RESET || op.rpc_error.errno == error_types.ERR_TIMEOUT) { - needDelay = true; + needDelay = false; needSwitchLeader = true; - } - else { - logger.error("unknown error: {}", op.rpc_error.errno.toString()); + } else { + logger.error(op.rpc_error.errno == error_types.ERR_UNAUTHENTICATED ? "{}" : "unknown error: {}", op.rpc_error.errno.toString()); round.callbackFunc.run(); return; } - logger.info("query meta got error, rpc({}), meta({}), connected leader({}), remain retry count({}), " + - "need switch leader({}), need delay({})", + logger.info("query meta got error, rpc error({}), meta error({}), forward address({}), current leader({}), " + + "remain retry count({}), need switch leader({}), need delay({})", op.rpc_error.errno.toString(), metaError.toString(), + forwardAddress, round.lastSession.name(), round.maxQueryCount, needSwitchLeader, needDelay ); synchronized (this) { - if (needSwitchLeader && metaList.get(curLeader) == round.lastSession) { - curLeader = (curLeader + 1) % metaList.size(); + if (needSwitchLeader) { + if (forwardAddress != null && !forwardAddress.isInvalid()) { + boolean found = false; + for (int i = 0; i < metaList.size(); i++) { + if (metaList.get(i).getAddress().equals(forwardAddress)) { + curLeader = i; + found = true; + break; + } + } + if (!found) { + logger.info("add forward address {} as meta server", forwardAddress); + metaList.add(clusterManager.getReplicaSession(forwardAddress)); + curLeader = metaList.size() - 1; + } + } else if (metaList.get(curLeader) == round.lastSession) { + curLeader = (curLeader + 1) % metaList.size(); + } } round.lastSession = metaList.get(curLeader); } + if (round.maxQueryCount == 0) { + round.callbackFunc.run(); + return; + } + group.schedule(new Runnable() { @Override public void run() { @@ -173,6 +207,7 @@ public MetaRequestRound(client_operator o, Runnable r, int q, ReplicaSession l) } } + private ClusterManager clusterManager; private List metaList; private int curLeader; private int eachQueryTimeoutInMills; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 8b1d328a..66d14931 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -3,10 +3,15 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.rpc.async; +import com.xiaomi.infra.pegasus.apps.negotiation_message; +import com.xiaomi.infra.pegasus.apps.negotiation_status; +import com.xiaomi.infra.pegasus.base.blob; import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.base.rpc_address; import com.xiaomi.infra.pegasus.operator.client_operator; +import com.xiaomi.infra.pegasus.operator.negotiate_operator; +import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.thrift.protocol.TMessage; import io.netty.channel.*; import io.netty.bootstrap.Bootstrap; @@ -14,16 +19,20 @@ import org.slf4j.Logger; +import javax.security.auth.Subject; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.login.LoginContext; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; import java.net.UnknownHostException; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; +import java.security.PrivilegedExceptionAction; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * Created by weijiesun on 17-9-13. @@ -34,15 +43,29 @@ public static class RequestEntry { public com.xiaomi.infra.pegasus.operator.client_operator op; public Runnable callback; public ScheduledFuture timeoutTask; + public long timeoutMs; } public enum ConnState { CONNECTED, CONNECTING, + NEGOTIATION, DISCONNECTED } public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTimeout) { + this(address, rpcGroup, socketTimeout, false, null, null, null); + } + + // You can specify a message response filter with constructor or with "setMessageResponseFilter" function. + // the mainly usage of filter is test, in which you can control whether to abaondon a response + // and how to abandon it, so as to emulate some network failure cases + public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, MessageResponseFilter filter) { + this(address, rpcGroup, socketTimeout, false, null, null, null); + this.filter = filter; + } + + public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, boolean openAuth, Subject subject, String serviceName, String serviceFqdn) { this.address = address; this.rpcGroup = rpcGroup; @@ -61,15 +84,26 @@ public void initChannel(SocketChannel ch) { pipeline.addLast("ClientHandler", new ReplicaSession.DefaultHandler()); } }); + + this.openAuth = openAuth; + if (openAuth) { + this.subject = subject; + this.serviceName = serviceName; + this.serviceFqdn = serviceFqdn; + // QOP(Quality of Protection) mismatch between client and server may cause the error - No common protection layer between client and server + props.put(Sasl.QOP, "auth"); + } else { + this.subject = new Subject(); + } + + this.firstRecentTimedOutMs = new AtomicLong(0); } - // You can specify a message response filter with constructor or with "setMessageResponseFilter" function. - // the mainly usage of filter is test, in which you can control whether to abaondon a response - // and how to abandon it, so as to emulate some network failure cases - public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, MessageResponseFilter filter) { - this(address, rpcGroup, socketTimeout); + public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTimeout, boolean openAuth, Subject subject, String serviceName, String serviceFqdn, MessageResponseFilter filter) { + this(address, rpcGroup, socketTimeout, openAuth, subject, serviceName, serviceFqdn); this.filter = filter; } + public void setMessageResponseFilter(MessageResponseFilter filter) { this.filter = filter; } @@ -83,6 +117,7 @@ public int asyncSend(client_operator op, Runnable callbackFunc, long timeoutInMi //the timer task is scheduled. pendingResponse.put(new Integer(entry.sequenceId), entry); entry.timeoutTask = addTimer(entry.sequenceId, timeoutInMilliseconds); + entry.timeoutMs = timeoutInMilliseconds; // We store the connection_state & netty channel in a struct so that they can fetch and update in atomic. // Moreover, we can avoid the lock protection when we want to get the netty channel for send message @@ -130,9 +165,13 @@ public RequestEntry getAndRemoveEntry(int seqID) { return pendingResponse.remove(new Integer(seqID)); } - public final String name() { return address.toString(); } + public final String name() { + return address.toString(); + } - public final rpc_address getAddress() { return address; } + public final rpc_address getAddress() { + return address; + } private void doConnect() { try { @@ -174,7 +213,22 @@ private void markSessionConnected(Channel activeChannel) { } } - private void markSessionDisconnect() { + private void markSessionNegotiation(Channel activeChannel) { + logger.info("{}: mark session state negotiation"); + VolatileFields newCache = new VolatileFields(); + newCache.state = ConnState.NEGOTIATION; + newCache.nettyChannel = activeChannel; + fields = newCache; + if (needAuthConnection()) { + // version + auth, now only support auth + startNegotiation(); + } else { + logger.info("{}: mark session state connected"); + markSessionConnected(activeChannel); + } + } + + private void markSessionDisconnect(error_types errorType) { VolatileFields cache = fields; synchronized (pendingSend) { if (cache.state != ConnState.DISCONNECTED) { @@ -186,14 +240,14 @@ private void markSessionDisconnect() { // this. In this case, we are relying on the timeout task. while (!pendingSend.isEmpty()) { RequestEntry e = pendingSend.poll(); - tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false); + tryNotifyWithSequenceID(e.sequenceId, errorType, false); } List l = new LinkedList(); for (Map.Entry entry : pendingResponse.entrySet()) { l.add(entry.getValue()); } for (RequestEntry e : l) { - tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false); + tryNotifyWithSequenceID(e.sequenceId, errorType, false); } cache = new VolatileFields(); @@ -206,10 +260,17 @@ private void markSessionDisconnect() { } } - private void tryNotifyWithSequenceID( - int seqID, - error_types errno, - boolean isTimeoutTask) { + // for netty event, reflect status of the session + private void markSessionDisconnect() { + markSessionDisconnect(error_types.ERR_SESSION_RESET); + } + + // for handling logic, initiative to disconnect the session. However, the only thing to do is 'markSessionDisconnect' + private void disconnectCurrentSession(error_types errorType) { + markSessionDisconnect(errorType); + } + + private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask) { logger.debug("{}: {} is notified with error {}, isTimeoutTask {}", name(), seqID, errno.toString(), isTimeoutTask); RequestEntry entry = pendingResponse.remove(new Integer(seqID)); @@ -218,8 +279,25 @@ private void tryNotifyWithSequenceID( entry.timeoutTask.cancel(true); entry.op.rpc_error.errno = errno; entry.callback.run(); - } - else { + + if (errno == error_types.ERR_TIMEOUT) { + long firstTs = firstRecentTimedOutMs.get(); + if (firstTs == 0) { + // it is the first timeout in the window. + firstRecentTimedOutMs.set(System.currentTimeMillis()); + } else if (System.currentTimeMillis() - firstTs >= sessionResetTimeWindowMs) { + // ensure that closeSession() will be invoked only once. + if (firstRecentTimedOutMs.compareAndSet(firstTs, 0)) { + logger.warn("{}: actively close the session because it's not responding for {} seconds", + name(), + sessionResetTimeWindowMs / 1000); + closeSession(); + } + } + } else { + firstRecentTimedOutMs.set(0); + } + } else { logger.warn("{}: {} is removed by others, current error {}, isTimeoutTask {}", name(), seqID, errno.toString(), isTimeoutTask); } @@ -262,7 +340,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("Channel {} for session {} is active", ctx.channel().toString(), name()); - markSessionConnected(ctx.channel()); + markSessionNegotiation(ctx.channel()); } @Override @@ -285,13 +363,131 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } } + // security + private boolean needAuthConnection() { + return openAuth; + } + + private void startNegotiation() { + logger.info("{}: start auth negotiation", name()); + negotiation_message msg = new negotiation_message(); + msg.status = negotiation_status.SASL_LIST_MECHANISMS; + sendNegoMsg(msg); + } + + private void sendNegoMsg(negotiation_message msg) { + final RequestEntry entry = new ReplicaSession.RequestEntry(); + entry.sequenceId = seqId.getAndIncrement(); + entry.op = new negotiate_operator(msg); + entry.callback = new SaslRecvHandler((negotiate_operator) entry.op); + entry.timeoutTask = addTimer(entry.sequenceId, 1000); + pendingResponse.put(new Integer(entry.sequenceId), entry); + + write(entry, fields); + } + + private class Action implements PrivilegedExceptionAction { + @Override + public Object run() throws Exception { + return null; + } + } + + private class SaslRecvHandler implements Runnable { + negotiate_operator op; + + SaslRecvHandler(negotiate_operator op) { + this.op = op; + } + + @Override + public void run() { + try { + if (op.rpc_error.errno != error_types.ERR_OK) throw new ReplicationException(op.rpc_error.errno); + handleResp(); + } catch (Exception e) { + logger.error(e.toString()); + disconnectCurrentSession(error_types.ERR_AUTH_NEGO_FAILED); + } + } + + private void handleResp() throws Exception { + final negotiation_message resp = op.get_response(); + + if (resp == null) { + logger.error("SaslRecvHandler received a null response, abandon it"); + return; + } + + final negotiation_message msg = new negotiation_message(); + switch (resp.status) { + case INVALID: + case SASL_AUTH_FAIL: + throw new Exception("Received a response which status is " + resp.status + ", break off this negotiation"); + case SASL_LIST_MECHANISMS_RESP: + Subject.doAs( + subject, + new Action() { + public Object run() throws Exception { + String[] mechanisms = new String[expectedMechanisms.size()]; + expectedMechanisms.toArray(mechanisms); + saslClient = + Sasl.createSaslClient( + mechanisms, null, serviceName, serviceFqdn, props, cbh); + logger.info("Select mechanism: {}", saslClient.getMechanismName()); + msg.status = negotiation_status.SASL_SELECT_MECHANISMS; + msg.msg = new blob(saslClient.getMechanismName().getBytes()); + return null; + } + }); + break; + case SASL_SELECT_MECHANISMS_OK: + Subject.doAs( + subject, + new Action() { + public Object run() throws Exception { + msg.status = negotiation_status.SASL_INITIATE; + if (saslClient.hasInitialResponse()) { + msg.msg = new blob(saslClient.evaluateChallenge(new byte[0])); + } else { + msg.msg = new blob(new byte[0]); + } + return null; + } + }); + + break; + case SASL_CHALLENGE: + Subject.doAs( + subject, + new Action() { + public Object run() throws Exception { + msg.status = negotiation_status.SASL_RESPONSE; + msg.msg = new blob(saslClient.evaluateChallenge(resp.msg.data)); + return null; + } + }); + break; + case SASL_SUCC: + markSessionConnected(fields.nettyChannel); + return; + default: + throw new Exception("Received an unknown response, status " + resp.status); + } + + sendNegoMsg(msg); + } + } + // for test ConnState getState() { return fields.state; } + interface MessageResponseFilter { - public boolean abandonIt(error_types err, TMessage header); + boolean abandonIt(error_types err, TMessage header); } + MessageResponseFilter filter = null; final private ConcurrentHashMap pendingResponse = new ConcurrentHashMap(); @@ -303,11 +499,31 @@ private final static class VolatileFields { public ConnState state = ConnState.DISCONNECTED; public Channel nettyChannel = null; } + private volatile VolatileFields fields = new VolatileFields(); - private rpc_address address; + private final rpc_address address; private Bootstrap boot; private EventLoopGroup rpcGroup; + // Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs` + // are timed out, in that case we suspect that the server is unavailable. + + // Timestamp of the first timed out rpc. + private AtomicLong firstRecentTimedOutMs; + private static final long sessionResetTimeWindowMs = 10 * 1000; // 10s + + // security + private boolean openAuth; + private String serviceName; // used for SASL authentication + private String serviceFqdn; // name used for SASL authentication + private CallbackHandler cbh = null; // Don't need handler for GSSAPI + private SaslClient saslClient; + private final HashMap props = new HashMap(); + private final Subject subject; + // TODO: read expected mechanisms from config file + private static final List expectedMechanisms = + new ArrayList(Collections.singletonList("GSSAPI")); + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSession.class); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index d4b06f2e..8aa67d77 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -15,6 +15,7 @@ import com.xiaomi.infra.pegasus.replication.query_cfg_response; import com.xiaomi.infra.pegasus.rpc.Table; import io.netty.util.concurrent.*; +import org.apache.commons.lang3.StringEscapeUtils; import org.slf4j.Logger; import java.util.ArrayList; @@ -47,7 +48,21 @@ final static class TableConfiguration { long lastQueryTime_; public TableHandler(ClusterManager mgr, String name, KeyHasher h) throws ReplicationException { - logger.debug("initialize table handler, table_name({})", name); + int i = 0; + for (; i < name.length(); i++) { + char c = name.charAt(i); + if ((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') + || c == '_' || c == '.' || c == ':') + continue; + else + break; + } + if (name.length() > 0 && i == name.length()) { + logger.info("initialize table handler, table name is \"{}\"", StringEscapeUtils.escapeJava(name)); + } else { + logger.warn("initialize table handler, maybe invalid table name \"{}\"", StringEscapeUtils.escapeJava(name)); + } + query_cfg_request req = new query_cfg_request(name, new ArrayList()); query_cfg_operator op = new query_cfg_operator(new gpid(-1, -1), req); diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameDecoder.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameDecoder.java index 5c3191dc..8bc89cf8 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameDecoder.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameDecoder.java @@ -46,7 +46,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t ec.read(iprot); TMessage msgHeader = iprot.readMessageBegin(); if (session.filter != null && session.filter.abandonIt(ec.errno, msgHeader)) { - logger.info("{}: abaondon a message, err({}), header({})", + logger.info("{}: abandon a message, err({}), header({})", ctx.channel().toString(), ec.errno.toString(), msgHeader.toString()); diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java index 800ffcc5..97c55d49 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java @@ -21,18 +21,19 @@ public ThriftFrameEncoder() { @Override protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ReplicaSession.RequestEntry entry, boolean preferDirect) throws Exception { - return preferDirect?ctx.alloc().ioBuffer(256):ctx.alloc().heapBuffer(256); + return preferDirect ? ctx.alloc().ioBuffer(256) : ctx.alloc().heapBuffer(256); } @Override - protected void encode(ChannelHandlerContext ctx, ReplicaSession.RequestEntry e, ByteBuf out) throws Exception{ + protected void encode(ChannelHandlerContext ctx, ReplicaSession.RequestEntry e, ByteBuf out) throws Exception { int initIndex = out.writerIndex(); // write the Memory buffer out.writerIndex(initIndex + ThriftHeader.HEADER_LENGTH); TBinaryProtocol protocol = new TBinaryProtocol(new TByteBufTransport(out)); e.op.send_data(protocol, e.sequenceId); - out.setBytes(initIndex, e.op.prepare_thrift_header(out.readableBytes() - ThriftHeader.HEADER_LENGTH)); + out.setBytes(initIndex, e.op.prepare_thrift_header( + out.readableBytes() - ThriftHeader.HEADER_LENGTH, (int) e.timeoutMs)); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java b/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java new file mode 100644 index 00000000..4d23ee40 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java @@ -0,0 +1,70 @@ +package com.xiaomi.infra.pegasus.tools; + +import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdInputStream; +import com.xiaomi.infra.pegasus.client.PException; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +/** + * ZstdWrapper wraps the compress/decompress APIs of ZStd algorithm. + */ +public class ZstdWrapper { + + private ZstdWrapper() { + } + + /** + * compress the `src` and return the compressed. + * throws: RuntimeException if compression failed. + */ + public static byte[] compress(byte[] src) { + return Zstd.compress(src); + } + + /** + * decompress the `src` and return the original. + * throws: + * - IllegalArgumentException if given `src` is null or empty + * - PException if decompression failed, maybe your `src` is corrupted. + */ + public static byte[] decompress(byte[] src) throws PException { + if (src == null || src.length == 0) { + throw new IllegalArgumentException("src is empty"); + } + + byte[] ret; + long originalSize = Zstd.decompressedSize(src); + if (originalSize > 0) { + ret = new byte[(int) originalSize]; + long code = Zstd.decompress(ret, src); + if (Zstd.isError(code)) { + throw new PException("decompression failed: " + Zstd.getErrorName(code)); + } + if (code != originalSize) { + throw new PException("decompression failed"); + } + return ret; + } + + // fallback to decompress in streaming mode + byte[] inBuf = new byte[1024]; + ByteArrayOutputStream decompressOutBuf = new ByteArrayOutputStream(); + try { + ZstdInputStream decompress = new ZstdInputStream(new ByteArrayInputStream(src)); + while (true) { + int n = decompress.read(inBuf); + if (n <= 0) { + break; + } + decompressOutBuf.write(inBuf, 0, n); + } + ret = decompressOutBuf.toByteArray(); + } catch (IOException e) { + throw new PException("decompression failed: " + e.getMessage()); + } + return ret; + } +} diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java index b7fc2b2d..70347de5 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java @@ -27,18 +27,27 @@ public void before() throws Exception { public void after() throws Exception { } + private static boolean isOpenAuth() { + // openAuth is on by default + String prop = System.getProperties().getProperty("test.open.auth"); + return (prop != null) ? Boolean.valueOf(prop) : false; + } + /** * Method: getReplicaSession(rpc_address address) */ @Test public void testGetReplicaSession() throws Exception { String[] address_list = { - "127.0.0.1:1", "127.0.0.1:2", "127.0.0.1:3" + "127.0.0.1:1", "127.0.0.1:2", "127.0.0.1:3" }; - ClusterManager testManager = new ClusterManager(1000, 1, false, - null, 60, address_list); - + ClusterManager testManager; + if (isOpenAuth()) { + testManager = new ClusterManager.Builder(1000, 1, address_list).openAuth("xxxx", "xxxx").build(); + } else { + testManager = new ClusterManager.Builder(1000, 1, address_list).build(); + } // input an invalid rpc address rpc_address address = new rpc_address(); ReplicaSession session = testManager.getReplicaSession(address); @@ -52,8 +61,12 @@ public void testGetReplicaSession() throws Exception { public void testOpenTable() throws Exception { // test invalid meta list String[] addr_list = {"127.0.0.1:123", "127.0.0.1:124", "127.0.0.1:125"}; - ClusterManager testManager = new ClusterManager(1000, 1, false, - null, 60, addr_list); + ClusterManager testManager; + if (isOpenAuth()) { + testManager = new ClusterManager.Builder(1000, 1, addr_list).openAuth("xxxx", "xxxx").build(); + } else { + testManager = new ClusterManager.Builder(1000, 1, addr_list).build(); + } TableHandler result = null; try { @@ -67,8 +80,11 @@ public void testOpenTable() throws Exception { // test partially invalid meta list String[] addr_list2 = {"127.0.0.1:123", "127.0.0.1:34603", "127.0.0.1:34601", "127.0.0.1:34602"}; - testManager = new ClusterManager(1000, 1, false, - null, 60, addr_list2); + if (isOpenAuth()) { + testManager = new ClusterManager.Builder(1000, 1, addr_list2).openAuth("xxxx", "xxxx").build(); + } else { + testManager = new ClusterManager.Builder(1000, 1, addr_list2).build(); + } try { result = testManager.openTable("hehe", KeyHasher.DEFAULT); } catch (ReplicationException e) { diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java index 96c9c95b..315c2b75 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/MetaSessionTest.java @@ -7,7 +7,7 @@ import com.xiaomi.infra.pegasus.tools.Toollet; import org.junit.Assert; import org.junit.Test; -import org.junit.Before; +import org.junit.Before; import org.junit.After; import java.util.ArrayList; @@ -19,18 +19,18 @@ import com.xiaomi.infra.pegasus.base.*; import com.xiaomi.infra.pegasus.replication.query_cfg_request; -/** -* MetaSession Tester. -* -* @author sunweijie@xiaomi.com -* @version 1.0 -*/ -public class MetaSessionTest { +/** + * MetaSession Tester. + * + * @author sunweijie@xiaomi.com + * @version 1.0 + */ +public class MetaSessionTest { @Before - public void before() throws Exception { - } - + public void before() throws Exception { + } + @After public void after() throws Exception { rpc_address addr = new rpc_address(); @@ -53,9 +53,15 @@ private static void ensureNotLeader(rpc_address addr) { } } + private static boolean isOpenAuth() { + // openAuth is on by default + String prop = System.getProperties().getProperty("test.open.auth"); + return (prop != null) ? Boolean.valueOf(prop) : false; + } + /** - * Method: connect() - */ + * Method: connect() + */ @Test public void testConnect() throws Exception { // test: first connect to a wrong server @@ -63,8 +69,12 @@ public void testConnect() throws Exception { // then the wrong server crashed String[] addr_list = {"127.0.0.1:34602", "127.0.0.1:34603", "127.0.0.1:34601"}; - ClusterManager manager = new ClusterManager(1000, 4, false, - null, 60, addr_list); + ClusterManager manager; + if (isOpenAuth()) { + manager = new ClusterManager.Builder(1000, 4, addr_list).openAuth("xxxx", "xxxx").build(); + } else { + manager = new ClusterManager.Builder(1000, 4, addr_list).build(); + } MetaSession session = manager.getMetaSession(); rpc_address addr = new rpc_address(); @@ -72,7 +82,7 @@ public void testConnect() throws Exception { ensureNotLeader(addr); ArrayList> callbacks = new ArrayList>(); - for (int i=0; i<1000; ++i) { + for (int i = 0; i < 1000; ++i) { query_cfg_request req = new query_cfg_request("temp", new ArrayList()); final client_operator op = new query_cfg_operator(new gpid(-1, -1), req); FutureTask callback = new FutureTask(new Callable() { @@ -87,7 +97,7 @@ public Void call() throws Exception { } Toollet.closeServer(addr); - for (FutureTask cb: callbacks) { + for (FutureTask cb : callbacks) { try { Tools.waitUninterruptable(cb, Integer.MAX_VALUE); } catch (ExecutionException e) { diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java index 3ece8708..3ea9de95 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java @@ -13,7 +13,7 @@ import org.junit.Assert; import org.junit.Test; -import org.junit.Before; +import org.junit.Before; import org.junit.After; import com.xiaomi.infra.pegasus.operator.*; @@ -24,12 +24,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; -/** -* ReplicaSession Tester. -* -* @author sunweijie@xiaomi.com -* @version 1.0 -*/ +/** + * ReplicaSession Tester. + * + * @author sunweijie@xiaomi.com + * @version 1.0 + */ public class ReplicaSessionTest { private String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}; private final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSessionTest.class); @@ -37,18 +37,23 @@ public class ReplicaSessionTest { @Before public void before() throws Exception { - manager = new ClusterManager(1000, 1, false, - null, 60, metaList); + String prop = System.getProperties().getProperty("test.open.auth"); + boolean openAuth = (prop != null) ? Boolean.valueOf(prop) : false; + if (openAuth) { + manager = new ClusterManager.Builder(1000, 1, metaList).openAuth("xxxx", "xxxx").build(); + } else { + manager = new ClusterManager.Builder(1000, 1, metaList).build(); + } } - + @After public void after() throws Exception { manager.close(); - } + } /** - * Method: connect() - */ + * Method: connect() + */ @Test public void testConnect() throws Exception { //test1: connect to a invalid address @@ -56,9 +61,9 @@ public void testConnect() throws Exception { addr.fromString("127.0.0.1:12345"); ReplicaSession rs = manager.getReplicaSession(addr); - ArrayList > callbacks = new ArrayList>(); + ArrayList> callbacks = new ArrayList>(); - for (int i=0; i<100; ++i) { + for (int i = 0; i < 100; ++i) { final client_operator op = new rrdb_put_operator(new com.xiaomi.infra.pegasus.base.gpid(-1, -1), "", null); @@ -74,7 +79,7 @@ public Void call() throws Exception { rs.asyncSend(op, cb, 1000); } - for (FutureTask cb: callbacks) { + for (FutureTask cb : callbacks) { try { Tools.waitUninterruptable(cb, Integer.MAX_VALUE); } catch (ExecutionException e) { @@ -86,7 +91,7 @@ public Void call() throws Exception { Toollet.waitCondition(new Toollet.BoolCallable() { @Override public boolean call() { - return ReplicaSession.ConnState.DISCONNECTED==cp_rs.getState(); + return ReplicaSession.ConnState.DISCONNECTED == cp_rs.getState(); } }, 5); @@ -101,7 +106,7 @@ public boolean abandonIt(error_code.error_types err, TMessage header) { return true; } }); - for (int i=0; i<20; ++i) { + for (int i = 0; i < 20; ++i) { // we send query request to replica server. We expect it to discard it. final int index = i; update_request req = new update_request( @@ -127,7 +132,7 @@ public Void call() throws Exception { rs.asyncSend(op, cb, 500); } - for (int i=0; i<80; ++i) { + for (int i = 0; i < 80; ++i) { // then we still send query request to replica server. But the timeout is longer. update_request req = new update_request( new blob("hello".getBytes()), @@ -147,7 +152,7 @@ public Void call() throws Exception { rs.asyncSend(op, cb, 2000); } - for (FutureTask cb: callbacks) { + for (FutureTask cb : callbacks) { try { Tools.waitUninterruptable(cb, Integer.MAX_VALUE); } catch (ExecutionException e) { diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java index c66272c2..10ac095e 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java @@ -1,7 +1,7 @@ // Copyright (c) 2017, Xiaomi, Inc. All rights reserved. // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. -package com.xiaomi.infra.pegasus.rpc.async; +package com.xiaomi.infra.pegasus.rpc.async; import com.xiaomi.infra.pegasus.rpc.KeyHasher; import com.xiaomi.infra.pegasus.rpc.ReplicationException; @@ -23,11 +23,11 @@ import java.util.ArrayList; /** -* TableHandler Tester. -* -* @author sunweijie@xiaomi.com -* @version 1.0 -*/ + * TableHandler Tester. + * + * @author sunweijie@xiaomi.com + * @version 1.0 + */ public class TableHandlerTest { private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TableHandlerTest.class); @@ -38,8 +38,13 @@ public class TableHandlerTest { @Before public void before() throws Exception { - testManager = new ClusterManager(1000, 1, false, - null, 60, addr_list); + String prop = System.getProperties().getProperty("test.open.auth"); + boolean openAuth = (prop != null) ? Boolean.valueOf(prop) : false; + if (openAuth) { + testManager = new ClusterManager.Builder(1000, 1, addr_list).openAuth("xxxx", "xxxx").build(); + } else { + testManager = new ClusterManager.Builder(1000, 1, addr_list).build(); + } } @After @@ -48,7 +53,7 @@ public void after() throws Exception { private rpc_address getValidWrongServer(final rpc_address right_address) { ArrayList replicas = new ArrayList(); - for (int i=0; i