Skip to content

Commit

Permalink
[inner-2351] print thread stackTrace
Browse files Browse the repository at this point in the history
  • Loading branch information
wenyh1 committed Sep 11, 2023
1 parent bc48cc3 commit e29ec29
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/actiontech/dble/DbleServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ private void initExecutor(int frontProcessorCount, int backendProcessorCount) {
backendExecutor = ExecutorUtil.createFixed(BACKEND_WORKER_NAME, SystemConfig.getInstance().getBackendWorker(), runnableMap);
writeToBackendExecutor = ExecutorUtil.createFixed(WRITE_TO_BACKEND_WORKER_NAME, SystemConfig.getInstance().getWriteToBackendWorker(), runnableMap);
complexQueryExecutor = ExecutorUtil.createCached(COMPLEX_QUERY_EXECUTOR_NAME, SystemConfig.getInstance().getComplexQueryWorker(), null);
timerExecutor = ExecutorUtil.createCached(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance());
timerExecutor = ExecutorUtil.createTimer(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance());
timerSchedulerExecutor = ExecutorUtil.createFixedScheduled(TIMER_SCHEDULER_WORKER_NAME, 2, ThreadChecker.getInstance());
nioFrontExecutor = ExecutorUtil.createFixed(NIO_FRONT_RW, frontProcessorCount, runnableMap);
nioBackendExecutor = ExecutorUtil.createFixed(NIO_BACKEND_RW, backendProcessorCount, runnableMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ private static RowDataPacket getRow(String stmt, String desc, String charset) {

HELPS.put("thread @@kill [name|poolname] ='?'", "Gracefully interrupt a single thread or thread pool");
HELPS.put("thread @@recover [name|poolname] ='?'", "Restoring a single thread or thread pool");
HELPS.put("thread @@print [name ='?']", "Print the status and stack of a single or all threads");

// list sort
KEYS.addAll(HELPS.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

public final class ThreadHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
private static final Pattern THREAD_PRINT = Pattern.compile("^\\s*@@print\\s*(name\\s*=\\s*'([a-zA-Z_0-9\\-]+)')?$", Pattern.CASE_INSENSITIVE);
private static final Pattern THREAD_KILL = Pattern.compile("^\\s*@@kill\\s*(name|poolname)\\s*=\\s*'([a-zA-Z_0-9\\-]+)'?$", Pattern.CASE_INSENSITIVE);
private static final Pattern THREAD_RECOVER = Pattern.compile("^\\s*@@recover\\s*(name|poolname)\\s*=\\s*'([a-zA-Z_0-9\\-]+)'?$", Pattern.CASE_INSENSITIVE);

Expand All @@ -19,10 +20,13 @@ private ThreadHandler() {

public static void handle(String stmt, ManagerService service, int offset) {
String sql = stmt.substring(offset).trim();
Matcher print = THREAD_PRINT.matcher(sql);
Matcher kill = THREAD_KILL.matcher(sql);
Matcher recover = THREAD_RECOVER.matcher(sql);
try {
if (kill.matches()) {
if (print.matches()) {
printTread(service, print.group(2));
} else if (kill.matches()) {
String type = kill.group(1);
String name = kill.group(2);
kill(service, type, name);
Expand All @@ -39,6 +43,15 @@ public static void handle(String stmt, ManagerService service, int offset) {
}
}

/**
 * Handles the {@code thread @@print} management command.
 * <p>
 * Dumps a single thread when a name is supplied, otherwise dumps every thread,
 * then answers the client with an OK packet that points at the thread log.
 *
 * @param service the manager session that receives the OK packet
 * @param name    the thread name captured from the statement, or {@code null} when
 *                the statement carried no {@code name='...'} clause
 * @throws Exception propagated from {@code ThreadManager.printSingleThread(name)}
 */
public static void printTread(ManagerService service, String name) throws Exception {
    if (name != null) {
        // a specific thread was requested
        ThreadManager.printSingleThread(name);
    } else {
        // no name clause: dump everything
        ThreadManager.printAll();
    }
    service.writeOkPacket("Please see logs in logs/thread.log");
}

public static void kill(ManagerService service, String type, String name) throws Exception {
if (type.equalsIgnoreCase("name")) {
ThreadManager.interruptSingleThread(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void unLock(long frontId) {
}

private String printTrace() {
StackTraceElement[] st = Thread.currentThread().getStackTrace();
StackTraceElement[] st = Thread.currentThread().getStackTrace(); // here is the currentThread, STW won't happen
StringBuilder sbf = new StringBuilder();
for (StackTraceElement e : st) {
if (sbf.length() > 0) {
Expand Down
17 changes: 1 addition & 16 deletions src/main/java/com/actiontech/dble/singleton/ThreadChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private void doSelfCheck0(Thread th, TimeRecord tr) {
long timeDiff = nowTime - current.getStartTime();
if (timeDiff > checkTimeNs) { // more than 10s will log
String msg = "Thread[" + key + "] suspected hang, execute time:[{" + timeDiff / 1000000L + "ms}] more than 10s, currentState:[" + current.getState() + "]";
LOGGER.info(msg + ", stackTrace: {}", getStackTrace(key));
LOGGER.info(msg);
// if there is task accumulation in the queue, it means that all threads are hang
if (previous.getCompletedTask() == current.getCompletedTask() && current.getActiveTaskCount() == previous.getActiveTaskCount()) {
LOGGER.info("The thread pool where the thread[" + key + "] is located is in the hang state and cannot work. Trigger alarm");
Expand Down Expand Up @@ -122,21 +122,6 @@ public void timerExecuteError(RejectedExecutionException exception, String metho
}
}

/**
 * Renders the current stack of the tracked thread whose name equals {@code threadName}.
 * <p>
 * Only threads present as keys of {@code timeRecords} are considered, and the first
 * entry whose name matches is used.
 *
 * @param threadName the exact thread name to look for; must not be {@code null}
 * @return one {@code "\n\tat <frame>"} entry per stack element of the matching thread,
 *         or the literal {@code "empty"} when no tracked thread carries that name
 */
private String getStackTrace(String threadName) {
    for (Map.Entry<Thread, TimeRecord> entry : timeRecords.entrySet()) {
        Thread thread = entry.getKey();
        if (!threadName.equals(thread.getName())) {
            continue;
        }
        // Capture the stack only after the name matched: previously every tracked
        // thread had its stack captured (and thrown away) before the comparison.
        StringBuilder sbf = new StringBuilder();
        for (StackTraceElement e : thread.getStackTrace()) {
            sbf.append("\n\tat ");
            sbf.append(e);
        }
        return sbf.toString();
    }
    return "empty";
}

private LastRecordInfo getInfo(String name, Thread th, long lastExecTime, long lastFinishTime) {
String[] arr = name.split("-");
if (arr.length == 2) {
Expand Down
58 changes: 57 additions & 1 deletion src/main/java/com/actiontech/dble/singleton/ThreadManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public static void recoverThreadPool(String threadName) throws Exception {
}
LOGGER.info("manual recover threadPool[{}] ... start ...", TIMER_WORKER_NAME);
DbleServer.getInstance().setTimerExecutor(
ExecutorUtil.createCached(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance()));
ExecutorUtil.createTimer(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance()));
LOGGER.info("manual recover threadPool[{}] ... end ...", TIMER_WORKER_NAME);
break;
case TIMER_SCHEDULER_WORKER_NAME:
Expand Down Expand Up @@ -204,6 +204,62 @@ public static void recoverThreadPool(String threadName) throws Exception {
}
}

/**
 * Logs, as a single INFO entry, the name, id, state and stack trace of every
 * thread returned by {@code getAllThread()}.
 * <p>
 * The entry is framed by a "select all thread" start line and end line; each
 * stack frame is written on its own line prefixed with {@code "\tat "}.
 */
public static void printAll() {
    final StringBuilder dump = new StringBuilder();
    dump.append("============== select all thread ============== start");
    for (Thread current : getAllThread()) {
        // header line: "<name>" #<id>, state: <state>, stackTrace:
        dump.append("\n \"").append(current.getName()).append("\" ")
                .append("#").append(current.getId())
                .append(", state: ").append(current.getState())
                .append(", stackTrace: ");
        for (StackTraceElement frame : current.getStackTrace()) {
            dump.append("\n\tat ").append(frame);
        }
    }
    dump.append("\n============== select all thread ============== end");
    LOGGER.info(dump.toString());
}

/**
 * Logs, as a single INFO entry, the name, id, state and stack trace of the first
 * thread returned by {@code getAllThread()} whose name equals {@code threadName}.
 * <p>
 * A {@code null} or empty {@code threadName} is ignored: nothing is logged and
 * nothing is thrown.
 *
 * @param threadName the exact name of the thread to dump
 * @throws Exception if {@code threadName} is non-empty and no thread carries that name
 */
public static void printSingleThread(String threadName) throws Exception {
    if (threadName == null || threadName.length() == 0) {
        return;
    }

    // locate the first thread with the requested name
    Thread target = null;
    for (Thread candidate : getAllThread()) {
        if (candidate.getName().equals(threadName)) {
            target = candidate;
            break;
        }
    }
    if (target == null) {
        throw new Exception("Thread[" + threadName + "] does not exist");
    }

    // render: start banner, header line, one line per frame, end banner
    final StringBuilder dump = new StringBuilder();
    dump.append("============== select thread[{" + threadName + "}] ============== start")
            .append("\n \"").append(target.getName()).append("\" ")
            .append("#").append(target.getId())
            .append(", state: ").append(target.getState())
            .append(", stackTrace: ");
    for (StackTraceElement frame : target.getStackTrace()) {
        dump.append("\n\tat ").append(frame);
    }
    dump.append("\n============== select thread[{" + threadName + "}] ============== end");
    LOGGER.info(dump.toString());
}

private static Thread[] getAllThread() {
ThreadGroup group = Thread.currentThread().getThreadGroup();
ThreadGroup topGroup = group;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/actiontech/dble/util/ExecutorUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ private static NameableExecutor createCached(String name, int size, boolean isDa
* When the size number of threads takes a long time to execute or hangs, and the queue is full,
* create a new thread to continue working, and the number of newly created threads does not exceed maxSize
* <p>
* If maxSize threads are all hanging, only 1024 tasks will be stored in the queue (this avoids memory leaks),
* If maxSize threads are all hanging, only 65535 tasks will be stored in the queue (this avoids memory leaks),
* and subsequent tasks will be discarded by default
*/
public static NameableExecutor createCached(String name, int size, int maxSize, ThreadChecker checker) {
public static NameableExecutor createTimer(String name, int size, int maxSize, ThreadChecker checker) {
NameableThreadFactory factory = new NameableThreadFactory(name, true);
return new NameableExecutor(name, size, maxSize, 60, new LinkedBlockingQueue<>(1024), factory, null, checker);
return new NameableExecutor(name, size, maxSize, 60, new LinkedBlockingQueue<>(65535), factory, null, checker);
}

public static NameableScheduledThreadPoolExecutor createFixedScheduled(String name, int size, ThreadChecker checker) {
Expand Down

0 comments on commit e29ec29

Please sign in to comment.