Skip to content

Commit

Permalink
Merge pull request #3826 from actiontech/inner-2368
Browse files Browse the repository at this point in the history
[inner-2368] fix: when some nodes have been closed during the ddl execution phase, the front side needs to respond with an error.
  • Loading branch information
wenyh1 authored Oct 11, 2023
2 parents 2dc2564 + 6b2ff49 commit d5f2b72
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -67,6 +65,7 @@ public abstract class BaseDDLHandler implements ResponseHandler, ExecutableHandl

protected final ReentrantLock lock = new ReentrantLock();
protected HashMap<RouteResultsetNode, Integer> nodeResponseStatus = Maps.newHashMap();
protected Set<MySQLResponseService> closedConnSet = new HashSet<>(1);
protected AtomicBoolean writeToClientFlag = new AtomicBoolean(false);
protected AtomicBoolean specialHandleFlag = new AtomicBoolean(false); // execute special handling only once
protected volatile String errMsg;
Expand Down Expand Up @@ -294,7 +293,7 @@ public void connectionClose(@NotNull AbstractService service, String closeReason

MySQLResponseService responseService = (MySQLResponseService) service;
final RouteResultsetNode node = (RouteResultsetNode) responseService.getAttachment();
if (checkIsAlreadyClosed(node)) return;
if (checkIsAlreadyClosed(node, responseService)) return;

LOGGER.warn("backend connect {}, conn info:{}", closeReason0, service);
DDLTraceHelper.log(session.getShardingService(), d -> d.infoByNode(node.getName(), stage, DDLTraceHelper.Status.fail, closeReason0));
Expand Down Expand Up @@ -333,11 +332,15 @@ public void executeFail(String errInfo) {
}
}

protected boolean checkIsAlreadyClosed(final RouteResultsetNode node) {
protected boolean checkIsAlreadyClosed(final RouteResultsetNode node, final MySQLResponseService mysqlResponseService) {
lock.lock();
try {
if (nodeResponseStatus.get(node) == null || nodeResponseStatus.get(node) == STATUS_CONN_CLOSE) return true;
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
if (closedConnSet.contains(mysqlResponseService)) {
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
return true;
} else {
closedConnSet.add(mysqlResponseService);
}
session.getTargetMap().remove(node);
return false;
} finally {
Expand Down Expand Up @@ -375,6 +378,7 @@ protected void setErrPkg(String errMsg0, int errorCode0) {

protected void clearResources() {
nodeResponseStatus.clear();
closedConnSet.clear();
}

protected void handleEndPacket(MySQLPacket packet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,16 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, @NotNull AbstractSe
}

@Override
protected boolean checkIsAlreadyClosed(final RouteResultsetNode node) {
protected boolean checkIsAlreadyClosed(final RouteResultsetNode node, final MySQLResponseService mysqlResponseService) {
lock.lock();
try {
if (finishedTest) return true;
if (nodeResponseStatus.get(node) == null || nodeResponseStatus.get(node) == STATUS_CONN_CLOSE) return true;
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
session.getTargetMap().remove(node);
if (closedConnSet.contains(mysqlResponseService)) {
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
return true;
} else {
closedConnSet.add(mysqlResponseService);
}
return false;
} finally {
lock.unlock();
Expand Down

0 comments on commit d5f2b72

Please sign in to comment.