diff --git a/configuration/pegasus.properties b/configuration/pegasus.properties index fea4cee4..167ad465 100644 --- a/configuration/pegasus.properties +++ b/configuration/pegasus.properties @@ -5,6 +5,6 @@ enable_perf_counter = false perf_counter_tags = cluster=onebox,app=unit_test push_counter_interval_secs = 10 open_auth = false -jaas_conf = configuration/jaas.conf +jaas_conf = configuration/pegasus_jaas.conf service_name = xxx service_fqdn = xxx diff --git a/configuration/jaas.conf b/configuration/pegasus_jaas.conf similarity index 100% rename from configuration/jaas.conf rename to configuration/pegasus_jaas.conf diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java index 7428918e..a0c14e33 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java @@ -39,7 +39,7 @@ public abstract class Cluster { public static final String PEGASUS_OPEN_AUTH_DEF = "false"; public static final String PEGASUS_JAAS_CONF_KEY = "jaas_conf"; - public static final String PEGASUS_JAAS_CONF_DEF = "configuration/jaas.conf"; + public static final String PEGASUS_JAAS_CONF_DEF = "configuration/pegasus_jaas.conf"; public static Cluster createCluster(Properties config) throws IllegalArgumentException { int operatorTimeout = Integer.parseInt(config.getProperty( diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java index bdae9382..96a3ecaf 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java @@ -106,7 +106,7 @@ public ClusterManager(Builder builder) { String jaasConf = System.getProperties().getProperty("java.security.auth.login.config"); if (jaasConf == null) { - System.setProperty("java.security.auth.login.config", "configuration/jaas.conf"); + System.setProperty("java.security.auth.login.config", "configuration/pegasus_jaas.conf"); jaasConf = System.getProperties().getProperty("java.security.auth.login.config"); } logger.info("open authentication, jaas config path: {}, login now", jaasConf); diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java index cb97f1c0..9a0914b3 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java @@ -29,8 +29,7 @@ public MetaSession(ClusterManager manager, String addrList[], if (rpc_addr.fromString(addr)) { logger.info("add {} as meta server", addr); metaList.add(clusterManager.getReplicaSession(rpc_addr)); - } - else { + } else { logger.error("invalid address {}", addr); } } @@ -128,8 +127,7 @@ private final void onFinishQueryMeta(final MetaRequestRound round) { needDelay = false; needSwitchLeader = true; forwardAddress = getMetaServiceForwardAddress(op); - } - else { + } else { round.callbackFunc.run(); return; } @@ -137,7 +135,7 @@ private final void onFinishQueryMeta(final MetaRequestRound round) { needDelay = true; needSwitchLeader = true; } else { - logger.error("unknown error: {}", op.rpc_error.errno.toString()); + logger.error(op.rpc_error.errno == error_types.ERR_UNAUTHENTICATED ? "{}" : "unknown error: {}", op.rpc_error.errno.toString()); round.callbackFunc.run(); return; } @@ -168,8 +166,7 @@ private final void onFinishQueryMeta(final MetaRequestRound round) { metaList.add(clusterManager.getReplicaSession(forwardAddress)); curLeader = metaList.size() - 1; } - } - else if (metaList.get(curLeader) == round.lastSession) { + } else if (metaList.get(curLeader) == round.lastSession) { curLeader = (curLeader + 1) % metaList.size(); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 52f4c559..7cf39b0a 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static com.xiaomi.infra.pegasus.apps.versionConstants.CLIENT_VERSION; + /** * Created by weijiesun on 17-9-13. */ @@ -218,12 +220,14 @@ private void markSessionNegotiation(Channel activeChannel) { newCache.state = ConnState.NEGOTIATION; newCache.nettyChannel = activeChannel; fields = newCache; - logger.info("{}: mark session state negotiation(phase 1: version; phase 2: security), now negotiate in phase 1", name()); - //startNegotiation(); - //startVersionNego(); + logger.info("{}: mark session state negotiation(phase 1: version; phase 2: auth), now negotiate in phase 1", name()); + // for no version nego server +// if (needAuthConnection()) startNegotiation(); +// else markSessionConnected(activeChannel); + startVersionNego(); } - private void markSessionDisconnect() { + private void markSessionDisconnect(error_types errorType) { VolatileFields cache = fields; synchronized (pendingSend) { if (cache.state != ConnState.DISCONNECTED) { @@ -235,14 +239,14 @@ private void markSessionDisconnect() { // this. In this case, we are relying on the timeout task. while (!pendingSend.isEmpty()) { RequestEntry e = pendingSend.poll(); - tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false); + tryNotifyWithSequenceID(e.sequenceId, errorType, false); } List l = new LinkedList(); for (Map.Entry entry : pendingResponse.entrySet()) { l.add(entry.getValue()); } for (RequestEntry e : l) { - tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false); + tryNotifyWithSequenceID(e.sequenceId, errorType, false); } cache = new VolatileFields(); @@ -255,6 +259,16 @@ private void markSessionDisconnect() { } } + // for netty event.. TODO HW 标记状态 + private void markSessionDisconnect() { + markSessionDisconnect(error_types.ERR_SESSION_RESET); + } + + // for handling logic ... TODO HW 主动断开 + private void disconnectCurrentSession(error_types errorType) { + markSessionDisconnect(errorType); + } + private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask) { logger.debug("{}: {} is notified with error {}, isTimeoutTask {}", name(), seqID, errno.toString(), isTimeoutTask); @@ -310,9 +324,6 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("Session {} needs negotiation(version + auth)", name()); markSessionNegotiation(ctx.channel()); - //}// else { - //markSessionConnected(ctx.channel()); - // } } @Override @@ -339,7 +350,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E private void startVersionNego() { version_nego_message msg = new version_nego_message(); msg.status = version_nego_status.INVALID; - msg.version = new blob("1.12.SNAPSHOT".getBytes()); + msg.version = CLIENT_VERSION; final RequestEntry entry = new ReplicaSession.RequestEntry(); entry.sequenceId = seqId.getAndIncrement(); entry.op = new version_nego_operator(msg); @@ -365,15 +376,18 @@ public void run() { if (resp == null) { throw new Exception("VersionNegoHandler received a null response, abandon it"); } - + logger.info("version response {}", resp.status); if (resp.status == version_nego_status.MATCH) { logger.info("{}: version negotiation succeed", name()); - if (needAuthConnection() && !isAuthed()) + if (needAuthConnection())// && !isAuthed()) startNegotiation(); else markSessionConnected(fields.nettyChannel); } else { - throw new Exception("Version negotiation failed, " + resp.status); + logger.info("{}: version nego failed, disconnect current session with ERR_VERSION_NEGO_FAILED"); + disconnectCurrentSession(error_types.ERR_VERSION_NEGO_FAILED); + // markSessionDisconnect();// TODO HW + // throw new Exception("Version negotiation failed, " + resp.status); } } catch (Exception e) { // e.printStackTrace(); @@ -387,13 +401,13 @@ private boolean needAuthConnection() { return openAuth; } - private boolean isAuthed() { - return negoStatus == negotiation_status.SASL_SUCC; - } - - public void resetAuthStatus() { - negoStatus = negotiation_status.INVALID; - } +// private boolean isAuthed() { +// return negoStatus == negotiation_status.SASL_SUCC; +// } +// +// public void resetAuthStatus() { +// negoStatus = negotiation_status.INVALID; +// } private void startNegotiation() { negotiation_message msg = new negotiation_message(); @@ -401,7 +415,7 @@ private void startNegotiation() { sendNegoMsg(msg); - logger.info("{}: start negotiation in phase 2", name()); + logger.info("{}: start negotiation in phase 2 auth", name()); } private void sendNegoMsg(negotiation_message msg) { @@ -450,7 +464,10 @@ private void handleResp() throws Exception { final negotiation_message msg = new negotiation_message(); switch (resp.status) { case INVALID: - throw new Exception("Received a response which status is INVALID"); + case SASL_AUTH_FAIL: + disconnectCurrentSession(error_types.ERR_AUTH_NEGO_FAILED);// TODO HW + //throw new Exception("Received a response which status is " + resp.status + ", break off this negotiation"); + return; case SASL_LIST_MECHANISMS_RESP: Subject.doAs( subject, @@ -498,11 +515,7 @@ public Object run() throws Exception { case SASL_SUCC: logger.info("{}: negotiation succeed, mark session connected and negoStatus succeed", name()); markSessionConnected(fields.nettyChannel); - negoStatus = negotiation_status.SASL_SUCC; // After succeed, the authentication is permanent in this session - return; - case SASL_AUTH_FAIL: - //throw new Exception("Received SASL_AUTH_FAIL"); - startNegotiation(); + //negoStatus = negotiation_status.SASL_SUCC; // After succeed, the authentication is permanent in this session return; default: throw new Exception("Received an unknown response, status " + resp.status); @@ -545,7 +558,7 @@ private final static class VolatileFields { private String serviceFqdn; // name used for SASL authentication private CallbackHandler cbh = null; // Don't need handler for GSSAPI private SaslClient saslClient; - private negotiation_status negoStatus = negotiation_status.INVALID; + // private negotiation_status negoStatus = negotiation_status.INVALID; needless, fields.state can replace it private final HashMap props = new HashMap(); private LoginContext loginContext = null; private final Subject subject; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 2e648ec1..3b0f7d75 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -273,18 +273,6 @@ void onRpcReply(ClientRequestRound round, operator.rpc_error.errno.toString()); break; - case ERR_AUTH_FAILED: - logger.warn("{}: replica server({}) is auth-failed for gpid({}), operator({}), try({}), error_code({}), reset authStatus and retry later", - tableName_, - cachedHandle.session.name(), - operator.get_gpid().toString(), - operator, - tryId, - operator.rpc_error.errno.toString()); - cachedHandle.session.closeSession(); - cachedHandle.session.resetAuthStatus(); - break; - // under other cases we should not retry default: logger.error("{}: replica server({}) fails for gpid({}), operator({}), try({}), error_code({}), not retry",