diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/ddl/BaseDDLHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/ddl/BaseDDLHandler.java index ef9b2325f8..7439c57b35 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/ddl/BaseDDLHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/ddl/BaseDDLHandler.java @@ -32,9 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; @@ -67,6 +65,7 @@ public abstract class BaseDDLHandler implements ResponseHandler, ExecutableHandl protected final ReentrantLock lock = new ReentrantLock(); protected HashMap nodeResponseStatus = Maps.newHashMap(); + protected Set closedConnSet = new HashSet<>(1); protected AtomicBoolean writeToClientFlag = new AtomicBoolean(false); protected AtomicBoolean specialHandleFlag = new AtomicBoolean(false); // execute special handling only once protected volatile String errMsg; @@ -294,7 +293,7 @@ public void connectionClose(@NotNull AbstractService service, String closeReason MySQLResponseService responseService = (MySQLResponseService) service; final RouteResultsetNode node = (RouteResultsetNode) responseService.getAttachment(); - if (checkIsAlreadyClosed(node)) return; + if (checkIsAlreadyClosed(node, responseService)) return; LOGGER.warn("backend connect {}, conn info:{}", closeReason0, service); DDLTraceHelper.log(session.getShardingService(), d -> d.infoByNode(node.getName(), stage, DDLTraceHelper.Status.fail, closeReason0)); @@ -333,11 +332,15 @@ public void executeFail(String errInfo) { } } - protected boolean checkIsAlreadyClosed(final RouteResultsetNode node) { + protected boolean checkIsAlreadyClosed(final RouteResultsetNode node, final MySQLResponseService mysqlResponseService) { lock.lock(); try { - if (nodeResponseStatus.get(node) == null || nodeResponseStatus.get(node) == STATUS_CONN_CLOSE) return true; - nodeResponseStatus.put(node, STATUS_CONN_CLOSE); + if (closedConnSet.contains(mysqlResponseService)) { + nodeResponseStatus.put(node, STATUS_CONN_CLOSE); + return true; + } else { + closedConnSet.add(mysqlResponseService); + } session.getTargetMap().remove(node); return false; } finally { @@ -375,6 +378,7 @@ protected void setErrPkg(String errMsg0, int errorCode0) { protected void clearResources() { nodeResponseStatus.clear(); + closedConnSet.clear(); } protected void handleEndPacket(MySQLPacket packet) { diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/ddl/MultiNodeDdlPrepareHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/ddl/MultiNodeDdlPrepareHandler.java index f777e76c1a..97bf4fb16b 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/ddl/MultiNodeDdlPrepareHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/ddl/MultiNodeDdlPrepareHandler.java @@ -110,13 +110,16 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, @NotNull AbstractSe } @Override - protected boolean checkIsAlreadyClosed(final RouteResultsetNode node) { + protected boolean checkIsAlreadyClosed(final RouteResultsetNode node, final MySQLResponseService mysqlResponseService) { lock.lock(); try { if (finishedTest) return true; - if (nodeResponseStatus.get(node) == null || nodeResponseStatus.get(node) == STATUS_CONN_CLOSE) return true; - nodeResponseStatus.put(node, STATUS_CONN_CLOSE); - session.getTargetMap().remove(node); + if (closedConnSet.contains(mysqlResponseService)) { + nodeResponseStatus.put(node, STATUS_CONN_CLOSE); + return true; + } else { + closedConnSet.add(mysqlResponseService); + } return false; } finally { lock.unlock();