Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
fix about handling auth error
Browse files Browse the repository at this point in the history
  • Loading branch information
huangwei5 committed Jan 16, 2019
1 parent f9069f8 commit ebf360f
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 50 deletions.
2 changes: 1 addition & 1 deletion configuration/pegasus.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ enable_perf_counter = false
perf_counter_tags = cluster=onebox,app=unit_test
push_counter_interval_secs = 10
open_auth = false
jaas_conf = configuration/jaas.conf
jaas_conf = configuration/pegasus_jaas.conf
service_name = xxx
service_fqdn = xxx
File renamed without changes.
2 changes: 1 addition & 1 deletion src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public abstract class Cluster {
public static final String PEGASUS_OPEN_AUTH_DEF = "false";

public static final String PEGASUS_JAAS_CONF_KEY = "jaas_conf";
public static final String PEGASUS_JAAS_CONF_DEF = "configuration/jaas.conf";
public static final String PEGASUS_JAAS_CONF_DEF = "configuration/pegasus_jaas.conf";

public static Cluster createCluster(Properties config) throws IllegalArgumentException {
int operatorTimeout = Integer.parseInt(config.getProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public ClusterManager(Builder builder) {

String jaasConf = System.getProperties().getProperty("java.security.auth.login.config");
if (jaasConf == null) {
System.setProperty("java.security.auth.login.config", "configuration/jaas.conf");
System.setProperty("java.security.auth.login.config", "configuration/pegasus_jaas.conf");
jaasConf = System.getProperties().getProperty("java.security.auth.login.config");
}
logger.info("open authentication, jaas config path: {}, login now", jaasConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ public MetaSession(ClusterManager manager, String addrList[],
if (rpc_addr.fromString(addr)) {
logger.info("add {} as meta server", addr);
metaList.add(clusterManager.getReplicaSession(rpc_addr));
}
else {
} else {
logger.error("invalid address {}", addr);
}
}
Expand Down Expand Up @@ -128,16 +127,15 @@ private final void onFinishQueryMeta(final MetaRequestRound round) {
needDelay = false;
needSwitchLeader = true;
forwardAddress = getMetaServiceForwardAddress(op);
}
else {
} else {
round.callbackFunc.run();
return;
}
} else if (op.rpc_error.errno == error_types.ERR_SESSION_RESET || op.rpc_error.errno == error_types.ERR_TIMEOUT) {
needDelay = true;
needSwitchLeader = true;
} else {
logger.error("unknown error: {}", op.rpc_error.errno.toString());
logger.error(op.rpc_error.errno == error_types.ERR_UNAUTHENTICATED ? "{}" : "unknown error: {}", op.rpc_error.errno.toString());
round.callbackFunc.run();
return;
}
Expand Down Expand Up @@ -168,8 +166,7 @@ private final void onFinishQueryMeta(final MetaRequestRound round) {
metaList.add(clusterManager.getReplicaSession(forwardAddress));
curLeader = metaList.size() - 1;
}
}
else if (metaList.get(curLeader) == round.lastSession) {
} else if (metaList.get(curLeader) == round.lastSession) {
curLeader = (curLeader + 1) % metaList.size();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static com.xiaomi.infra.pegasus.apps.versionConstants.CLIENT_VERSION;

/**
* Created by weijiesun on 17-9-13.
*/
Expand Down Expand Up @@ -218,12 +220,14 @@ private void markSessionNegotiation(Channel activeChannel) {
newCache.state = ConnState.NEGOTIATION;
newCache.nettyChannel = activeChannel;
fields = newCache;
logger.info("{}: mark session state negotiation(phase 1: version; phase 2: security), now negotiate in phase 1", name());
//startNegotiation();
//startVersionNego();
logger.info("{}: mark session state negotiation(phase 1: version; phase 2: auth), now negotiate in phase 1", name());
// for no version nego server
// if (needAuthConnection()) startNegotiation();
// else markSessionConnected(activeChannel);
startVersionNego();
}

private void markSessionDisconnect() {
private void markSessionDisconnect(error_types errorType) {
VolatileFields cache = fields;
synchronized (pendingSend) {
if (cache.state != ConnState.DISCONNECTED) {
Expand All @@ -235,14 +239,14 @@ private void markSessionDisconnect() {
// this. In this case, we are relying on the timeout task.
while (!pendingSend.isEmpty()) {
RequestEntry e = pendingSend.poll();
tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false);
tryNotifyWithSequenceID(e.sequenceId, errorType, false);
}
List<RequestEntry> l = new LinkedList<RequestEntry>();
for (Map.Entry<Integer, RequestEntry> entry : pendingResponse.entrySet()) {
l.add(entry.getValue());
}
for (RequestEntry e : l) {
tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false);
tryNotifyWithSequenceID(e.sequenceId, errorType, false);
}

cache = new VolatileFields();
Expand All @@ -255,6 +259,16 @@ private void markSessionDisconnect() {
}
}

// for netty event.. TODO HW 标记状态
private void markSessionDisconnect() {
markSessionDisconnect(error_types.ERR_SESSION_RESET);
}

// for handling logic ... TODO HW 主动断开
private void disconnectCurrentSession(error_types errorType) {
markSessionDisconnect(errorType);
}

private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask) {
logger.debug("{}: {} is notified with error {}, isTimeoutTask {}",
name(), seqID, errno.toString(), isTimeoutTask);
Expand Down Expand Up @@ -310,9 +324,6 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {

logger.info("Session {} needs negotiation(version + auth)", name());
markSessionNegotiation(ctx.channel());
//}// else {
//markSessionConnected(ctx.channel());
// }
}

@Override
Expand All @@ -339,7 +350,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
private void startVersionNego() {
version_nego_message msg = new version_nego_message();
msg.status = version_nego_status.INVALID;
msg.version = new blob("1.12.SNAPSHOT".getBytes());
msg.version = CLIENT_VERSION;
final RequestEntry entry = new ReplicaSession.RequestEntry();
entry.sequenceId = seqId.getAndIncrement();
entry.op = new version_nego_operator(msg);
Expand All @@ -365,15 +376,18 @@ public void run() {
if (resp == null) {
throw new Exception("VersionNegoHandler received a null response, abandon it");
}

logger.info("version response {}", resp.status);
if (resp.status == version_nego_status.MATCH) {
logger.info("{}: version negotiation succeed", name());
if (needAuthConnection() && !isAuthed())
if (needAuthConnection())// && !isAuthed())
startNegotiation();
else
markSessionConnected(fields.nettyChannel);
} else {
throw new Exception("Version negotiation failed, " + resp.status);
logger.info("{}: version nego failed, disconnect current session with ERR_VERSION_NEGO_FAILED");
disconnectCurrentSession(error_types.ERR_VERSION_NEGO_FAILED);
// markSessionDisconnect();// TODO HW
// throw new Exception("Version negotiation failed, " + resp.status);
}
} catch (Exception e) {
// e.printStackTrace();
Expand All @@ -387,21 +401,21 @@ private boolean needAuthConnection() {
return openAuth;
}

private boolean isAuthed() {
return negoStatus == negotiation_status.SASL_SUCC;
}

public void resetAuthStatus() {
negoStatus = negotiation_status.INVALID;
}
// private boolean isAuthed() {
// return negoStatus == negotiation_status.SASL_SUCC;
// }
//
// public void resetAuthStatus() {
// negoStatus = negotiation_status.INVALID;
// }

private void startNegotiation() {
negotiation_message msg = new negotiation_message();
msg.status = negotiation_status.SASL_LIST_MECHANISMS;

sendNegoMsg(msg);

logger.info("{}: start negotiation in phase 2", name());
logger.info("{}: start negotiation in phase 2 auth", name());
}

private void sendNegoMsg(negotiation_message msg) {
Expand Down Expand Up @@ -450,7 +464,10 @@ private void handleResp() throws Exception {
final negotiation_message msg = new negotiation_message();
switch (resp.status) {
case INVALID:
throw new Exception("Received a response which status is INVALID");
case SASL_AUTH_FAIL:
disconnectCurrentSession(error_types.ERR_AUTH_NEGO_FAILED);// TODO HW
//throw new Exception("Received a response which status is " + resp.status + ", break off this negotiation");
return;
case SASL_LIST_MECHANISMS_RESP:
Subject.doAs(
subject,
Expand Down Expand Up @@ -498,11 +515,7 @@ public Object run() throws Exception {
case SASL_SUCC:
logger.info("{}: negotiation succeed, mark session connected and negoStatus succeed", name());
markSessionConnected(fields.nettyChannel);
negoStatus = negotiation_status.SASL_SUCC; // After succeed, the authentication is permanent in this session
return;
case SASL_AUTH_FAIL:
//throw new Exception("Received SASL_AUTH_FAIL");
startNegotiation();
//negoStatus = negotiation_status.SASL_SUCC; // After succeed, the authentication is permanent in this session
return;
default:
throw new Exception("Received an unknown response, status " + resp.status);
Expand Down Expand Up @@ -545,7 +558,7 @@ private final static class VolatileFields {
private String serviceFqdn; // name used for SASL authentication
private CallbackHandler cbh = null; // Don't need handler for GSSAPI
private SaslClient saslClient;
private negotiation_status negoStatus = negotiation_status.INVALID;
// private negotiation_status negoStatus = negotiation_status.INVALID; needless, fields.state can replace it
private final HashMap<String, Object> props = new HashMap<String, Object>();
private LoginContext loginContext = null;
private final Subject subject;
Expand Down
12 changes: 0 additions & 12 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,6 @@ void onRpcReply(ClientRequestRound round,
operator.rpc_error.errno.toString());
break;

case ERR_AUTH_FAILED:
logger.warn("{}: replica server({}) is auth-failed for gpid({}), operator({}), try({}), error_code({}), reset authStatus and retry later",
tableName_,
cachedHandle.session.name(),
operator.get_gpid().toString(),
operator,
tryId,
operator.rpc_error.errno.toString());
cachedHandle.session.closeSession();
cachedHandle.session.resetAuthStatus();
break;

// under other cases we should not retry
default:
logger.error("{}: replica server({}) fails for gpid({}), operator({}), try({}), error_code({}), not retry",
Expand Down

0 comments on commit ebf360f

Please sign in to comment.