Skip to content

Commit

Permalink
[inner-2112] implement interrupt thread through interrupt()/shutdownN…
Browse files Browse the repository at this point in the history
…ow()
  • Loading branch information
wenyh1 committed Sep 7, 2023
1 parent 3ef40de commit 94b705f
Show file tree
Hide file tree
Showing 29 changed files with 610 additions and 122 deletions.
24 changes: 21 additions & 3 deletions src/main/java/com/actiontech/dble/DbleServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import com.actiontech.dble.singleton.*;
import com.actiontech.dble.statistic.sql.StatisticManager;
import com.actiontech.dble.statistic.stat.ThreadWorkUsage;
import com.actiontech.dble.singleton.ThreadChecker;
import com.actiontech.dble.util.ExecutorUtil;
import com.actiontech.dble.util.NameableScheduledThreadPoolExecutor;
import com.actiontech.dble.util.TimeUtil;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,6 +77,7 @@ public final class DbleServer {
public static final String WRITE_TO_BACKEND_WORKER_NAME = "writeToBackendWorker";
public static final String COMPLEX_QUERY_EXECUTOR_NAME = "complexQueryWorker";
public static final String TIMER_WORKER_NAME = "Timer";
public static final String TIMER_SCHEDULER_WORKER_NAME = "TimerScheduler";
public static final String NIO_FRONT_RW = "NIOFrontRW";
public static final String NIO_BACKEND_RW = "NIOBackendRW";
public static final String AIO_EXECUTOR_NAME = "AIO";
Expand Down Expand Up @@ -113,7 +114,8 @@ public static DbleServer getInstance() {
private ExecutorService backendExecutor;
private ExecutorService writeToBackendExecutor;
private ExecutorService complexQueryExecutor;
private ExecutorService timerExecutor;
private volatile ExecutorService timerExecutor;
private volatile NameableScheduledThreadPoolExecutor timerSchedulerExecutor;
private Map<String, ThreadWorkUsage> threadUsedMap = new ConcurrentHashMap<>();

private Deque<ServiceTask> frontHandlerQueue;
Expand Down Expand Up @@ -295,7 +297,7 @@ public void startup() throws Exception {
LOGGER.info("=====================================Server started success=======================================");

ThreadCheckerScheduler.getInstance().init();
Scheduler.getInstance().init(timerExecutor);
Scheduler.getInstance().init();
LOGGER.info("=======================================Scheduler started==========================================");

XaCheckHandler.initXaIdCheckPeriod();
Expand Down Expand Up @@ -340,6 +342,7 @@ private void initExecutor(int frontProcessorCount, int backendProcessorCount) {
writeToBackendExecutor = ExecutorUtil.createFixed(WRITE_TO_BACKEND_WORKER_NAME, SystemConfig.getInstance().getWriteToBackendWorker(), runnableMap);
complexQueryExecutor = ExecutorUtil.createCached(COMPLEX_QUERY_EXECUTOR_NAME, SystemConfig.getInstance().getComplexQueryWorker(), null);
timerExecutor = ExecutorUtil.createCached(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance());
timerSchedulerExecutor = ExecutorUtil.createFixedScheduled(TIMER_SCHEDULER_WORKER_NAME, 2, ThreadChecker.getInstance());
nioFrontExecutor = ExecutorUtil.createFixed(NIO_FRONT_RW, frontProcessorCount, runnableMap);
nioBackendExecutor = ExecutorUtil.createFixed(NIO_BACKEND_RW, backendProcessorCount, runnableMap);
}
Expand Down Expand Up @@ -488,6 +491,10 @@ public void run() {
});
} catch (RejectedExecutionException e) {
ThreadChecker.getInstance().timerExecuteError(e, "processorCheck()");
} catch (Throwable e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("scheduled task processorCheck() happen exception: ", e);
}
}
}
};
Expand Down Expand Up @@ -621,6 +628,17 @@ public ExecutorService getTimerExecutor() {
return timerExecutor;
}

public NameableScheduledThreadPoolExecutor getTimerSchedulerExecutor() {
return timerSchedulerExecutor;
}

public void setTimerExecutor(ExecutorService timerExecutor) {
this.timerExecutor = timerExecutor;
}

public void setTimerSchedulerExecutor(NameableScheduledThreadPoolExecutor timerSchedulerExecutor) {
this.timerSchedulerExecutor = timerSchedulerExecutor;
}

