Skip to content

Commit

Permalink
PR feedback: split the list of set commands into two separate collect…
Browse files Browse the repository at this point in the history
…ions

Signed-off-by: Hector Geraldino <[email protected]>
  • Loading branch information
hgeraldino committed Apr 3, 2023
1 parent 7035b75 commit 7b44aea
Showing 1 changed file with 69 additions and 23 deletions.
92 changes: 69 additions & 23 deletions cdb2jdbc/src/main/java/com/bloomberg/comdb2/jdbc/Comdb2Handle.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public class Comdb2Handle extends AbstractConnection {

HashMap<String, Cdb2BindValue> bindVars;
HashMap<Integer, Cdb2BindValue> bindVarsByIndex;
private Set<String> connectionSetStmts; // Collection of "set" SQL statements for this Connection
private Set<String> statementSetStmts; // Collection of "set" SQL statements for the current Statement
private List<String> sentSetStmts; // Collection of "set" SQL statements sent for this Connection
private List<String> pendingSetStmts; // Collection of "set" SQL statements pending to be sent

private boolean ack = false;
private boolean skipDrain = false;
Expand Down Expand Up @@ -201,8 +201,8 @@ public Comdb2Handle duplicate() {
without discovering twice. */
public Comdb2Handle() {
super(new ProtobufProtocol(), null);
connectionSetStmts = new LinkedHashSet<String>();
statementSetStmts = new LinkedHashSet<String>();
sentSetStmts = new LinkedList<String>();
pendingSetStmts = new LinkedList<String>();

/* CDB2JDBC_STATEMENT_QUERYEFFECTS and CDB2JDBC_VERIFY_RETRY
are used by the Jepsen tests to change the driver's behaviors. */
Expand All @@ -226,12 +226,12 @@ public Comdb2Handle() {

String userEnv = System.getenv("COMDB2_USER");
if (userEnv != null) {
connectionSetStmts.add("set user " + userEnv);
addSetStatement("set user " + userEnv);
}

String passwordEnv = System.getenv("COMDB2_PASSWORD");
if (passwordEnv != null) {
connectionSetStmts.add("set password " + passwordEnv);
addSetStatement("set password " + passwordEnv);
}

uuid = UUID.randomUUID().toString();
Expand Down Expand Up @@ -331,11 +331,11 @@ public void setVerifyRetry(boolean val) {
return;

if (val) {
connectionSetStmts.remove("set verifyretry off");
connectionSetStmts.add("set verifyretry on");
removeSetStatement("set verifyretry off");
addSetStatement("set verifyretry on");
} else {
connectionSetStmts.remove("set verifyretry on");
connectionSetStmts.add("set verifyretry off");
removeSetStatement("set verifyretry on");
addSetStatement("set verifyretry off");
}

verifyretry = val;
Expand Down Expand Up @@ -384,11 +384,11 @@ public void setStatementQueryEffects(boolean val) {
return;

if (val) {
connectionSetStmts.remove("set queryeffects transaction");
connectionSetStmts.add("set queryeffects statement");
removeSetStatement("set queryeffects transaction");
addSetStatement("set queryeffects statement");
} else {
connectionSetStmts.remove("set queryeffects statement");
connectionSetStmts.add("set queryeffects transaction");
removeSetStatement("set queryeffects statement");
addSetStatement("set queryeffects transaction");
}

stmteffects = val;
Expand Down Expand Up @@ -685,8 +685,7 @@ private boolean sendQuery(String sql, List<Integer> types,
sqlQuery.bindVars.addAll(bindVarsByIndex.values());
if (debug)
tdlog(Level.FINEST, "starting sendQuery");
sqlQuery.setFlags.addAll(connectionSetStmts);
sqlQuery.setFlags.addAll(statementSetStmts);
sqlQuery.setFlags.addAll(pendingSetStmts);

if (types != null)
sqlQuery.types.addAll(types);
Expand Down Expand Up @@ -981,9 +980,9 @@ private int runStatementInt(String sql, List<Integer> types) {
if (isClientOnlySetCommand(lowerSql)) {
tdlog(Level.FINEST, "Added client-only set command %s", sql);
} else {
statementSetStmts.add(sql);
addSetStatement(sql);
tdlog(Level.FINEST, "Added '%s' to sets size is %d uuid is %s",
sql, statementSetStmts.size(), uuid);
sql, pendingSetStmts.size(), uuid);

// HASql sessions need the file & offset from begin
String hasql[] = lowerSql.split("hasql");
Expand Down Expand Up @@ -1558,11 +1557,6 @@ public synchronized int runStatement(String sql, List<Integer> types) {
temp_trans = false;
}

if(!sql.toLowerCase().startsWith("set")) {
// Make sure to clear the list of "set" sql statements for the current Comdb2Statement
statementSetStmts.clear();
}

return rc;
}

Expand Down Expand Up @@ -1638,6 +1632,7 @@ public synchronized int next() {
if (lastResp.respType == 2) {
rc = lastResp.errCode;
} else if (lastResp.respType == 3) {
mergeSetStatements();
rc = Errors.CDB2_OK_DONE;
} else {
rc = Errors.CDB2ERR_IO_ERROR;
Expand Down Expand Up @@ -1807,6 +1802,8 @@ private int next_int() {
return rc;
}

mergeSetStatements();

if (inTxn && lastResp.features != null) {
for (int feature : lastResp.features) {
if (CDB2ServerFeatures.SKIP_INTRANS_RESULTS_VALUE == feature) {
Expand Down Expand Up @@ -1946,6 +1943,7 @@ private boolean reopen(boolean refresh_dbinfo_if_failed) {
return false;
dbHostConnected = prefIdx;
dbHostIdx = prefIdx;
resetSetStatements();
opened = true;
return true;
} catch (IOException e) {
Expand Down Expand Up @@ -1983,6 +1981,7 @@ private boolean reopen(boolean refresh_dbinfo_if_failed) {
if (!trySSL())
return false;
dbHostConnected = try_node;
resetSetStatements();
opened = true;
return true;
} catch (IOException e) {
Expand Down Expand Up @@ -2025,6 +2024,7 @@ private boolean reopen(boolean refresh_dbinfo_if_failed) {
if (!trySSL())
return false;
dbHostConnected = dbHostIdx;
resetSetStatements();
opened = true;
return true;
} catch (IOException e) {
Expand Down Expand Up @@ -2055,6 +2055,7 @@ private boolean reopen(boolean refresh_dbinfo_if_failed) {
if (!trySSL())
return false;
dbHostConnected = dbHostIdx;
resetSetStatements();
opened = true;
return true;
} catch (IOException e) {
Expand Down Expand Up @@ -2084,6 +2085,7 @@ private boolean reopen(boolean refresh_dbinfo_if_failed) {
return false;
dbHostConnected = masterIndexInMyDbHosts;
dbHostIdx = masterIndexInMyDbHosts;
resetSetStatements();
opened = true;
return true;
} catch (IOException e) {
Expand Down Expand Up @@ -2111,6 +2113,50 @@ private boolean reopen(boolean refresh_dbinfo_if_failed) {
return false;
}

/**
* Adds a 'set' statement to the pending list.
*/
private void addSetStatement(String statement) {
removeSetStatement(statement);
pendingSetStmts.add(statement);
}

/**
* Removes a 'set' statement from the pending list.
*/
private void removeSetStatement(String statement) {
pendingSetStmts.remove(statement);
}

/**
* Caches the already sent 'set' statements, so they can be re-sent in the case of a
* disconnection.
*/
private void mergeSetStatements() {
for (String setStatement : pendingSetStmts) {
sentSetStmts.remove(setStatement);
sentSetStmts.add(setStatement);
}

pendingSetStmts.clear();
}

/**
* Restores the cached 'set' statements, in order to be sent again after a reconnection.
*/
private void resetSetStatements() {
// Make a copy of the pending statements before clearing the list
List<String> pendingStatementsCopy = new LinkedList<String>(pendingSetStmts);

pendingSetStmts.clear();
pendingSetStmts.addAll(sentSetStmts); // Add the sent statements

// Add the statements that were pending before the disconnection happened
for (String setStatement : pendingStatementsCopy) {
addSetStatement(setStatement);
}
}

private void closeNoException() {
if (debug)
tdlog(Level.FINEST, "starting closeNoException");
Expand Down

0 comments on commit 7b44aea

Please sign in to comment.