Skip to content

Commit

Permalink
Merge branch 'master' into inner-2234&2336
Browse files Browse the repository at this point in the history
  • Loading branch information
wenyh1 authored Sep 6, 2023
2 parents 7b8f916 + 341fdcd commit c25da26
Show file tree
Hide file tree
Showing 16 changed files with 526 additions and 93 deletions.
17 changes: 16 additions & 1 deletion src/main/assembly/conf/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,20 @@
<DefaultRolloverStrategy max="10"/>
</RollingFile>
-->
<RollingFile name="ThreadChecker" fileName="logs/thread.log"
filePattern="logs/$${date:yyyy-MM}/thread-%d{MM-dd}-%i.log.gz">
<PatternLayout>
<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %5p [%t] (%l) - %m%n</Pattern>
</PatternLayout>
<Policies>
<OnStartupTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="250 MB"/>
<TimeBasedTriggeringPolicy/>
</Policies>
<DefaultRolloverStrategy max="10"/>
</RollingFile>
</Appenders>

<Loggers>
<!-- independent log file for new ha interface, for use useOuterHa only
<Logger name="ha_log" additivity="false" includeLocation="false" >
Expand All @@ -87,7 +100,9 @@
<!-- <AppenderRef ref="DumpFileLog" />-->
<!-- <AppenderRef ref="RollingFile"/>-->
<!-- </Logger>-->

