Skip to content

Commit

Permalink
[inner-2368] fix: when some nodes have been closed during the ddl exe…
Browse files Browse the repository at this point in the history
…cution phase, the front side needs to respond with an error.
  • Loading branch information
wenyh1 committed Oct 8, 2023
1 parent a6c5007 commit 2cfcac2
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -67,6 +65,7 @@ public abstract class BaseDDLHandler implements ResponseHandler, ExecutableHandl

protected final ReentrantLock lock = new ReentrantLock();
protected HashMap<RouteResultsetNode, Integer> nodeResponseStatus = Maps.newHashMap();
protected Set<MySQLResponseService> closedConnSet = new HashSet<>(1);
protected AtomicBoolean writeToClientFlag = new AtomicBoolean(false);
protected AtomicBoolean specialHandleFlag = new AtomicBoolean(false); // execute special handling only once
protected volatile String errMsg;
Expand Down Expand Up @@ -294,7 +293,7 @@ public void connectionClose(@NotNull AbstractService service, String closeReason

MySQLResponseService responseService = (MySQLResponseService) service;
final RouteResultsetNode node = (RouteResultsetNode) responseService.getAttachment();
if (checkIsAlreadyClosed(node)) return;
if (checkIsAlreadyClosed(node, responseService)) return;

LOGGER.warn("backend connect {}, conn info:{}", closeReason0, service);
DDLTraceHelper.log(session.getShardingService(), d -> d.infoByNode(node.getName(), stage, DDLTraceHelper.Status.fail, closeReason0));
Expand Down Expand Up @@ -333,11 +332,15 @@ public void executeFail(String errInfo) {
}
}

protected boolean checkIsAlreadyClosed(final RouteResultsetNode node) {
protected boolean checkIsAlreadyClosed(final RouteResultsetNode node, final MySQLResponseService mysqlResponseService) {
lock.lock();
try {
if (nodeResponseStatus.get(node) == null || nodeResponseStatus.get(node) == STATUS_CONN_CLOSE) return true;
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
if (closedConnSet.contains(mysqlResponseService)) {
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
return true;
} else {
closedConnSet.add(mysqlResponseService);
}
session.getTargetMap().remove(node);
return false;
} finally {
Expand Down Expand Up @@ -375,6 +378,7 @@ protected void setErrPkg(String errMsg0, int errorCode0) {

protected void clearResources() {
nodeResponseStatus.clear();
closedConnSet.clear();
}

protected void handleEndPacket(MySQLPacket packet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,16 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, @NotNull AbstractSe
}

@Override
protected boolean checkIsAlreadyClosed(final RouteResultsetNode node) {
protected boolean checkIsAlreadyClosed(final RouteResultsetNode node, final MySQLResponseService mysqlResponseService) {
lock.lock();
try {
if (finishedTest) return true;
if (nodeResponseStatus.get(node) == null || nodeResponseStatus.get(node) == STATUS_CONN_CLOSE) return true;
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
session.getTargetMap().remove(node);
if (closedConnSet.contains(mysqlResponseService)) {
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
return true;
} else {
closedConnSet.add(mysqlResponseService);
}
return false;
} finally {
lock.unlock();
Expand Down

0 comments on commit 2cfcac2

Please sign in to comment.