Skip to content

Commit

Permalink
Merge branch 'master' into inner-2112_3
Browse files Browse the repository at this point in the history
  • Loading branch information
wenyh1 authored Sep 7, 2023
2 parents 94b705f + 322ea21 commit 938280e
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class RWSplitNonBlockingSession extends Session {
public static final Logger LOGGER = LoggerFactory.getLogger(RWSplitNonBlockingSession.class);

private volatile BackendConnection conn;

//previous preserve conn
private volatile BackendConnection backupConn;
private final RWSplitService rwSplitService;
private PhysicalDbGroup rwGroup;

Expand Down Expand Up @@ -314,9 +317,20 @@ public void executeHint(DbleHintParser.HintInfo hintInfo, int sqlType, String sq
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("route sql {} to {}", sql, dbInstance);
}
if (rwSplitService.isInTransaction()) {
if (conn != null) {
backupConn = conn;
conn = null;
}
rwSplitService.setForceUseAutoCommit(true);
}
RWSplitHandler handler = new RWSplitHandler(rwSplitService, false, null, callback);
dbInstance.getConnection(rwSplitService.getSchema(), handler, null, false);
} catch (Exception e) {
if (rwSplitService.isForceUseAutoCommit()) {
conn = backupConn;
backupConn = null;
}
rwSplitService.executeException(e, sql);
}
}
Expand Down Expand Up @@ -350,8 +364,11 @@ public void bind(BackendConnection bindConn) {

public void unbindIfSafe() {
final BackendConnection tmp = conn;
if (tmp != null && rwSplitService.isKeepBackendConn()) {
if (tmp != null && !rwSplitService.isKeepBackendConn()) {
this.conn = null;
conn = backupConn;
backupConn = null;

if (rwSplitService.isFlowControlled()) {
releaseConnectionFromFlowControlled(tmp);
}
Expand All @@ -363,16 +380,28 @@ public void unbindIfSafe() {

public void unbind() {
this.conn = null;
conn = backupConn;
backupConn = null;
}

public void close(String reason) {
if (null != rwGroup) {
rwGroup.unBindRwSplitSession(this);
}
final BackendConnection tmp = this.conn;
this.conn = null;
if (tmp != null) {
tmp.close(reason);
{
final BackendConnection tmp = this.conn;
this.conn = null;
if (tmp != null) {
tmp.close(reason);
}
}

{
final BackendConnection tmp = this.backupConn;
this.backupConn = null;
if (tmp != null) {
tmp.close(reason);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,22 @@ private static SystemVariables checkVersionAGetSystemVariables(ConfigInitializer
//check packetSize/lowerCase
ConfigUtil.getAndSyncKeyVariables(changeItemList, true);
//keep the original system variables
newSystemVariables = DbleServer.getInstance().getSystemVariables();
SystemVariables memorySystemVariables = DbleServer.getInstance().getSystemVariables();
SystemVariables currentSystemVariables = getSystemVariablesFromdbGroup(loader, loader.getDbGroups());
int memoryLowerCase = memorySystemVariables.isLowerCaseTableNames() ? 1 : 0;
if (memorySystemVariables.isLowerCaseTableNames() != currentSystemVariables.isLowerCaseTableNames()) { // check if the lowerCase of the backend has changed
throw new Exception("Dble memory's lowercase value is " + memoryLowerCase + ", " +
"But it was found that the lower_case_table_names value of the dbInstance is not " + memoryLowerCase + ". " +
"Please unify dbInstances's lower_case_table_names or use 'reload @@config_all -r;'");
} else {
newSystemVariables = memorySystemVariables;
}
}
ReloadLogHelper.briefInfo("check and get system variables from random node end");
return newSystemVariables;
}



/**
* test connection
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ private static RowDataPacket getRow(String stmt, String desc, String charset) {

HELPS.put("show @@statistic", "Turn off statistic information");
HELPS.put("enable @@statistic", "Turn on statistic sql");
HELPS.put("enable @@enableStatisticAnalysis", "Turn on statistic analysis sql('show @@sql.sum.user/table' or 'show @@sql.condition')");
HELPS.put("enable @@statisticAnalysis", "Turn on statistic analysis sql('show @@sql.sum.user/table' or 'show @@sql.condition')");
HELPS.put("disable @@statistic", "Turn off statistic sql");
HELPS.put("disable @@enableStatisticAnalysis", "Turn off statistic analysis sql('show @@sql.sum.user/table' or 'show @@sql.condition')");
HELPS.put("disable @@statisticAnalysis", "Turn off statistic analysis sql('show @@sql.sum.user/table' or 'show @@sql.condition')");
HELPS.put("reload @@statistic_table_size = ? [where table='?' | where table in (dble_information.tableA,...)]", "Statistic table size");
HELPS.put("reload @@samplingRate=?", "Reset the samplingRate size");
HELPS.put("show @@statistic_queue.usage", "Show the queue usage");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,15 @@ public void execute(byte[] originPacket) {

write(originPacket, WriteFlags.QUERY_END);
}

public void execute(BusinessService service, String sql) {
execute(service, sql, false);
}

public void execute(BusinessService service, String sql, boolean forceUseAutoCommit) {
boolean changeUser = isChangeUser(service);
if (changeUser) return;

StringBuilder synSQL = getSynSql(null, null, service.isAutocommit(), service);
StringBuilder synSQL = getSynSql(null, null, forceUseAutoCommit || service.isAutocommit(), service);
if (protocolResponseHandler != defaultResponseHandler) {
protocolResponseHandler = defaultResponseHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void execute(final BackendConnection conn) {
mysqlService.execute(rwSplitService, originPacket);
} else if (!StringUtil.isEmpty(executeSql)) {
// such as: Hint sql (remove comment sentences)
mysqlService.execute(rwSplitService, executeSql);
mysqlService.execute(rwSplitService, executeSql, rwSplitService.isForceUseAutoCommit());
} else {
// not happen
mysqlService.execute(rwSplitService, rwSplitService.getExecuteSqlBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class RWSplitService extends BusinessService<SingleDbGroupUserConfig> {

// prepare statement
private ConcurrentHashMap<Long, PreparedStatementHolder> psHolder = new ConcurrentHashMap<>();
private boolean forceUseAutoCommit = false;


public RWSplitService(AbstractConnection connection, AuthResultInfo info) {
super(connection, info);
Expand Down Expand Up @@ -116,6 +118,13 @@ protected void beforeInsertServiceTask(@NotNull ServiceTask task) {
session.trace(t -> t.setRequestTime());
}

@Override
protected boolean beforeHandlingTask(@NotNull ServiceTask task) {
//initialize value for the REQUEST
setForceUseAutoCommit(false);
return super.beforeHandlingTask(task);
}

@Override
protected void handleInnerData(byte[] data) {
session.trace(t -> t.startProcess());
Expand Down Expand Up @@ -400,7 +409,8 @@ public PreparedStatementHolder getPrepareStatement(long id) {
}

public boolean isKeepBackendConn() {
return !isInTransaction() && !isInLoadData() && psHolder.isEmpty() && !isLockTable() && !isUsingTmpTable() && nameSet.isEmpty();
boolean releaseConn = (!isInTransaction() || isForceUseAutoCommit()) && !isInLoadData() && psHolder.isEmpty() && !isLockTable() && !isUsingTmpTable() && nameSet.isEmpty();
return !releaseConn;
}

public boolean isInitDb() {
Expand All @@ -423,6 +433,14 @@ public void setMultiHandler(RWSplitMultiHandler multiHandler) {
this.multiHandler = multiHandler;
}

public boolean isForceUseAutoCommit() {
return forceUseAutoCommit;
}

public void setForceUseAutoCommit(boolean forceUseAutoCommit) {
this.forceUseAutoCommit = forceUseAutoCommit;
}

@Override
public void killAndClose(String reason) {
session.close(reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public long getExaminedRows() {
}

public String getSqlDigest() {
if (init.compareAndSet(false, true)) {
if (init.compareAndSet(false, true) || this.sqlDigest == null) {
try {
if (stmt.equalsIgnoreCase("begin")) {
this.sqlDigest = "begin";
Expand Down

0 comments on commit 938280e

Please sign in to comment.