diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java b/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java index d3f3462b79..4e7402fdf6 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java @@ -182,4 +182,9 @@ public String toString() { public boolean isQuit() { return connectionRef.getStamp() == 2; } + + + public MySQLHeartbeat getHeartbeat() { + return heartbeat; + } } diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java index 953c410df7..9fb29625c0 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java @@ -45,6 +45,7 @@ public class MySQLHeartbeat { private final DbInstanceSyncRecorder asyncRecorder = new DbInstanceSyncRecorder(); private final PhysicalDbInstance source; protected volatile MySQLHeartbeatStatus status; + private volatile long beginTimeoutTime = 0; private String heartbeatSQL; private long heartbeatTimeout; // during the time, heart failed will ignore private final AtomicInteger errorCount = new AtomicInteger(0); @@ -251,10 +252,16 @@ private void setTimeout() { } if (status != MySQLHeartbeatStatus.TIMEOUT) { LOGGER.warn("heartbeat to [{}] setTimeout, previous status is {}", source.getConfig().getUrl(), status); + beginTimeoutTime = System.currentTimeMillis(); status = MySQLHeartbeatStatus.TIMEOUT; } } + + public long getBeginTimeoutTime() { + return beginTimeoutTime; + } + public boolean isHeartBeatOK() { if (status == MySQLHeartbeatStatus.OK || status == MySQLHeartbeatStatus.INIT) { return true; diff --git a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java index e9b62db915..dfee37cb05 100644 --- a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java @@ -84,6 +84,7 @@ private SystemConfig() { private long idleTimeout = DEFAULT_IDLE_TIMEOUT; // sql execute timeout (second) private long sqlExecuteTimeout = 300; + private long heartbeatSqlExecuteTimeout = 10; // connection will force close if received close packet but haven't been closed after closeTimeout milliseconds. // set the value too big is not a good idea. private long closeTimeout = 100; @@ -782,6 +783,14 @@ public void setSqlExecuteTimeout(long sqlExecuteTimeout) { } + public long getHeartbeatSqlExecuteTimeout() { + return heartbeatSqlExecuteTimeout; + } + + public void setHeartbeatSqlExecuteTimeout(long heartbeatSqlExecuteTimeout) { + this.heartbeatSqlExecuteTimeout = heartbeatSqlExecuteTimeout; + } + public int getTxIsolation() { return txIsolation; } @@ -1698,6 +1707,7 @@ public String toString() { ", groupConcatMaxLen='" + groupConcatMaxLen + ", releaseTimeout=" + releaseTimeout + ", enableAsyncRelease=" + enableAsyncRelease + + ", heartbeatSqlExecuteTimeout=" + heartbeatSqlExecuteTimeout + "]"; } diff --git a/src/main/java/com/actiontech/dble/net/IOProcessor.java b/src/main/java/com/actiontech/dble/net/IOProcessor.java index 925a4d6088..fc70ed67e0 100644 --- a/src/main/java/com/actiontech/dble/net/IOProcessor.java +++ b/src/main/java/com/actiontech/dble/net/IOProcessor.java @@ -7,6 +7,9 @@ import com.actiontech.dble.backend.datasource.PhysicalDbGroup; +import com.actiontech.dble.backend.heartbeat.HeartbeatSQLJob; +import com.actiontech.dble.backend.heartbeat.MySQLHeartbeat; +import com.actiontech.dble.backend.heartbeat.MySQLHeartbeatStatus; import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.stage.XAStage; import com.actiontech.dble.backend.mysql.xa.TxState; import com.actiontech.dble.buffer.BufferPool; @@ -187,6 +190,7 @@ private void checkConSendQueue(AbstractConnection c) { private void backendCheck() { long sqlTimeout = SystemConfig.getInstance().getSqlExecuteTimeout() * 1000L; + final long heartbeatSqlExecuteTimeout = SystemConfig.getInstance().getHeartbeatSqlExecuteTimeout() * 1000L; Iterator> it = backends.entrySet().iterator(); while (it.hasNext()) { BackendConnection c = it.next().getValue(); @@ -226,6 +230,14 @@ private void backendCheck() { if (!c.getBackendService().isDDL() && c.getState() == PooledConnection.STATE_IN_USE && c.getBackendService().isExecuting() && c.getLastTime() < TimeUtil.currentTimeMillis() - sqlTimeout) { LOGGER.info("found backend connection SQL timeout ,close it " + c); c.close("sql timeout"); + } else if ((c.getBackendService().getResponseHandler() instanceof HeartbeatSQLJob)) { + if (heartbeatSqlExecuteTimeout > 0) { + final MySQLHeartbeat heartbeat = ((HeartbeatSQLJob) c.getBackendService().getResponseHandler()).getHeartbeat(); + if (c.getBackendService().isExecuting() && heartbeat.getStatus() == MySQLHeartbeatStatus.TIMEOUT && heartbeat.getBeginTimeoutTime() < System.currentTimeMillis() - heartbeatSqlExecuteTimeout) { + LOGGER.info("found backend heartbeat connection SQL timeout ,close it " + c); + c.close("heart sql timeout"); + } + } } // clean closed conn or check time out diff --git a/src/main/java/com/actiontech/dble/singleton/SystemParams.java b/src/main/java/com/actiontech/dble/singleton/SystemParams.java index 75d7aeb79a..3a86feb027 100644 --- a/src/main/java/com/actiontech/dble/singleton/SystemParams.java +++ b/src/main/java/com/actiontech/dble/singleton/SystemParams.java @@ -91,6 +91,7 @@ private SystemParams() { readOnlyParams.add(new ParamInfo("checkTableConsistencyPeriod", sysConfig.getCheckTableConsistencyPeriod() + "ms", "The period of consistency tableStructure check. The default value is 1800000ms(means 30minutes=30*60*1000)")); readOnlyParams.add(new ParamInfo("processorCheckPeriod", sysConfig.getProcessorCheckPeriod() + "ms", "The period between the jobs for cleaning the closed or overtime connections. The default is 1000ms")); readOnlyParams.add(new ParamInfo("sqlExecuteTimeout", sysConfig.getSqlExecuteTimeout() + "s", "The max query executing time.If time out,the connection will be closed. The default is 300 seconds")); + readOnlyParams.add(new ParamInfo("heartbeatSqlExecuteTimeout", sysConfig.getHeartbeatSqlExecuteTimeout() + "s", "The max heartbeat query executing time.If time out,the connection will be closed. The default is 10 seconds.set 0 to disable it.")); readOnlyParams.add(new ParamInfo("recordTxn", sysConfig.getRecordTxn() + "", "Whether the transaction be recorded as a file, the default value is 0")); readOnlyParams.add(new ParamInfo("transactionLogBaseDir", sysConfig.getTransactionLogBaseDir(), "The directory of the transaction record file, the default value is ./txlogs/")); readOnlyParams.add(new ParamInfo("transactionLogBaseName", sysConfig.getTransactionLogBaseName(), "The name of the transaction record file. The default value is server-tx"));