From e29ec29686fbca23d68975c3309b0a68da7edf57 Mon Sep 17 00:00:00 2001 From: wenyh <44251917+wenyh1@users.noreply.github.com> Date: Thu, 7 Sep 2023 14:17:10 +0800 Subject: [PATCH] [inner-2351] print thread stackTrace --- .../java/com/actiontech/dble/DbleServer.java | 2 +- .../services/manager/response/ShowHelp.java | 1 + .../manager/response/ThreadHandler.java | 15 ++++- .../dble/singleton/SerializableLock.java | 2 +- .../dble/singleton/ThreadChecker.java | 17 +----- .../dble/singleton/ThreadManager.java | 58 ++++++++++++++++++- .../actiontech/dble/util/ExecutorUtil.java | 6 +- 7 files changed, 78 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/actiontech/dble/DbleServer.java b/src/main/java/com/actiontech/dble/DbleServer.java index cf615b2c19..65b6ecd39f 100644 --- a/src/main/java/com/actiontech/dble/DbleServer.java +++ b/src/main/java/com/actiontech/dble/DbleServer.java @@ -341,7 +341,7 @@ private void initExecutor(int frontProcessorCount, int backendProcessorCount) { backendExecutor = ExecutorUtil.createFixed(BACKEND_WORKER_NAME, SystemConfig.getInstance().getBackendWorker(), runnableMap); writeToBackendExecutor = ExecutorUtil.createFixed(WRITE_TO_BACKEND_WORKER_NAME, SystemConfig.getInstance().getWriteToBackendWorker(), runnableMap); complexQueryExecutor = ExecutorUtil.createCached(COMPLEX_QUERY_EXECUTOR_NAME, SystemConfig.getInstance().getComplexQueryWorker(), null); - timerExecutor = ExecutorUtil.createCached(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance()); + timerExecutor = ExecutorUtil.createTimer(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance()); timerSchedulerExecutor = ExecutorUtil.createFixedScheduled(TIMER_SCHEDULER_WORKER_NAME, 2, ThreadChecker.getInstance()); nioFrontExecutor = ExecutorUtil.createFixed(NIO_FRONT_RW, frontProcessorCount, runnableMap); nioBackendExecutor = ExecutorUtil.createFixed(NIO_BACKEND_RW, backendProcessorCount, runnableMap); diff --git 
a/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java index 944968b1e5..6efd06c95a 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java @@ -253,6 +253,7 @@ private static RowDataPacket getRow(String stmt, String desc, String charset) { HELPS.put("thread @@kill [name|poolname] ='?'", "Gracefully interrupt a single thread or thread pool"); HELPS.put("thread @@recover [name|poolname] ='?'", "Restoring a single thread or thread pool"); + HELPS.put("thread @@print [name ='?']", "Print the status and stack of a single or all threads"); // list sort KEYS.addAll(HELPS.keySet()); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ThreadHandler.java b/src/main/java/com/actiontech/dble/services/manager/response/ThreadHandler.java index bd7f261334..a4751ae1fd 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ThreadHandler.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ThreadHandler.java @@ -11,6 +11,7 @@ public final class ThreadHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class); + private static final Pattern THREAD_PRINT = Pattern.compile("^\\s*@@print\\s*(name\\s*=\\s*'([a-zA-Z_0-9\\-]+)')?$", Pattern.CASE_INSENSITIVE); private static final Pattern THREAD_KILL = Pattern.compile("^\\s*@@kill\\s*(name|poolname)\\s*=\\s*'([a-zA-Z_0-9\\-]+)'?$", Pattern.CASE_INSENSITIVE); private static final Pattern THREAD_RECOVER = Pattern.compile("^\\s*@@recover\\s*(name|poolname)\\s*=\\s*'([a-zA-Z_0-9\\-]+)'?$", Pattern.CASE_INSENSITIVE); @@ -19,10 +20,13 @@ private ThreadHandler() { public static void handle(String stmt, ManagerService service, int offset) { String sql = stmt.substring(offset).trim(); + Matcher print = THREAD_PRINT.matcher(sql); Matcher kill = 
THREAD_KILL.matcher(sql); Matcher recover = THREAD_RECOVER.matcher(sql); try { - if (kill.matches()) { + if (print.matches()) { + printTread(service, print.group(2)); + } else if (kill.matches()) { String type = kill.group(1); String name = kill.group(2); kill(service, type, name); @@ -39,6 +43,15 @@ public static void handle(String stmt, ManagerService service, int offset) { } } + public static void printTread(ManagerService service, String name) throws Exception { + if (name == null) { + ThreadManager.printAll(); + } else { + ThreadManager.printSingleThread(name); + } + service.writeOkPacket("Please see logs in logs/thread.log"); + } + public static void kill(ManagerService service, String type, String name) throws Exception { if (type.equalsIgnoreCase("name")) { ThreadManager.interruptSingleThread(name); diff --git a/src/main/java/com/actiontech/dble/singleton/SerializableLock.java b/src/main/java/com/actiontech/dble/singleton/SerializableLock.java index eaf9020a28..a51cb96d5f 100644 --- a/src/main/java/com/actiontech/dble/singleton/SerializableLock.java +++ b/src/main/java/com/actiontech/dble/singleton/SerializableLock.java @@ -58,7 +58,7 @@ public void unLock(long frontId) { } private String printTrace() { - StackTraceElement[] st = Thread.currentThread().getStackTrace(); + StackTraceElement[] st = Thread.currentThread().getStackTrace(); // here is the currentThread, STW won't happen StringBuilder sbf = new StringBuilder(); for (StackTraceElement e : st) { if (sbf.length() > 0) { diff --git a/src/main/java/com/actiontech/dble/singleton/ThreadChecker.java b/src/main/java/com/actiontech/dble/singleton/ThreadChecker.java index 660a975433..26fdbf61c1 100644 --- a/src/main/java/com/actiontech/dble/singleton/ThreadChecker.java +++ b/src/main/java/com/actiontech/dble/singleton/ThreadChecker.java @@ -71,7 +71,7 @@ private void doSelfCheck0(Thread th, TimeRecord tr) { long timeDiff = nowTime - current.getStartTime(); if (timeDiff > checkTimeNs) { // more than 10s will 
log String msg = "Thread[" + key + "] suspected hang, execute time:[{" + timeDiff / 1000000L + "ms}] more than 10s, currentState:[" + current.getState() + "]"; - LOGGER.info(msg + ", stackTrace: {}", getStackTrace(key)); + LOGGER.info(msg); // if there is task accumulation in the queue, it means that all threads are hang if (previous.getCompletedTask() == current.getCompletedTask() && current.getActiveTaskCount() == previous.getActiveTaskCount()) { LOGGER.info("The thread pool where the thread[" + key + "] is located is in the hang state and cannot work. Trigger alarm"); @@ -122,21 +122,6 @@ public void timerExecuteError(RejectedExecutionException exception, String metho } } - private String getStackTrace(String threadName) { - for (Map.Entry entry : timeRecords.entrySet()) { - StackTraceElement[] st = entry.getKey().getStackTrace(); - if (threadName.equals(entry.getKey().getName())) { - StringBuilder sbf = new StringBuilder(); - for (StackTraceElement e : st) { - sbf.append("\n\tat "); - sbf.append(e); - } - return sbf.toString(); - } - } - return "empty"; - } - private LastRecordInfo getInfo(String name, Thread th, long lastExecTime, long lastFinishTime) { String[] arr = name.split("-"); if (arr.length == 2) { diff --git a/src/main/java/com/actiontech/dble/singleton/ThreadManager.java b/src/main/java/com/actiontech/dble/singleton/ThreadManager.java index 0ed5b27299..b40484e7c4 100644 --- a/src/main/java/com/actiontech/dble/singleton/ThreadManager.java +++ b/src/main/java/com/actiontech/dble/singleton/ThreadManager.java @@ -170,7 +170,7 @@ public static void recoverThreadPool(String threadName) throws Exception { } LOGGER.info("manual recover threadPool[{}] ... start ...", TIMER_WORKER_NAME); DbleServer.getInstance().setTimerExecutor( - ExecutorUtil.createCached(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance())); + ExecutorUtil.createTimer(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance())); LOGGER.info("manual recover threadPool[{}] ... 
end ...", TIMER_WORKER_NAME); break; case TIMER_SCHEDULER_WORKER_NAME: @@ -204,6 +204,62 @@ public static void recoverThreadPool(String threadName) throws Exception { } } + public static void printAll() { + Thread[] threads = getAllThread(); + StringBuilder sbf = new StringBuilder(); + sbf.append("============== select all thread ============== start"); + for (Thread thread : threads) { + sbf.append("\n \""); + sbf.append(thread.getName()); + sbf.append("\" "); + sbf.append("#"); + sbf.append(thread.getId()); + sbf.append(", state: "); + sbf.append(thread.getState()); + sbf.append(", stackTrace: "); + StackTraceElement[] st = thread.getStackTrace(); + for (StackTraceElement e : st) { + sbf.append("\n\tat "); + sbf.append(e); + } + } + sbf.append("\n============== select all thread ============== end"); + LOGGER.info(sbf.toString()); + } + + public static void printSingleThread(String threadName) throws Exception { + if (threadName != null && threadName.length() > 0) { + Thread[] threads = getAllThread(); + StringBuilder sbf = null; + for (Thread thread : threads) { + if (thread.getName().equals(threadName)) { + sbf = new StringBuilder(); + sbf.append("============== select thread[{" + threadName + "}] ============== start"); + sbf.append("\n \""); + sbf.append(thread.getName()); + sbf.append("\" "); + sbf.append("#"); + sbf.append(thread.getId()); + sbf.append(", state: "); + sbf.append(thread.getState()); + sbf.append(", stackTrace: "); + StackTraceElement[] st = thread.getStackTrace(); + for (StackTraceElement e : st) { + sbf.append("\n\tat "); + sbf.append(e); + } + sbf.append("\n============== select thread[{" + threadName + "}] ============== end"); + break; + } + } + if (sbf == null) { + throw new Exception("Thread[" + threadName + "] does not exist"); + } else { + LOGGER.info(sbf.toString()); + } + } + } + private static Thread[] getAllThread() { ThreadGroup group = Thread.currentThread().getThreadGroup(); ThreadGroup topGroup = group; diff --git 
a/src/main/java/com/actiontech/dble/util/ExecutorUtil.java b/src/main/java/com/actiontech/dble/util/ExecutorUtil.java index 87d007af89..36953e4e85 100644 --- a/src/main/java/com/actiontech/dble/util/ExecutorUtil.java +++ b/src/main/java/com/actiontech/dble/util/ExecutorUtil.java @@ -61,12 +61,12 @@ private static NameableExecutor createCached(String name, int size, boolean isDa * When the size number of threads takes a long time to execute or hangs, and the queue is full, * create a new thread to continue working, and the number of newly created threads does not exceed maxSize *

- * If maxSize threads are all hanging, only 1024 tasks will be stored in the queue (this avoids memory leaks), + * If maxSize threads are all hanging, only 65535 tasks will be stored in the queue (this avoids unbounded memory growth), * and subsequent tasks will be discarded by default */ - public static NameableExecutor createCached(String name, int size, int maxSize, ThreadChecker checker) { + public static NameableExecutor createTimer(String name, int size, int maxSize, ThreadChecker checker) { NameableThreadFactory factory = new NameableThreadFactory(name, true); - return new NameableExecutor(name, size, maxSize, 60, new LinkedBlockingQueue<>(1024), factory, null, checker); + return new NameableExecutor(name, size, maxSize, 60, new LinkedBlockingQueue<>(65535), factory, null, checker); } public static NameableScheduledThreadPoolExecutor createFixedScheduled(String name, int size, ThreadChecker checker) {