public ExecutorService getComplexQueryExecutor() {
return complexQueryExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package com.actiontech.dble.backend.delyDetection;

import com.actiontech.dble.DbleServer;
import com.actiontech.dble.alarm.AlarmCode;
import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
Expand All @@ -13,7 +14,6 @@
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.db.DbGroupConfig;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.singleton.Scheduler;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
Expand Down Expand Up @@ -105,7 +105,7 @@ public void start(long initialDelay) {
//avoid concurrency with the master
initialDelay = initialDelay >> 1;
}
this.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> execute(),
this.scheduledFuture = DbleServer.getInstance().getTimerSchedulerExecutor().scheduleAtFixedRate(() -> execute(),
initialDelay, this.delayPeriodMillis, TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.singleton.Scheduler;
import com.actiontech.dble.statistic.DbInstanceSyncRecorder;
import com.actiontech.dble.statistic.HeartbeatRecorder;
import com.actiontech.dble.util.TimeUtil;
Expand Down Expand Up @@ -94,7 +93,7 @@ public void start(long heartbeatPeriodMillis) {
if (LOGGER.isDebugEnabled()) {
ReloadLogHelper.debug("start heartbeat :{}", this.toString());
}
this.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> {
this.scheduledFuture = DbleServer.getInstance().getTimerSchedulerExecutor().scheduleAtFixedRate(() -> {
if (DbleServer.getInstance().getConfig().isFullyConfigured()) {
if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ private void ownThreadJob() {
if (!itemToDiscard.isNullItem()) {
BlockingQueue<HeapItem> discardQueue = queues.get(itemToDiscard.getIndex());
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
if (discardQueue.take().isNullItem() || isFail()) {
break;
}
Expand All @@ -245,15 +248,7 @@ private void ownThreadJob() {
nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), false, top.getIndex());
}
}
Iterator<Map.Entry<MySQLResponseService, BlockingQueue<HeapItem>>> iterator = this.queues.entrySet().iterator();
MySQLResponseService service = null;
while (iterator.hasNext()) {
Map.Entry<MySQLResponseService, BlockingQueue<HeapItem>> entry = iterator.next();
service = entry.getKey();
entry.getValue().clear();
session.releaseConnectionIfSafe(entry.getKey(), false);
iterator.remove();
}
MySQLResponseService service = clearQueueAndReleaseConnection();
session.trace(t -> t.doSqlStat(selectRows, netOutBytes.intValue(), resultSize.intValue()));
assert service != null;
if (!isFail()) {
Expand All @@ -269,4 +264,17 @@ private void ownThreadJob() {
session.onQueryError(msg.getBytes());
}
}

private MySQLResponseService clearQueueAndReleaseConnection() {
Iterator<Map.Entry<MySQLResponseService, BlockingQueue<HeapItem>>> iterator = this.queues.entrySet().iterator();
MySQLResponseService service = null;
while (iterator.hasNext()) {
Map.Entry<MySQLResponseService, BlockingQueue<HeapItem>> entry = iterator.next();
service = entry.getKey();
entry.getValue().clear();
session.releaseConnectionIfSafe(entry.getKey(), false);
iterator.remove();
}
return service;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ protected void ownThreadJob(Object... objects) {
if (!itemToDiscard.isNullItem()) {
BlockingQueue<HeapItem> discardQueue = queues.get(itemToDiscard.getIndex());
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
if (discardQueue.take().isNullItem() || terminate.get()) {
break;
}
Expand Down Expand Up @@ -238,6 +241,10 @@ protected void recycleResources() {
Entry<MySQLResponseService, BlockingQueue<HeapItem>> entry = iterator.next();
// fair lock queue,poll for clear
while (true) {
if (Thread.currentThread().isInterrupted()) {
LOGGER.info("manual interrupted");
break;
}
if (entry.getValue().poll() == null) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ protected void ownThreadJob(Object... objects) {
recordElapsedTime("order writeDirectly start :");
try {
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
if (terminate.get()) {
return;
}
Expand All @@ -110,13 +113,16 @@ protected void ownThreadJob(Object... objects) {
}
localResult.add(row);
} catch (InterruptedException e) {
//ignore error
throw e;
}
}
recordElapsedTime("order writeDirectly end :");
localResult.done();
recordElapsedTime("order read start :");
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
if (terminate.get()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ protected void ownThreadJob(Object... objects) {
try {
int eofCount = 0;
for (; ; ) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
RowDataPacket row = outQueue.take();
if (row.getFieldCount() == 0) {
eofCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ protected void ownThreadJob(Object... objects) {
leftLocal = takeFirst(leftQueue);
rightLocal = takeFirst(rightQueue);
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
if (terminate.get())
return;
RowDataPacket leftRow = leftLocal.getLastRow();
Expand Down Expand Up @@ -228,15 +231,7 @@ protected void ownThreadJob(Object... objects) {
if (!nestLoopDependOn) {
HandlerTool.terminateHandlerTree(this);
}
// for trace, when join end before all rows return ,the handler should mark as finished
for (DMLResponseHandler mergeHandler : this.getMerges()) {
DMLResponseHandler handler = mergeHandler;
while (handler != null && handler != this) {
session.setHandlerEnd(handler);
handler = handler.getNextHandler();
}
}
session.setHandlerEnd(this);
makeFinishedOfHandler();
nextHandler.rowEofResponse(null, isLeft, service);
} catch (MySQLOutPutException e) {
String msg = e.getLocalizedMessage();
Expand All @@ -254,6 +249,18 @@ protected void ownThreadJob(Object... objects) {
}
}

// for trace, when join end before all rows return ,the handler should mark as finished
private void makeFinishedOfHandler() {
for (DMLResponseHandler mergeHandler : this.getMerges()) {
DMLResponseHandler handler = mergeHandler;
while (handler != null && handler != this) {
session.setHandlerEnd(handler);
handler = handler.getNextHandler();
}
}
session.setHandlerEnd(this);
}

private LocalResult takeFirst(FairLinkedBlockingDeque<LocalResult> deque) throws InterruptedException {
/**
* it must be in single thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ protected void ownThreadJob(Object... objects) {
leftLocal = takeFirst(leftQueue);
rightLocal = takeFirst(rightQueue);
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
RowDataPacket leftRow = leftLocal.getLastRow();
RowDataPacket rightRow = rightLocal.getLastRow();
if (leftRow.getFieldCount() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.config.model.sharding.table.BaseTableConfig;
import com.actiontech.dble.singleton.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -168,18 +167,18 @@ public static void adjustXaIdCheckPeriod(long period0) {
}
}

private static void startXaIdCheckPeriod() {
public static void startXaIdCheckPeriod() {
synchronized (INSTANCE) {
if (INSTANCE.xaIdCheckPeriod > 0) {
INSTANCE.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleWithFixedDelay(() -> {
INSTANCE.scheduledFuture = DbleServer.getInstance().getTimerSchedulerExecutor().scheduleWithFixedDelay(() -> {
(new XAAnalysisHandler()).checkResidualTask();
}, 0, INSTANCE.xaIdCheckPeriod, TimeUnit.SECONDS);
LOGGER.info("====================================Start XaIdCheckPeriod[{}]=========================================", INSTANCE.xaIdCheckPeriod);
}
}
}

private static void stopXaIdCheckPeriod() {
public static void stopXaIdCheckPeriod() {
synchronized (INSTANCE) {
ScheduledFuture future = INSTANCE.scheduledFuture;
if (future != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ public String waitingForAllTheNode(String path) {
Map<String, OnlineType> expectedMap = ClusterHelper.getOnlineMap();
StringBuilder errorMsg = new StringBuilder();
for (; ; ) {
if (Thread.currentThread().isInterrupted()) {
errorMsg.append("manual interrupted");
break;
}
errorMsg.setLength(0);
if (checkResponseForOneTime(path, expectedMap, errorMsg)) {
break;
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ public TableMeta getSyncTableMeta(String schema, String tbName) throws SQLNonTra
TraceManager.log(ImmutableMap.of("schema", schema, "table", tbName), traceObject);
try {
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new SQLNonTransientException("manual interrupted");
}
int oldVersion = version.get();
if (metaCount.get() == 0) {
TableMeta meta = getTableMeta(schema, tbName);
Expand Down Expand Up @@ -248,6 +251,9 @@ public PlanNode getSyncView(String schema, String vName) throws SQLNonTransientE
TraceManager.log(ImmutableMap.of("schema", schema, "table", vName), traceObject);
try {
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new SQLNonTransientException("manual interrupted");
}
int oldVersion = version.get();
if (catalogs.get(schema) == null) {
return null;
Expand Down
Loading

0 comments on commit 94b705f

Please sign in to comment.