diff --git a/src/main/java/com/actiontech/dble/DbleServer.java b/src/main/java/com/actiontech/dble/DbleServer.java index 4b1b0514a8..cf615b2c19 100644 --- a/src/main/java/com/actiontech/dble/DbleServer.java +++ b/src/main/java/com/actiontech/dble/DbleServer.java @@ -45,8 +45,8 @@ import com.actiontech.dble.singleton.*; import com.actiontech.dble.statistic.sql.StatisticManager; import com.actiontech.dble.statistic.stat.ThreadWorkUsage; -import com.actiontech.dble.singleton.ThreadChecker; import com.actiontech.dble.util.ExecutorUtil; +import com.actiontech.dble.util.NameableScheduledThreadPoolExecutor; import com.actiontech.dble.util.TimeUtil; import com.google.common.collect.Maps; import org.slf4j.Logger; @@ -77,6 +77,7 @@ public final class DbleServer { public static final String WRITE_TO_BACKEND_WORKER_NAME = "writeToBackendWorker"; public static final String COMPLEX_QUERY_EXECUTOR_NAME = "complexQueryWorker"; public static final String TIMER_WORKER_NAME = "Timer"; + public static final String TIMER_SCHEDULER_WORKER_NAME = "TimerScheduler"; public static final String NIO_FRONT_RW = "NIOFrontRW"; public static final String NIO_BACKEND_RW = "NIOBackendRW"; public static final String AIO_EXECUTOR_NAME = "AIO"; @@ -113,7 +114,8 @@ public static DbleServer getInstance() { private ExecutorService backendExecutor; private ExecutorService writeToBackendExecutor; private ExecutorService complexQueryExecutor; - private ExecutorService timerExecutor; + private volatile ExecutorService timerExecutor; + private volatile NameableScheduledThreadPoolExecutor timerSchedulerExecutor; private Map threadUsedMap = new ConcurrentHashMap<>(); private Deque frontHandlerQueue; @@ -295,7 +297,7 @@ public void startup() throws Exception { LOGGER.info("=====================================Server started success======================================="); ThreadCheckerScheduler.getInstance().init(); - Scheduler.getInstance().init(timerExecutor); + Scheduler.getInstance().init(); LOGGER.info("=======================================Scheduler started=========================================="); XaCheckHandler.initXaIdCheckPeriod(); @@ -340,6 +342,7 @@ private void initExecutor(int frontProcessorCount, int backendProcessorCount) { writeToBackendExecutor = ExecutorUtil.createFixed(WRITE_TO_BACKEND_WORKER_NAME, SystemConfig.getInstance().getWriteToBackendWorker(), runnableMap); complexQueryExecutor = ExecutorUtil.createCached(COMPLEX_QUERY_EXECUTOR_NAME, SystemConfig.getInstance().getComplexQueryWorker(), null); timerExecutor = ExecutorUtil.createCached(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance()); + timerSchedulerExecutor = ExecutorUtil.createFixedScheduled(TIMER_SCHEDULER_WORKER_NAME, 2, ThreadChecker.getInstance()); nioFrontExecutor = ExecutorUtil.createFixed(NIO_FRONT_RW, frontProcessorCount, runnableMap); nioBackendExecutor = ExecutorUtil.createFixed(NIO_BACKEND_RW, backendProcessorCount, runnableMap); } @@ -488,6 +491,10 @@ public void run() { }); } catch (RejectedExecutionException e) { ThreadChecker.getInstance().timerExecuteError(e, "processorCheck()"); + } catch (Throwable e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("scheduled task processorCheck() happen exception: ", e); + } } } }; @@ -621,6 +628,17 @@ public ExecutorService getTimerExecutor() { return timerExecutor; } + public NameableScheduledThreadPoolExecutor getTimerSchedulerExecutor() { + return timerSchedulerExecutor; + } + + public void setTimerExecutor(ExecutorService timerExecutor) { + this.timerExecutor = timerExecutor; + } + + public void setTimerSchedulerExecutor(NameableScheduledThreadPoolExecutor timerSchedulerExecutor) { + this.timerSchedulerExecutor = timerSchedulerExecutor; + } public ExecutorService getComplexQueryExecutor() { return complexQueryExecutor; diff --git a/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetection.java b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetection.java index 12de9576c6..ef96ff7bf7 100644 --- a/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetection.java +++ b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetection.java @@ -5,6 +5,7 @@ package com.actiontech.dble.backend.delyDetection; +import com.actiontech.dble.DbleServer; import com.actiontech.dble.alarm.AlarmCode; import com.actiontech.dble.alarm.Alert; import com.actiontech.dble.alarm.AlertUtil; @@ -13,7 +14,6 @@ import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.config.model.db.DbGroupConfig; import com.actiontech.dble.net.connection.BackendConnection; -import com.actiontech.dble.singleton.Scheduler; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -105,7 +105,7 @@ public void start(long initialDelay) { //avoid concurrency with the master initialDelay = initialDelay >> 1; } - this.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> execute(), + this.scheduledFuture = DbleServer.getInstance().getTimerSchedulerExecutor().scheduleAtFixedRate(() -> execute(), initialDelay, this.delayPeriodMillis, TimeUnit.MILLISECONDS); } diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java index d9cb02fed3..390582a9b9 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java @@ -11,7 +11,6 @@ import com.actiontech.dble.alarm.AlertUtil; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.meta.ReloadLogHelper; -import com.actiontech.dble.singleton.Scheduler; import com.actiontech.dble.statistic.DbInstanceSyncRecorder; import com.actiontech.dble.statistic.HeartbeatRecorder; import com.actiontech.dble.util.TimeUtil; @@ -94,7 +93,7 @@ public void start(long heartbeatPeriodMillis) { if (LOGGER.isDebugEnabled()) { ReloadLogHelper.debug("start heartbeat :{}", this.toString()); } - this.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> { + this.scheduledFuture = DbleServer.getInstance().getTimerSchedulerExecutor().scheduleAtFixedRate(() -> { if (DbleServer.getInstance().getConfig().isFullyConfigured()) { if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) { return; diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java index 6c07793e57..0504324eeb 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java @@ -233,6 +233,9 @@ private void ownThreadJob() { if (!itemToDiscard.isNullItem()) { BlockingQueue discardQueue = queues.get(itemToDiscard.getIndex()); while (true) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("manual interrupted"); + } if (discardQueue.take().isNullItem() || isFail()) { break; } @@ -245,15 +248,7 @@ private void ownThreadJob() { nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), false, top.getIndex()); } } - Iterator>> iterator = this.queues.entrySet().iterator(); - MySQLResponseService service = null; - while (iterator.hasNext()) { - Map.Entry> entry = iterator.next(); - service = entry.getKey(); - entry.getValue().clear(); - session.releaseConnectionIfSafe(entry.getKey(), false); - iterator.remove(); - } + MySQLResponseService service = clearQueueAndReleaseConnection(); session.trace(t -> t.doSqlStat(selectRows, netOutBytes.intValue(), resultSize.intValue())); assert service != null; if (!isFail()) { @@ -269,4 +264,17 @@ private void ownThreadJob() { session.onQueryError(msg.getBytes()); } } + + private MySQLResponseService clearQueueAndReleaseConnection() { + Iterator>> iterator = this.queues.entrySet().iterator(); + MySQLResponseService service = null; + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + service = entry.getKey(); + entry.getValue().clear(); + session.releaseConnectionIfSafe(entry.getKey(), false); + iterator.remove(); + } + return service; + } } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java index 9ecc6ec32b..9fafb107de 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeAndOrderHandler.java @@ -195,6 +195,9 @@ protected void ownThreadJob(Object... objects) { if (!itemToDiscard.isNullItem()) { BlockingQueue discardQueue = queues.get(itemToDiscard.getIndex()); while (true) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("manual interrupted"); + } if (discardQueue.take().isNullItem() || terminate.get()) { break; } @@ -238,6 +241,10 @@ protected void recycleResources() { Entry> entry = iterator.next(); // fair lock queue,poll for clear while (true) { + if (Thread.currentThread().isInterrupted()) { + LOGGER.info("manual interrupted"); + break; + } if (entry.getValue().poll() == null) { break; } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/OrderByHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/OrderByHandler.java index 869af8d6fa..bc16d2b70e 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/OrderByHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/OrderByHandler.java @@ -99,6 +99,9 @@ protected void ownThreadJob(Object... objects) { recordElapsedTime("order writeDirectly start :"); try { while (true) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("manual interrupted"); + } if (terminate.get()) { return; } @@ -110,13 +113,16 @@ protected void ownThreadJob(Object... objects) { } localResult.add(row); } catch (InterruptedException e) { - //ignore error + throw e; } } recordElapsedTime("order writeDirectly end :"); localResult.done(); recordElapsedTime("order read start :"); while (true) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("manual interrupted"); + } if (terminate.get()) { return; } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/DirectGroupByHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/DirectGroupByHandler.java index c763c7fdab..df6747cff4 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/DirectGroupByHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/DirectGroupByHandler.java @@ -155,6 +155,9 @@ protected void ownThreadJob(Object... objects) { try { int eofCount = 0; for (; ; ) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("manual interrupted"); + } RowDataPacket row = outQueue.take(); if (row.getFieldCount() == 0) { eofCount++; diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java index 757ae0e0b3..0547c29f75 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java @@ -187,6 +187,9 @@ protected void ownThreadJob(Object... objects) { leftLocal = takeFirst(leftQueue); rightLocal = takeFirst(rightQueue); while (true) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("manual interrupted"); + } if (terminate.get()) return; RowDataPacket leftRow = leftLocal.getLastRow(); @@ -228,15 +231,7 @@ protected void ownThreadJob(Object... objects) { if (!nestLoopDependOn) { HandlerTool.terminateHandlerTree(this); } - // for trace, when join end before all rows return ,the handler should mark as finished - for (DMLResponseHandler mergeHandler : this.getMerges()) { - DMLResponseHandler handler = mergeHandler; - while (handler != null && handler != this) { - session.setHandlerEnd(handler); - handler = handler.getNextHandler(); - } - } - session.setHandlerEnd(this); + makeFinishedOfHandler(); nextHandler.rowEofResponse(null, isLeft, service); } catch (MySQLOutPutException e) { String msg = e.getLocalizedMessage(); @@ -254,6 +249,18 @@ protected void ownThreadJob(Object... objects) { } } + // for trace, when join end before all rows return ,the handler should mark as finished + private void makeFinishedOfHandler() { + for (DMLResponseHandler mergeHandler : this.getMerges()) { + DMLResponseHandler handler = mergeHandler; + while (handler != null && handler != this) { + session.setHandlerEnd(handler); + handler = handler.getNextHandler(); + } + } + session.setHandlerEnd(this); + } + private LocalResult takeFirst(FairLinkedBlockingDeque deque) throws InterruptedException { /** * it must be in single thread diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/NotInHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/NotInHandler.java index 61e5d036b0..ae6ad6dc84 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/NotInHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/NotInHandler.java @@ -138,6 +138,9 @@ protected void ownThreadJob(Object... objects) { leftLocal = takeFirst(leftQueue); rightLocal = takeFirst(rightQueue); while (true) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("manual interrupted"); + } RowDataPacket leftRow = leftLocal.getLastRow(); RowDataPacket rightRow = rightLocal.getLastRow(); if (leftRow.getFieldCount() == 0) { diff --git a/src/main/java/com/actiontech/dble/backend/mysql/xa/XaCheckHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/xa/XaCheckHandler.java index ca48c74ece..70a912d7ed 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/xa/XaCheckHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/xa/XaCheckHandler.java @@ -15,7 +15,6 @@ import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.config.model.sharding.SchemaConfig; import com.actiontech.dble.config.model.sharding.table.BaseTableConfig; -import com.actiontech.dble.singleton.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,10 +167,10 @@ public static void adjustXaIdCheckPeriod(long period0) { } } - private static void startXaIdCheckPeriod() { + public static void startXaIdCheckPeriod() { synchronized (INSTANCE) { if (INSTANCE.xaIdCheckPeriod > 0) { - INSTANCE.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleWithFixedDelay(() -> { + INSTANCE.scheduledFuture = DbleServer.getInstance().getTimerSchedulerExecutor().scheduleWithFixedDelay(() -> { (new XAAnalysisHandler()).checkResidualTask(); }, 0, INSTANCE.xaIdCheckPeriod, TimeUnit.SECONDS); LOGGER.info("====================================Start XaIdCheckPeriod[{}]=========================================", INSTANCE.xaIdCheckPeriod); @@ -179,7 +178,7 @@ private static void startXaIdCheckPeriod() { } } - private static void stopXaIdCheckPeriod() { + public static void stopXaIdCheckPeriod() { synchronized (INSTANCE) { ScheduledFuture future = INSTANCE.scheduledFuture; if (future != null) { diff --git a/src/main/java/com/actiontech/dble/cluster/logic/AbstractClusterLogic.java b/src/main/java/com/actiontech/dble/cluster/logic/AbstractClusterLogic.java index becff13968..ff0b60caff 100644 --- a/src/main/java/com/actiontech/dble/cluster/logic/AbstractClusterLogic.java +++ b/src/main/java/com/actiontech/dble/cluster/logic/AbstractClusterLogic.java @@ -103,6 +103,10 @@ public String waitingForAllTheNode(String path) { Map expectedMap = ClusterHelper.getOnlineMap(); StringBuilder errorMsg = new StringBuilder(); for (; ; ) { + if (Thread.currentThread().isInterrupted()) { + errorMsg.append("manual interrupted"); + break; + } errorMsg.setLength(0); if (checkResponseForOneTime(path, expectedMap, errorMsg)) { break; diff --git a/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java b/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java index 5c1c209174..ae3a3dc9ee 100644 --- a/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java +++ b/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java @@ -217,6 +217,9 @@ public TableMeta getSyncTableMeta(String schema, String tbName) throws SQLNonTra TraceManager.log(ImmutableMap.of("schema", schema, "table", tbName), traceObject); try { while (true) { + if (Thread.currentThread().isInterrupted()) { + throw new SQLNonTransientException("manual interrupted"); + } int oldVersion = version.get(); if (metaCount.get() == 0) { TableMeta meta = getTableMeta(schema, tbName); @@ -248,6 +251,9 @@ public PlanNode getSyncView(String schema, String vName) throws SQLNonTransientE TraceManager.log(ImmutableMap.of("schema", schema, "table", vName), traceObject); try { while (true) { + if (Thread.currentThread().isInterrupted()) { + throw new SQLNonTransientException("manual interrupted"); + } int oldVersion = version.get(); if (catalogs.get(schema) == null) { return null; diff --git a/src/main/java/com/actiontech/dble/route/parser/ManagerParse.java b/src/main/java/com/actiontech/dble/route/parser/ManagerParse.java index 6dd49d23d5..01ef7dd9b0 100644 --- a/src/main/java/com/actiontech/dble/route/parser/ManagerParse.java +++ b/src/main/java/com/actiontech/dble/route/parser/ManagerParse.java @@ -56,6 +56,7 @@ private ManagerParse() { public static final int CLUSTER = 38; public static final int SPLIT_LOAD_DATA = 39; public static final int KILL_CLUSTER_RENEW_THREAD = 40; + public static final int THREAD = 41; public static int parse(String stmt) { for (int i = 0; i < stmt.length(); i++) { @@ -109,10 +110,26 @@ public static int parse(String stmt) { return OTHER; } - // truncate table + // t private static int tCheck(String stmt, int offset) { - if (stmt.length() > offset + 8) { - char c1 = stmt.charAt(++offset); + if (stmt.length() > offset++) { + switch (stmt.charAt(offset)) { + case 'H': + case 'h': + return thCheck(stmt, offset); + case 'R': + case 'r': + return trCheck(stmt, offset); + default: + return OTHER; + } + } + return OTHER; + } + + // truncate table + private static int trCheck(String stmt, int offset) { + if (stmt.length() > offset + 7) { char c2 = stmt.charAt(++offset); char c3 = stmt.charAt(++offset); char c4 = stmt.charAt(++offset); @@ -120,8 +137,7 @@ private static int tCheck(String stmt, int offset) { char c6 = stmt.charAt(++offset); char c7 = stmt.charAt(++offset); char c8 = stmt.charAt(++offset); - if ((c1 == 'R' || c1 == 'r') && - (c2 == 'U' || c2 == 'u') && + if ((c2 == 'U' || c2 == 'u') && (c3 == 'N' || c3 == 'n') && (c4 == 'C' || c4 == 'c') && (c5 == 'A' || c5 == 'a') && @@ -134,6 +150,25 @@ private static int tCheck(String stmt, int offset) { return OTHER; } + // thread + private static int thCheck(String stmt, int offset) { + if (stmt.length() > offset + 5) { + char c1 = stmt.charAt(++offset); + char c2 = stmt.charAt(++offset); + char c3 = stmt.charAt(++offset); + char c4 = stmt.charAt(++offset); + char c5 = stmt.charAt(++offset); + if ((c1 == 'R' || c1 == 'r') && + (c2 == 'E' || c2 == 'e') && + (c3 == 'A' || c3 == 'a') && + (c4 == 'D' || c4 == 'd') && + ParseUtil.isSpace(c5)) { + return offset << 8 | THREAD; + } + } + return OTHER; + } + private static int dCheck(String stmt, int offset) { if (stmt.length() > ++offset) { switch (stmt.charAt(offset)) { diff --git a/src/main/java/com/actiontech/dble/route/sequence/handler/DistributedSequenceHandler.java b/src/main/java/com/actiontech/dble/route/sequence/handler/DistributedSequenceHandler.java index 21ccb6942d..790ccc0b95 100644 --- a/src/main/java/com/actiontech/dble/route/sequence/handler/DistributedSequenceHandler.java +++ b/src/main/java/com/actiontech/dble/route/sequence/handler/DistributedSequenceHandler.java @@ -86,6 +86,9 @@ private void loadInstanceIdByConfig() { private void loadInstanceIdByZK() { int execCount = 1; while (true) { + if (Thread.currentThread().isInterrupted()) { + throw new RuntimeException("instanceId allocate error when using zk, reason: manual interrupted"); + } if (execCount > this.retryCount) { throw new RuntimeException("instanceId allocate error when using zk, reason: no available instanceId found"); } diff --git a/src/main/java/com/actiontech/dble/services/FrontendService.java b/src/main/java/com/actiontech/dble/services/FrontendService.java index 8c5e575c37..e6c98b046b 100644 --- a/src/main/java/com/actiontech/dble/services/FrontendService.java +++ b/src/main/java/com/actiontech/dble/services/FrontendService.java @@ -321,6 +321,14 @@ public void writeOkPacket() { write(ok); } + public void writeOkPacket(String msg) { + OkPacket ok = OkPacket.getDefault(); + byte packet = (byte) this.packetId.incrementAndGet(); + ok.setPacketId(packet); + ok.setMessage(StringUtil.encode(msg, charsetName.getResults())); + write(ok); + } + public void writeErrMessage(String code, String msg, int vendorCode) { writeErrMessage((byte) this.nextPacketId(), vendorCode, code, msg); } diff --git a/src/main/java/com/actiontech/dble/services/manager/ManagerQueryHandler.java b/src/main/java/com/actiontech/dble/services/manager/ManagerQueryHandler.java index 9f71b85936..971d45a050 100644 --- a/src/main/java/com/actiontech/dble/services/manager/ManagerQueryHandler.java +++ b/src/main/java/com/actiontech/dble/services/manager/ManagerQueryHandler.java @@ -171,6 +171,9 @@ public void query(String sql) { case ManagerParse.CLUSTER: ClusterManageHandler.handle(sql, service, rs >>> SHIFT); break; + case ManagerParse.THREAD: + ThreadHandler.handle(sql, service, rs >>> SHIFT); + break; default: service.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement"); } diff --git a/src/main/java/com/actiontech/dble/services/manager/dump/DumpFileHandler.java b/src/main/java/com/actiontech/dble/services/manager/dump/DumpFileHandler.java index 84388b6580..062712a988 100644 --- a/src/main/java/com/actiontech/dble/services/manager/dump/DumpFileHandler.java +++ b/src/main/java/com/actiontech/dble/services/manager/dump/DumpFileHandler.java @@ -52,6 +52,10 @@ public DumpFileHandler(BlockingQueue queue, BlockingQueue insert public void run() { while (true) { + if (Thread.currentThread().isInterrupted()) { + LOGGER.info("dump file handler was manual interrupted."); + break; + } try { String stmts = handleQueue.take(); SplitFileProvider.getHandleQueueSizeOfTake(handleQueue.size()); diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleThreadPool.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleThreadPool.java index 4237ed32ea..e5b6b2c2c0 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleThreadPool.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleThreadPool.java @@ -13,13 +13,17 @@ import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.meta.ColumnMeta; import com.actiontech.dble.net.connection.AbstractConnection; -import com.actiontech.dble.net.executor.*; +import com.actiontech.dble.net.executor.BackendCurrentRunnable; +import com.actiontech.dble.net.executor.FrontendBlockRunnable; +import com.actiontech.dble.net.executor.FrontendCurrentRunnable; +import com.actiontech.dble.net.executor.WriteToBackendRunnable; import com.actiontech.dble.net.impl.nio.RW; import com.actiontech.dble.net.service.ServiceTask; import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap; import com.actiontech.dble.services.manager.information.ManagerWritableTable; import com.actiontech.dble.util.IntegerUtil; import com.actiontech.dble.util.NameableExecutor; +import com.actiontech.dble.util.NameableScheduledThreadPoolExecutor; import com.actiontech.dble.util.StringUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -75,6 +79,7 @@ protected List> getRows() { DbleServer server = DbleServer.getInstance(); List> lst = new ArrayList<>(5); lst.add(getRow(new ThreadPoolInfo((NameableExecutor) server.getTimerExecutor()))); + lst.add(getRow(new ThreadPoolInfo((server.getTimerSchedulerExecutor())))); lst.add(getRow(new ThreadPoolInfo((NameableExecutor) server.getFrontExecutor()))); lst.add(getRow(new ThreadPoolInfo((NameableExecutor) server.getManagerFrontExecutor()))); lst.add(getRow(new ThreadPoolInfo((NameableExecutor) server.getBackendExecutor()))); @@ -372,6 +377,14 @@ static class ThreadPoolInfo { this.queueSize = nameableExecutor.getQueue().size(); } + ThreadPoolInfo(NameableScheduledThreadPoolExecutor executor) { + this.name = executor.getName(); + this.poolSize = executor.getPoolSize(); + this.corePoolSize = executor.getCorePoolSize(); + this.activeCount = executor.getActiveCount(); + this.queueSize = executor.getQueue().size(); + } + ThreadPoolInfo(String name, int poolSize, int corePoolSize, int activeCount, int queueSize) { this.name = name; this.poolSize = poolSize; diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleThreadPoolTask.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleThreadPoolTask.java index 7c5a1ebf69..e606b954eb 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleThreadPoolTask.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleThreadPoolTask.java @@ -22,6 +22,7 @@ import com.actiontech.dble.services.FrontendService; import com.actiontech.dble.services.manager.information.ManagerBaseTable; import com.actiontech.dble.util.NameableExecutor; +import com.actiontech.dble.util.NameableScheduledThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +31,8 @@ import java.util.List; import java.util.Map; +import static com.actiontech.dble.DbleServer.TIMER_SCHEDULER_WORKER_NAME; + public final class DbleThreadPoolTask extends ManagerBaseTable { private static final Logger LOGGER = LoggerFactory.getLogger(DbleThreadPoolTask.class); private static final String COLUMN_NAME = "name"; @@ -74,6 +77,7 @@ protected List> getRows() { DbleServer server = DbleServer.getInstance(); List> lst = new ArrayList<>(5); lst.add(getRow((NameableExecutor) server.getTimerExecutor())); + lst.add(getRow(server.getTimerSchedulerExecutor())); lst.add(getRow((NameableExecutor) server.getFrontExecutor())); lst.add(getRow((NameableExecutor) server.getManagerFrontExecutor())); lst.add(getRow((NameableExecutor) server.getBackendExecutor())); @@ -94,6 +98,20 @@ private LinkedHashMap getRow(NameableExecutor exec) { return row; } + private LinkedHashMap getRow(NameableScheduledThreadPoolExecutor exec) { + LinkedHashMap row = new LinkedHashMap<>(); + if (exec.getName().equals(TIMER_SCHEDULER_WORKER_NAME)) { + row.put(COLUMN_NAME, exec.getName()); + row.put(COLUMN_POOL_SIZE, exec.getPoolSize() + ""); + row.put(COLUMN_ACTIVE_TASK_COUNT, exec.getActiveCount() + ""); + row.put(COLUMN_TASK_QUEUE_SIZE, exec.getQueue().size() + ""); + row.put(COLUMN_COMPLETED_TASK_COUNT, exec.getCompletedTaskCount() + ""); + row.put(COLUMN_TOTAL_TASK_COUNT, exec.getTaskCount() + ""); + return row; + } + return null; + } + public static synchronized Row calculateRow(NameableExecutor exec) { long activeCount, completedTaskCount, queueSize, totalCount; final Map threadRunnableMap = DbleServer.getInstance().getRunnableMap().get(exec.getName()); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java index 115bf5da15..944968b1e5 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java @@ -251,6 +251,9 @@ private static RowDataPacket getRow(String stmt, String desc, String charset) { HELPS.put("enable @@memory_buffer_monitor", "Turn on memory buffer monitor"); HELPS.put("disable @@memory_buffer_monitor", "Turn off memory buffer monitor"); + HELPS.put("thread @@kill [name|poolname] ='?'", "Gracefully interrupt a single thread or thread pool"); + HELPS.put("thread @@recover [name|poolname] ='?'", "Restoring a single thread or thread pool"); + // list sort KEYS.addAll(HELPS.keySet()); } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowThreadPool.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowThreadPool.java index 5f4a90df4b..6097cd7062 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowThreadPool.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowThreadPool.java @@ -10,15 +10,10 @@ import com.actiontech.dble.config.Fields; import com.actiontech.dble.net.mysql.*; import com.actiontech.dble.services.manager.ManagerService; -import com.actiontech.dble.util.IntegerUtil; -import com.actiontech.dble.util.LongUtil; -import com.actiontech.dble.util.NameableExecutor; -import com.actiontech.dble.util.StringUtil; +import com.actiontech.dble.util.*; import java.nio.ByteBuffer; import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; /** * ShowThreadPool status @@ -80,13 +75,20 @@ public static void execute(ManagerService service) { // write rows byte packetId = EOF.getPacketId(); - List executors = getExecutors(); - for (ExecutorService exec : executors) { - if (exec != null) { - RowDataPacket row = getRow((NameableExecutor) exec, service.getCharset().getResults()); - row.setPacketId(++packetId); - buffer = row.write(buffer, service, true); - } + + DbleServer server = DbleServer.getInstance(); + LinkedList rows = new LinkedList<>(); + rows.add(getRow((NameableExecutor) server.getTimerExecutor(), service.getCharset().getResults())); + rows.add(getRow(server.getTimerSchedulerExecutor(), service.getCharset().getResults())); + rows.add(getRow((NameableExecutor) server.getFrontExecutor(), service.getCharset().getResults())); + rows.add(getRow((NameableExecutor) server.getManagerFrontExecutor(), service.getCharset().getResults())); + rows.add(getRow((NameableExecutor) server.getBackendExecutor(), service.getCharset().getResults())); + rows.add(getRow((NameableExecutor) server.getComplexQueryExecutor(), service.getCharset().getResults())); + rows.add(getRow((NameableExecutor) server.getWriteToBackendExecutor(), service.getCharset().getResults())); + + for (RowDataPacket row : rows) { + row.setPacketId(++packetId); + buffer = row.write(buffer, service, true); } // write last eof @@ -108,18 +110,14 @@ private static RowDataPacket getRow(NameableExecutor exec, String charset) { return row; } - private static List getExecutors() { - List list = new LinkedList<>(); - DbleServer server = DbleServer.getInstance(); - list.add(server.getTimerExecutor()); - list.add(server.getFrontExecutor()); - list.add(server.getManagerFrontExecutor()); - list.add(server.getBackendExecutor()); - list.add(server.getComplexQueryExecutor()); - list.add(server.getWriteToBackendExecutor()); - // for (NIOProcessor pros : server.getProcessors()) { - // list.add(pros.getExecutor()); - // } - return list; + private static RowDataPacket getRow(NameableScheduledThreadPoolExecutor exec, String charset) { + RowDataPacket row = new RowDataPacket(FIELD_COUNT); + row.add(StringUtil.encode(exec.getName(), charset)); + row.add(IntegerUtil.toBytes(exec.getPoolSize())); + row.add(IntegerUtil.toBytes(exec.getActiveCount())); + row.add(IntegerUtil.toBytes(exec.getQueue().size())); + row.add(LongUtil.toBytes(exec.getCompletedTaskCount())); + row.add(LongUtil.toBytes(exec.getTaskCount())); + return row; } } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowThreadPoolTask.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowThreadPoolTask.java index 4d9b1e3832..0097fd3b30 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowThreadPoolTask.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowThreadPoolTask.java @@ -12,14 +12,10 @@ import com.actiontech.dble.net.mysql.*; import com.actiontech.dble.services.manager.ManagerService; import com.actiontech.dble.services.manager.information.tables.DbleThreadPoolTask; -import com.actiontech.dble.util.LongUtil; -import com.actiontech.dble.util.NameableExecutor; -import com.actiontech.dble.util.StringUtil; +import com.actiontech.dble.util.*; import java.nio.ByteBuffer; import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; /** * ShowThreadPool status @@ -82,13 +78,19 @@ public static void execute(ManagerService service) { // write rows byte packetId = EOF.getPacketId(); - List executors = getExecutors(); - for (ExecutorService exec : executors) { - if (exec != null) { - RowDataPacket row = getRow((NameableExecutor) exec, service.getCharset().getResults()); - row.setPacketId(++packetId); - buffer = row.write(buffer, service, true); - } + DbleServer server = DbleServer.getInstance(); + LinkedList rows = new LinkedList<>(); + rows.add(getRow((NameableExecutor) server.getTimerExecutor(), service.getCharset().getResults())); + rows.add(getRow(server.getTimerSchedulerExecutor(), service.getCharset().getResults())); + rows.add(getRow((NameableExecutor) server.getFrontExecutor(), service.getCharset().getResults())); + rows.add(getRow((NameableExecutor) server.getManagerFrontExecutor(), service.getCharset().getResults())); + rows.add(getRow((NameableExecutor) server.getBackendExecutor(), service.getCharset().getResults())); + rows.add(getRow((NameableExecutor) server.getComplexQueryExecutor(), service.getCharset().getResults())); + rows.add(getRow((NameableExecutor) server.getWriteToBackendExecutor(), service.getCharset().getResults())); + + for (RowDataPacket row : rows) { + row.setPacketId(++packetId); + buffer = row.write(buffer, service, true); } // write last eof @@ -111,17 +113,14 @@ private static RowDataPacket getRow(NameableExecutor exec, String charset) { return row; } - - private static List getExecutors() { - List list = new LinkedList<>(); - DbleServer server = DbleServer.getInstance(); - list.add(server.getTimerExecutor()); - list.add(server.getFrontExecutor()); - list.add(server.getManagerFrontExecutor()); - list.add(server.getBackendExecutor()); - list.add(server.getComplexQueryExecutor()); - list.add(server.getWriteToBackendExecutor()); - return list; + private static RowDataPacket getRow(NameableScheduledThreadPoolExecutor exec, String charset) { + RowDataPacket row = new RowDataPacket(FIELD_COUNT); + row.add(StringUtil.encode(exec.getName(), charset)); + row.add(IntegerUtil.toBytes(exec.getPoolSize())); + row.add(IntegerUtil.toBytes(exec.getActiveCount())); + row.add(IntegerUtil.toBytes(exec.getQueue().size())); + row.add(LongUtil.toBytes(exec.getCompletedTaskCount())); + row.add(LongUtil.toBytes(exec.getTaskCount())); + return row; } - } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ThreadHandler.java b/src/main/java/com/actiontech/dble/services/manager/response/ThreadHandler.java new file mode 100644 index 0000000000..bd7f261334 --- /dev/null +++ b/src/main/java/com/actiontech/dble/services/manager/response/ThreadHandler.java @@ -0,0 +1,59 @@ +package com.actiontech.dble.services.manager.response; + +import com.actiontech.dble.config.ErrorCode; +import com.actiontech.dble.services.manager.ManagerService; +import com.actiontech.dble.singleton.ThreadManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public final class ThreadHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class); + private static final Pattern THREAD_KILL = Pattern.compile("^\\s*@@kill\\s*(name|poolname)\\s*=\\s*'([a-zA-Z_0-9\\-]+)'?$", Pattern.CASE_INSENSITIVE); + private static final Pattern THREAD_RECOVER = Pattern.compile("^\\s*@@recover\\s*(name|poolname)\\s*=\\s*'([a-zA-Z_0-9\\-]+)'?$", Pattern.CASE_INSENSITIVE); + + private ThreadHandler() { + } + + public static void handle(String stmt, ManagerService service, int offset) { + String sql = stmt.substring(offset).trim(); + Matcher kill = THREAD_KILL.matcher(sql); + Matcher recover = THREAD_RECOVER.matcher(sql); + try { + if (kill.matches()) { + String type = kill.group(1); + String name = kill.group(2); + kill(service, type, name); + } else if (recover.matches()) { + String type = recover.group(1); + String name = recover.group(2); + recover(service, type, name); + } else { + service.writeErrMessage(ErrorCode.ER_YES, "Syntax Error, Please check the help to use the thread command"); + } + } catch (Exception e) { + LOGGER.info("thread command happen exception:", e); + service.writeErrMessage(ErrorCode.ER_YES, e.getMessage()); + } + } + + public static void kill(ManagerService service, String type, String name) throws Exception { + if (type.equalsIgnoreCase("name")) { + ThreadManager.interruptSingleThread(name); + } else if (type.equalsIgnoreCase("poolname")) { + ThreadManager.shutDownThreadPool(name); + } + service.writeOkPacket("Please see logs in logs/thread.log"); + } + + public static void recover(ManagerService service, String type, String name) throws Exception { + if (type.equalsIgnoreCase("name")) { + ThreadManager.recoverSingleThread(name); + } else if (type.equalsIgnoreCase("poolname")) { + ThreadManager.recoverThreadPool(name); + } + service.writeOkPacket("Please see logs in logs/thread.log"); + } +} diff --git a/src/main/java/com/actiontech/dble/singleton/Scheduler.java b/src/main/java/com/actiontech/dble/singleton/Scheduler.java index c41088eb25..a64e0bca6b 100644 --- a/src/main/java/com/actiontech/dble/singleton/Scheduler.java +++ b/src/main/java/com/actiontech/dble/singleton/Scheduler.java @@ -14,11 +14,12 @@ import com.actiontech.dble.net.connection.PooledConnection; import com.actiontech.dble.statistic.stat.FrontActiveRatioStat; import com.actiontech.dble.statistic.stat.ThreadWorkUsage; +import com.actiontech.dble.util.NameableScheduledThreadPoolExecutor; import com.actiontech.dble.util.TimeUtil; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Iterator; import java.util.concurrent.*; +import java.util.concurrent.TimeUnit; import static com.actiontech.dble.server.NonBlockingSession.LOGGER; @@ -31,17 +32,9 @@ public final class Scheduler { private static final long TIME_UPDATE_PERIOD = 20L; private static final long DDL_EXECUTE_CHECK_PERIOD = 60L; private static final long DEFAULT_OLD_CONNECTION_CLEAR_PERIOD = 5 * 1000L; - private static final long DEFAULT_SQL_STAT_RECYCLE_PERIOD = 5 * 1000L; - private static final int DEFAULT_CHECK_XAID = 5; - private ExecutorService timerExecutor; - private ScheduledExecutorService scheduledExecutor; - private Scheduler() { - this.scheduledExecutor = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setNameFormat("TimerScheduler-%d").build()); - } - - public void init(ExecutorService executor) { - this.timerExecutor = executor; + public void init() { + NameableScheduledThreadPoolExecutor scheduledExecutor = DbleServer.getInstance().getTimerSchedulerExecutor(); scheduledExecutor.scheduleAtFixedRate(updateTime(), 0L, TIME_UPDATE_PERIOD, TimeUnit.MILLISECONDS); scheduledExecutor.scheduleWithFixedDelay(DbleServer.getInstance().processorCheck(), 0L, SystemConfig.getInstance().getProcessorCheckPeriod(), TimeUnit.MILLISECONDS); scheduledExecutor.scheduleAtFixedRate(dbInstanceOldConsClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS); @@ -60,7 +53,11 @@ private Runnable printLongTimeDDL() { return new Runnable() { @Override public void run() { - DDLTraceHelper.printDDLOutOfLimit(); + try { + DDLTraceHelper.printDDLOutOfLimit(); + } catch (Throwable e) { + LOGGER.warn("scheduled task printLongTimeDDL() happen exception:{} ", e.getMessage()); + } } }; } @@ -82,7 +79,7 @@ private Runnable dbInstanceOldConsClear() { @Override public void run() { try { - timerExecutor.execute(new Runnable() { + DbleServer.getInstance().getTimerExecutor().execute(new Runnable() { @Override public void run() { @@ -102,6 +99,8 @@ public void run() { }); } catch (RejectedExecutionException e) { ThreadChecker.getInstance().timerExecuteError(e, "dbInstanceOldConsClear()"); + } catch (Throwable e) { + LOGGER.warn("scheduled task dbInstanceOldConsClear() happen exception:{} ", e.getMessage()); } } }; @@ -115,7 +114,7 @@ private Runnable oldDbGroupClear() { @Override public void run() { try { - timerExecutor.execute(() -> { + DbleServer.getInstance().getTimerExecutor().execute(() -> { Iterator iterator = IOProcessor.BACKENDS_OLD_GROUP.iterator(); while (iterator.hasNext()) { PhysicalDbGroup dbGroup = iterator.next(); @@ -128,6 +127,8 @@ public void run() { }); } catch (RejectedExecutionException e) { ThreadChecker.getInstance().timerExecuteError(e, "oldDbGroupClear()"); + } catch (Throwable e) { + LOGGER.warn("scheduled task oldDbGroupClear() happen exception:{} ", e.getMessage()); } } }; @@ -141,7 +142,7 @@ private Runnable oldDbInstanceClear() { @Override public void run() { try { - timerExecutor.execute(() -> { + DbleServer.getInstance().getTimerExecutor().execute(() -> { Iterator iterator = IOProcessor.BACKENDS_OLD_INSTANCE.iterator(); while (iterator.hasNext()) { PhysicalDbInstance dbInstance = iterator.next(); @@ -155,19 +156,20 @@ public void run() { }); } catch (RejectedExecutionException e) { ThreadChecker.getInstance().timerExecuteError(e, "oldDbInstanceClear()"); + } catch (Throwable e) { + LOGGER.warn("scheduled task oldDbInstanceClear() happen exception:{} ", e.getMessage()); } } }; } - // XA session check job private Runnable xaSessionCheck() { return new Runnable() { @Override public void run() { try { - timerExecutor.execute(new Runnable() { + DbleServer.getInstance().getTimerExecutor().execute(new Runnable() { @Override public void run() { XASessionCheck.getInstance().checkSessions(); @@ -175,6 +177,8 @@ public void run() { }); } catch (RejectedExecutionException e) { ThreadChecker.getInstance().timerExecuteError(e, "xaSessionCheck()"); + } catch (Throwable e) { + LOGGER.warn("scheduled task xaSessionCheck() happen exception:{} ", e.getMessage()); } } }; @@ -185,7 +189,7 @@ private Runnable xaLogClean() { @Override public void run() { try { - timerExecutor.execute(new Runnable() { + DbleServer.getInstance().getTimerExecutor().execute(new Runnable() { @Override public void run() { XAStateLog.cleanCompleteRecoveryLog(); @@ -193,6 +197,8 @@ public void run() { }); } catch (RejectedExecutionException e) { ThreadChecker.getInstance().timerExecuteError(e, "xaLogClean()"); + } catch (Throwable e) { + LOGGER.warn("scheduled task xaLogClean() happen exception:{} ", e.getMessage()); } } }; @@ -202,8 +208,12 @@ public Runnable threadStatRenew() { return new Runnable() { @Override public void run() { - for (ThreadWorkUsage obj : DbleServer.getInstance().getThreadUsedMap().values()) { - obj.switchToNew(); + try { + for (ThreadWorkUsage obj : DbleServer.getInstance().getThreadUsedMap().values()) { + obj.switchToNew(); + } + } catch (Throwable e) { + LOGGER.warn("scheduled task threadStatRenew() happen exception:{} ", e.getMessage()); } } }; @@ -213,19 +223,15 @@ public Runnable compressionsActiveStat() { return new Runnable() { @Override public void run() { - FrontActiveRatioStat.getInstance().compress(); + try { + FrontActiveRatioStat.getInstance().compress(); + } catch (Throwable e) { + LOGGER.warn("scheduled task compressionsActiveStat() happen exception:{} ", e.getMessage()); + } } }; } - public ExecutorService getTimerExecutor() { - return timerExecutor; - } - - public ScheduledExecutorService getScheduledExecutor() { - return scheduledExecutor; - } - public static Scheduler getInstance() { return INSTANCE; } diff --git a/src/main/java/com/actiontech/dble/singleton/ThreadChecker.java b/src/main/java/com/actiontech/dble/singleton/ThreadChecker.java index afa986d705..660a975433 100644 --- a/src/main/java/com/actiontech/dble/singleton/ThreadChecker.java +++ b/src/main/java/com/actiontech/dble/singleton/ThreadChecker.java @@ -6,6 +6,7 @@ import com.actiontech.dble.alarm.AlertUtil; import com.actiontech.dble.alarm.ToResolveContainer; import com.actiontech.dble.util.NameableExecutor; +import com.actiontech.dble.util.NameableScheduledThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -143,6 +144,9 @@ private LastRecordInfo getInfo(String name, Thread th, long lastExecTime, long l case TIMER_WORKER_NAME: NameableExecutor exec = (NameableExecutor) DbleServer.getInstance().getTimerExecutor(); return new LastRecordInfo(name, th.getState(), lastExecTime, lastFinishTime, exec.getActiveCount(), exec.getQueue().size(), exec.getCompletedTaskCount()); + case DbleServer.TIMER_SCHEDULER_WORKER_NAME: + NameableScheduledThreadPoolExecutor exec1 = DbleServer.getInstance().getTimerSchedulerExecutor(); + return new LastRecordInfo(name, th.getState(), lastExecTime, lastFinishTime, exec1.getActiveCount(), exec1.getQueue().size(), exec1.getCompletedTaskCount()); default: return null; } diff --git a/src/main/java/com/actiontech/dble/singleton/ThreadCheckerScheduler.java b/src/main/java/com/actiontech/dble/singleton/ThreadCheckerScheduler.java index 058424e0b9..c7a7fc6544 100644 --- a/src/main/java/com/actiontech/dble/singleton/ThreadCheckerScheduler.java +++ b/src/main/java/com/actiontech/dble/singleton/ThreadCheckerScheduler.java @@ -1,21 +1,19 @@ package com.actiontech.dble.singleton; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.actiontech.dble.util.ExecutorUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ThreadCheckerScheduler { - public static final Logger LOGGER = LoggerFactory.getLogger(ThreadCheckerScheduler.class); + public static final Logger LOGGER = LoggerFactory.getLogger("ThreadChecker"); private static final ThreadCheckerScheduler INSTANCE = new ThreadCheckerScheduler(); private ScheduledExecutorService scheduledExecutor; public ThreadCheckerScheduler() { - this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("ThreadChecker-%d"). - setUncaughtExceptionHandler((Thread threads, Throwable e) -> LOGGER.warn("unknown exception ", e)).build()); + this.scheduledExecutor = ExecutorUtil.createFixedScheduled("ThreadChecker", 1, null); } public void init() { @@ -26,7 +24,11 @@ private Runnable checkThread() { return new Runnable() { @Override public void run() { - ThreadChecker.getInstance().doSelfCheck(); + try { + ThreadChecker.getInstance().doSelfCheck(); + } catch (Throwable e) { + LOGGER.warn("doSelfCheck() happen fail, exception :", e); + } } }; } diff --git a/src/main/java/com/actiontech/dble/singleton/ThreadManager.java b/src/main/java/com/actiontech/dble/singleton/ThreadManager.java new file mode 100644 index 0000000000..0ed5b27299 --- /dev/null +++ b/src/main/java/com/actiontech/dble/singleton/ThreadManager.java @@ -0,0 +1,221 @@ +package com.actiontech.dble.singleton; + +import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.datasource.PhysicalDbGroup; +import com.actiontech.dble.backend.mysql.xa.XaCheckHandler; +import com.actiontech.dble.config.model.SystemConfig; +import com.actiontech.dble.net.executor.BackendCurrentRunnable; +import com.actiontech.dble.net.executor.FrontendBlockRunnable; +import com.actiontech.dble.net.executor.FrontendCurrentRunnable; +import com.actiontech.dble.net.executor.WriteToBackendRunnable; +import com.actiontech.dble.net.impl.nio.RW; +import com.actiontech.dble.net.service.ServiceTask; +import com.actiontech.dble.util.ExecutorUtil; +import com.actiontech.dble.util.NameableExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.BlockingDeque; + +import static com.actiontech.dble.DbleServer.*; + +public final class ThreadManager { + private static final Logger LOGGER = LoggerFactory.getLogger("ThreadChecker"); + + private ThreadManager() { + } + + // single thread + public static void interruptSingleThread(String threadName) throws Exception { + Thread[] threads = getAllThread(); + Thread find = null; + for (Thread thread : threads) { + if (thread.getName().equals(threadName)) { + find = thread; + break; + } + } + if (find == null) + throw new Exception("Thread[" + threadName + "] does not exist"); + find.interrupt(); + LOGGER.info("exec interrupt Thread[{}]", find.getName()); + } + + public static void recoverSingleThread(String threadName) throws Exception { + String[] array = threadName.split("-"); + if (array.length == 2) { + DbleServer server = DbleServer.getInstance(); + switch (array[1]) { + case FRONT_WORKER_NAME: + NameableExecutor nameableExecutor0 = (NameableExecutor) server.getFrontExecutor(); + if (nameableExecutor0.getPoolSize() > nameableExecutor0.getActiveCount()) { + if (SystemConfig.getInstance().getUsePerformanceMode() == 1) { + nameableExecutor0.execute(new FrontendCurrentRunnable(server.getFrontHandlerQueue())); + } else { + nameableExecutor0.execute(new FrontendBlockRunnable((BlockingDeque) server.getFrontHandlerQueue())); + } + } else { + throw new Exception("threadPool[{" + FRONT_WORKER_NAME + "}] does not need to be recover"); + } + break; + case FRONT_MANAGER_WORKER_NAME: + NameableExecutor nameableExecutor1 = (NameableExecutor) server.getFrontExecutor(); + if (nameableExecutor1.getPoolSize() > nameableExecutor1.getActiveCount()) { + nameableExecutor1.execute(new FrontendBlockRunnable((BlockingDeque) server.getManagerFrontHandlerQueue())); + } else { + throw new Exception("threadPool[{" + FRONT_MANAGER_WORKER_NAME + "}] does not need to be recover"); + } + break; + case BACKEND_WORKER_NAME: + NameableExecutor nameableExecutor2 = (NameableExecutor) server.getBackendExecutor(); + if (nameableExecutor2.getPoolSize() > nameableExecutor2.getActiveCount()) { + if (SystemConfig.getInstance().getUsePerformanceMode() == 1) { + nameableExecutor2.execute(new BackendCurrentRunnable(server.getConcurrentBackHandlerQueue())); + } + } else { + throw new Exception("threadPool[{" + BACKEND_WORKER_NAME + "}] does not need to be recover"); + } + break; + case WRITE_TO_BACKEND_WORKER_NAME: + NameableExecutor nameableExecutor3 = (NameableExecutor) server.getWriteToBackendExecutor(); + if (nameableExecutor3.getPoolSize() > nameableExecutor3.getActiveCount()) { + nameableExecutor3.execute(new WriteToBackendRunnable(server.getWriteToBackendQueue())); + } else { + throw new Exception("threadPool[{" + WRITE_TO_BACKEND_WORKER_NAME + "}] does not need to be recover"); + } + break; + case NIO_FRONT_RW: + if (SystemConfig.getInstance().getUsingAIO() != 1) { + try { + NameableExecutor nameableExecutor4 = (NameableExecutor) server.getNioFrontExecutor(); + if (nameableExecutor4.getPoolSize() > nameableExecutor4.getActiveCount()) { + nameableExecutor4.execute(new RW(server.getFrontRegisterQueue())); + } else { + throw new Exception("threadPool[{" + NIO_FRONT_RW + "}] does not need to be recover"); + } + } catch (IOException e) { + throw new Exception("recover threadPool[{" + NIO_FRONT_RW + "}] fail", e); + } + } + break; + case NIO_BACKEND_RW: + if (SystemConfig.getInstance().getUsingAIO() != 1) { + try { + NameableExecutor nameableExecutor5 = (NameableExecutor) server.getNioBackendExecutor(); + if (nameableExecutor5.getPoolSize() > nameableExecutor5.getActiveCount()) { + nameableExecutor5.execute(new RW(server.getBackendRegisterQueue())); + } else { + throw new Exception("threadPool[{" + NIO_BACKEND_RW + "}] does not need to be recover"); + } + } catch (IOException e) { + throw new Exception("recover threadPool[{" + NIO_BACKEND_RW + "}] fail", e); + } + } + break; + default: + throw new Exception("The recover operation of threadPool[" + threadName + "] is not supported"); + } + } else { + throw new Exception("The recover operation of threadPool[" + threadName + "] is not supported"); + } + } + + // thread pool(TIMER_WORKER_NAME、TIMER_SCHEDULER_WORKER_NAME) + public static void shutDownThreadPool(String threadPoolName) throws Exception { + switch (threadPoolName) { + case TIMER_WORKER_NAME: + if (DbleServer.getInstance().getTimerExecutor().isShutdown()) { + throw new Exception("threadPool[" + TIMER_WORKER_NAME + "] already shutdown"); + } + LOGGER.info("manual shutdown threadPool[{}] ... start ...", TIMER_WORKER_NAME); + DbleServer.getInstance().getTimerExecutor().shutdownNow(); + LOGGER.info("manual shutdown threadPool[{}] ... end ...", TIMER_WORKER_NAME); + break; + case TIMER_SCHEDULER_WORKER_NAME: + if (DbleServer.getInstance().getTimerSchedulerExecutor().isShutdown()) { + throw new Exception("threadPool[" + TIMER_SCHEDULER_WORKER_NAME + "] already shutdown"); + } + /* + 0、shutdown + 1、stopHeartbeat + 2、stopDelayDetection + 3、stopXaIdCheckPeriod + */ + LOGGER.info("manual shutdown threadPool[{}] ... start ...", TIMER_SCHEDULER_WORKER_NAME); + DbleServer.getInstance().getTimerSchedulerExecutor().shutdownNow(); + Iterator iterator = DbleServer.getInstance().getConfig().getDbGroups().values().iterator(); + while (iterator.hasNext()) { + PhysicalDbGroup dbGroup = iterator.next(); + LOGGER.info("dbGroup[{}] stopHeartbeat...", dbGroup.getGroupName()); + dbGroup.stopHeartbeat("manual shutdown thread pool TimerScheduler"); + LOGGER.info("dbGroup[{}] stopDelayDetection...", dbGroup.getGroupName()); + dbGroup.stopDelayDetection("manual shutdown thread pool TimerScheduler"); + } + LOGGER.info("stopXaIdCheckPeriod..."); + XaCheckHandler.stopXaIdCheckPeriod(); + LOGGER.info("manual shutdown threadPool[{}] ... end ...", TIMER_SCHEDULER_WORKER_NAME); + break; + default: + throw new Exception("The shutdown operation of thread[" + TIMER_SCHEDULER_WORKER_NAME + "] is not supported"); + } + } + + public static void recoverThreadPool(String threadName) throws Exception { + switch (threadName) { + case TIMER_WORKER_NAME: + if (!DbleServer.getInstance().getTimerExecutor().isShutdown()) { + throw new Exception("threadPool[" + TIMER_WORKER_NAME + "] is not shutdown, no need to recover"); + } + LOGGER.info("manual recover threadPool[{}] ... start ...", TIMER_WORKER_NAME); + DbleServer.getInstance().setTimerExecutor( + ExecutorUtil.createCached(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance())); + LOGGER.info("manual recover threadPool[{}] ... end ...", TIMER_WORKER_NAME); + break; + case TIMER_SCHEDULER_WORKER_NAME: + if (!DbleServer.getInstance().getTimerSchedulerExecutor().isShutdown()) { + throw new Exception("threadPool[" + TIMER_SCHEDULER_WORKER_NAME + "] is not shutdown, no need to recover"); + } + /* + 0、new TimerSchedulerExecutor AND init + 1、startHeartbeat + 2、startDelayDetection + 3、startXaIdCheckPeriod + */ + LOGGER.info("manual recover threadPool[{}] ... start ...", TIMER_SCHEDULER_WORKER_NAME); + DbleServer.getInstance().setTimerSchedulerExecutor( + ExecutorUtil.createFixedScheduled(TIMER_SCHEDULER_WORKER_NAME, 2, ThreadChecker.getInstance())); + Scheduler.getInstance().init(); + Iterator iterator = DbleServer.getInstance().getConfig().getDbGroups().values().iterator(); + while (iterator.hasNext()) { + PhysicalDbGroup dbGroup = iterator.next(); + LOGGER.info("dbGroup[{}] startHeartbeat...", dbGroup.getGroupName()); + dbGroup.startHeartbeat(); + LOGGER.info("dbGroup[{}] startDelayDetection...", dbGroup.getGroupName()); + dbGroup.startDelayDetection(); + } + LOGGER.info("startXaIdCheckPeriod..."); + XaCheckHandler.startXaIdCheckPeriod(); + LOGGER.info("manual recover threadPool[{}] ... end ...", TIMER_SCHEDULER_WORKER_NAME); + break; + default: + throw new Exception("The recover operation of threadPool[" + threadName + "] is not supported"); + } + } + + private static Thread[] getAllThread() { + ThreadGroup group = Thread.currentThread().getThreadGroup(); + ThreadGroup topGroup = group; + while (group != null) { + topGroup = group; + group = group.getParent(); + } + int slackSize = topGroup.activeCount() * 2; + Thread[] slackThreads = new Thread[slackSize]; + int actualSize = topGroup.enumerate(slackThreads); + Thread[] atualThreads = new Thread[actualSize]; + System.arraycopy(slackThreads, 0, atualThreads, 0, actualSize); + return atualThreads; + } +} diff --git a/src/main/java/com/actiontech/dble/util/ExecutorUtil.java b/src/main/java/com/actiontech/dble/util/ExecutorUtil.java index 95f58f5d5d..87d007af89 100644 --- a/src/main/java/com/actiontech/dble/util/ExecutorUtil.java +++ b/src/main/java/com/actiontech/dble/util/ExecutorUtil.java @@ -6,6 +6,7 @@ package com.actiontech.dble.util; import com.actiontech.dble.singleton.ThreadChecker; + import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; @@ -65,6 +66,11 @@ private static NameableExecutor createCached(String name, int size, boolean isDa */ public static NameableExecutor createCached(String name, int size, int maxSize, ThreadChecker checker) { NameableThreadFactory factory = new NameableThreadFactory(name, true); - return new NameableExecutor(name, size, maxSize, 60, new LinkedBlockingQueue<>(256), factory, null, checker); + return new NameableExecutor(name, size, maxSize, 60, new LinkedBlockingQueue<>(1024), factory, null, checker); + } + + public static NameableScheduledThreadPoolExecutor createFixedScheduled(String name, int size, ThreadChecker checker) { + NameableThreadFactory factory = new NameableThreadFactory(name, true); + return new NameableScheduledThreadPoolExecutor(name, size, factory, checker); } } diff --git a/src/main/java/com/actiontech/dble/util/NameableScheduledThreadPoolExecutor.java b/src/main/java/com/actiontech/dble/util/NameableScheduledThreadPoolExecutor.java new file mode 100644 index 0000000000..3faea76529 --- /dev/null +++ b/src/main/java/com/actiontech/dble/util/NameableScheduledThreadPoolExecutor.java @@ -0,0 +1,46 @@ +package com.actiontech.dble.util; + +import com.actiontech.dble.singleton.ThreadChecker; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +public class NameableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + + protected String name; + private ThreadChecker threadChecker = null; + + public NameableScheduledThreadPoolExecutor(String name, int corePoolSize, ThreadFactory threadFactory, ThreadChecker threadChecker) { + super(corePoolSize, threadFactory); + this.name = name; + this.threadChecker = threadChecker; + } + + public String getName() { + return name; + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + if (threadChecker != null) { + threadChecker.startExec(t); + } + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if (threadChecker != null) { + threadChecker.endExec(); + } + } + + @Override + protected void terminated() { + super.terminated(); + if (threadChecker != null) { + threadChecker.terminated(); + } + } +}