<Logger name="ThreadChecker" additivity="false" includeLocation="false">
<AppenderRef ref="ThreadChecker"/>
</Logger>
<asyncRoot level="info" includeLocation="true">
<!--<AppenderRef ref="Console" />-->
<AppenderRef ref="RollingFile"/>
Expand Down
48 changes: 27 additions & 21 deletions src/main/java/com/actiontech/dble/DbleServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.actiontech.dble.singleton.*;
import com.actiontech.dble.statistic.sql.StatisticManager;
import com.actiontech.dble.statistic.stat.ThreadWorkUsage;
import com.actiontech.dble.singleton.ThreadChecker;
import com.actiontech.dble.util.ExecutorUtil;
import com.actiontech.dble.util.TimeUtil;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -293,6 +294,7 @@ public void startup() throws Exception {
LOGGER.info(server.getName() + " is started and listening on " + server.getPort());
LOGGER.info("=====================================Server started success=======================================");

ThreadCheckerScheduler.getInstance().init();
Scheduler.getInstance().init(timerExecutor);
LOGGER.info("=======================================Scheduler started==========================================");

Expand Down Expand Up @@ -337,7 +339,7 @@ private void initExecutor(int frontProcessorCount, int backendProcessorCount) {
backendExecutor = ExecutorUtil.createFixed(BACKEND_WORKER_NAME, SystemConfig.getInstance().getBackendWorker(), runnableMap);
writeToBackendExecutor = ExecutorUtil.createFixed(WRITE_TO_BACKEND_WORKER_NAME, SystemConfig.getInstance().getWriteToBackendWorker(), runnableMap);
complexQueryExecutor = ExecutorUtil.createCached(COMPLEX_QUERY_EXECUTOR_NAME, SystemConfig.getInstance().getComplexQueryWorker(), null);
timerExecutor = ExecutorUtil.createFixed(TIMER_WORKER_NAME, 1);
timerExecutor = ExecutorUtil.createCached(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance());
nioFrontExecutor = ExecutorUtil.createFixed(NIO_FRONT_RW, frontProcessorCount, runnableMap);
nioBackendExecutor = ExecutorUtil.createFixed(NIO_BACKEND_RW, backendProcessorCount, runnableMap);
}
Expand Down Expand Up @@ -459,30 +461,34 @@ public Runnable processorCheck() {
return new Runnable() {
@Override
public void run() {
timerExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (IOProcessor p : backendProcessors) {
p.checkBackendCons();
try {
timerExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (IOProcessor p : backendProcessors) {
p.checkBackendCons();
}
} catch (Exception e) {
LOGGER.info("checkBackendCons caught err:", e);
}
} catch (Exception e) {
LOGGER.info("checkBackendCons caught err:", e);
}
}
});
timerExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (IOProcessor p : frontProcessors) {
p.checkFrontCons();
});
timerExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (IOProcessor p : frontProcessors) {
p.checkFrontCons();
}
} catch (Exception e) {
LOGGER.info("checkFrontCons caught err:", e);
}
} catch (Exception e) {
LOGGER.info("checkFrontCons caught err:", e);
}
}
});
});
} catch (RejectedExecutionException e) {
ThreadChecker.getInstance().timerExecuteError(e, "processorCheck()");
}
}
};
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/actiontech/dble/alarm/AlarmCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ private AlarmCode() {
public static final String DB_MASTER_INSTANCE_DELAY_FAIL = "DB_MASTER_INSTANCE_DELAY_FAIL";
public static final String SLOW_QUERY_QUEUE_POLICY_ABORT = "SLOW_QUERY_QUEUE_POLICY_ABORT";
public static final String SLOW_QUERY_QUEUE_POLICY_WAIT = "SLOW_QUERY_QUEUE_POLICY_WAIT";
public static final String THREAD_SUSPECTED_HANG = "DBLE_THREAD_SUSPECTED_HANG"; //Resolve by trigger
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ private ToResolveContainer() {
public static final Set<String> XA_WRITE_CHECK_POINT_FAIL = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public static final Set<String> DB_INSTANCE_LOWER_CASE_ERROR = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public static final Set<String> DB_SLAVE_INSTANCE_DELAY = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public static final Set<String> THREAD_SUSPECTED_HANG = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class RWSplitNonBlockingSession extends Session {
public static final Logger LOGGER = LoggerFactory.getLogger(RWSplitNonBlockingSession.class);

private volatile BackendConnection conn;

//previous preserve conn
private volatile BackendConnection backupConn;
private final RWSplitService rwSplitService;
private PhysicalDbGroup rwGroup;

Expand Down Expand Up @@ -314,9 +317,20 @@ public void executeHint(DbleHintParser.HintInfo hintInfo, int sqlType, String sq
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("route sql {} to {}", sql, dbInstance);
}
if (rwSplitService.isInTransaction()) {
if (conn != null) {
backupConn = conn;
conn = null;
}
rwSplitService.setForceUseAutoCommit(true);
}
RWSplitHandler handler = new RWSplitHandler(rwSplitService, false, null, callback);
dbInstance.getConnection(rwSplitService.getSchema(), handler, null, false);
} catch (Exception e) {
if (rwSplitService.isForceUseAutoCommit()) {
conn = backupConn;
backupConn = null;
}
rwSplitService.executeException(e, sql);
}
}
Expand Down Expand Up @@ -350,8 +364,11 @@ public void bind(BackendConnection bindConn) {

public void unbindIfSafe() {
final BackendConnection tmp = conn;
if (tmp != null && rwSplitService.isKeepBackendConn()) {
if (tmp != null && !rwSplitService.isKeepBackendConn()) {
this.conn = null;
conn = backupConn;
backupConn = null;

if (rwSplitService.isFlowControlled()) {
releaseConnectionFromFlowControlled(tmp);
}
Expand All @@ -363,16 +380,28 @@ public void unbindIfSafe() {

public void unbind() {
this.conn = null;
conn = backupConn;
backupConn = null;
}

public void close(String reason) {
if (null != rwGroup) {
rwGroup.unBindRwSplitSession(this);
}
final BackendConnection tmp = this.conn;
this.conn = null;
if (tmp != null) {
tmp.close(reason);
{
final BackendConnection tmp = this.conn;
this.conn = null;
if (tmp != null) {
tmp.close(reason);
}
}

{
final BackendConnection tmp = this.backupConn;
this.backupConn = null;
if (tmp != null) {
tmp.close(reason);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ private static RowDataPacket getRow(String stmt, String desc, String charset) {

HELPS.put("show @@statistic", "Turn off statistic information");
HELPS.put("enable @@statistic", "Turn on statistic sql");
HELPS.put("enable @@enableStatisticAnalysis", "Turn on statistic analysis sql('show @@sql.sum.user/table' or 'show @@sql.condition')");
HELPS.put("enable @@statisticAnalysis", "Turn on statistic analysis sql('show @@sql.sum.user/table' or 'show @@sql.condition')");
HELPS.put("disable @@statistic", "Turn off statistic sql");
HELPS.put("disable @@enableStatisticAnalysis", "Turn off statistic analysis sql('show @@sql.sum.user/table' or 'show @@sql.condition')");
HELPS.put("disable @@statisticAnalysis", "Turn off statistic analysis sql('show @@sql.sum.user/table' or 'show @@sql.condition')");
HELPS.put("reload @@statistic_table_size = ? [where table='?' | where table in (dble_information.tableA,...)]", "Statistic table size");
HELPS.put("reload @@samplingRate=?", "Reset the samplingRate size");
HELPS.put("show @@statistic_queue.usage", "Show the queue usage");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,15 @@ public void execute(byte[] originPacket) {

write(originPacket, WriteFlags.QUERY_END);
}

public void execute(BusinessService service, String sql) {
execute(service, sql, false);
}

public void execute(BusinessService service, String sql, boolean forceUseAutoCommit) {
boolean changeUser = isChangeUser(service);
if (changeUser) return;

StringBuilder synSQL = getSynSql(null, null, service.isAutocommit(), service);
StringBuilder synSQL = getSynSql(null, null, forceUseAutoCommit || service.isAutocommit(), service);
if (protocolResponseHandler != defaultResponseHandler) {
protocolResponseHandler = defaultResponseHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void execute(final BackendConnection conn) {
mysqlService.execute(rwSplitService, originPacket);
} else if (!StringUtil.isEmpty(executeSql)) {
// such as: Hint sql (remove comment sentences)
mysqlService.execute(rwSplitService, executeSql);
mysqlService.execute(rwSplitService, executeSql, rwSplitService.isForceUseAutoCommit());
} else {
// not happen
mysqlService.execute(rwSplitService, rwSplitService.getExecuteSqlBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class RWSplitService extends BusinessService<SingleDbGroupUserConfig> {

// prepare statement
private ConcurrentHashMap<Long, PreparedStatementHolder> psHolder = new ConcurrentHashMap<>();
private boolean forceUseAutoCommit = false;


public RWSplitService(AbstractConnection connection, AuthResultInfo info) {
super(connection, info);
Expand Down Expand Up @@ -116,6 +118,13 @@ protected void beforeInsertServiceTask(@NotNull ServiceTask task) {
session.trace(t -> t.setRequestTime());
}

@Override
protected boolean beforeHandlingTask(@NotNull ServiceTask task) {
//initialize value for the REQUEST
setForceUseAutoCommit(false);
return super.beforeHandlingTask(task);
}

@Override
protected void handleInnerData(byte[] data) {
session.trace(t -> t.startProcess());
Expand Down Expand Up @@ -400,7 +409,8 @@ public PreparedStatementHolder getPrepareStatement(long id) {
}

public boolean isKeepBackendConn() {
return !isInTransaction() && !isInLoadData() && psHolder.isEmpty() && !isLockTable() && !isUsingTmpTable() && nameSet.isEmpty();
boolean releaseConn = (!isInTransaction() || isForceUseAutoCommit()) && !isInLoadData() && psHolder.isEmpty() && !isLockTable() && !isUsingTmpTable() && nameSet.isEmpty();
return !releaseConn;
}

public boolean isInitDb() {
Expand All @@ -423,6 +433,14 @@ public void setMultiHandler(RWSplitMultiHandler multiHandler) {
this.multiHandler = multiHandler;
}

public boolean isForceUseAutoCommit() {
return forceUseAutoCommit;
}

public void setForceUseAutoCommit(boolean forceUseAutoCommit) {
this.forceUseAutoCommit = forceUseAutoCommit;
}

@Override
public void killAndClose(String reason) {
session.close(reason);
Expand Down
Loading

0 comments on commit c25da26

Please sign in to comment.