Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[inner-2256] fix: narrow down the scope of syncing dbGroup state #3856

Merged
merged 2 commits into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/main/java/com/actiontech/dble/cluster/ClusterHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.actiontech.dble.cluster.path.ChildPathMeta;
import com.actiontech.dble.cluster.path.PathMeta;
import com.actiontech.dble.cluster.values.*;
import com.actiontech.dble.services.manager.response.ReloadConfig;
import org.apache.logging.log4j.util.Strings;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -114,12 +115,12 @@ public static Map<String, OnlineType> getOnlineMap() {
return ClusterGeneralConfig.getInstance().getClusterSender().getOnlineMap();
}

public static void writeConfToCluster() throws Exception {
public static void writeConfToCluster(ReloadConfig.ReloadResult reloadResult) throws Exception {
ClusterLogic.forConfig().syncSequenceJsonToCluster();
ClusterLogic.forConfig().syncDbJsonToCluster();
ClusterLogic.forConfig().syncShardingJsonToCluster();
ClusterLogic.forConfig().syncUseJsonToCluster();
ClusterLogic.forHA().syncDbGroupStatusToCluster();
ClusterLogic.forHA().syncDbGroupStatusToCluster(reloadResult);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public void reloadConfigEvent(ConfStatus value, String params) throws Exception
return;
}
try {
boolean result = ReloadConfig.reloadByConfig(Integer.parseInt(params), false);
if (!checkLocalResult(result)) {
ReloadConfig.ReloadResult result = ReloadConfig.reloadByConfig(Integer.parseInt(params), false);
if (!checkLocalResult(result.isSuccess())) {
return;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.actiontech.dble.cluster.zkprocess.entity.dbGroups.DBGroup;
import com.actiontech.dble.cluster.zkprocess.entity.dbGroups.DBInstance;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.services.manager.response.ReloadConfig;
import com.actiontech.dble.singleton.HaConfigManager;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
Expand Down Expand Up @@ -86,6 +87,30 @@ public void syncDbGroupStatusToCluster() throws Exception {
LOGGER.info("syncDbGroupStatusToCluster success");
}

public void syncDbGroupStatusToCluster(ReloadConfig.ReloadResult reloadResult) throws Exception {
LOGGER.info("syncDbGroupStatusToCluster start");
HaConfigManager.getInstance().init(true);
Map<String, RawJson> dbGroupStatusMap = HaConfigManager.getInstance().getSourceJsonList();

Map<String, PhysicalDbGroup> recycleHostMap = reloadResult.getRecycleHostMap();
if (recycleHostMap != null) {
for (Map.Entry<String, PhysicalDbGroup> groupEntry : recycleHostMap.entrySet()) {
String dbGroupName = groupEntry.getKey();
LOGGER.debug("delete dbGroup_status:{}", dbGroupName);
clusterHelper.cleanKV(ClusterMetaUtil.getHaStatusPath(dbGroupName));
}
}
Map<String, PhysicalDbGroup> addOrChangeHostMap = reloadResult.getAddOrChangeHostMap();
if (addOrChangeHostMap != null) {
for (Map.Entry<String, PhysicalDbGroup> groupEntry : addOrChangeHostMap.entrySet()) {
RawJson dbGroupStatusJson = dbGroupStatusMap.get(groupEntry.getKey());
LOGGER.debug("add dbGroup_status:{}---{}", groupEntry.getKey(), dbGroupStatusJson);
clusterHelper.setKV(ClusterMetaUtil.getHaStatusPath(groupEntry.getKey()), dbGroupStatusJson);
}
}
LOGGER.info("syncDbGroupStatusToCluster success");
}

void syncHaStatusFromCluster(Gson gson, DbGroups dbs, List<DBGroup> dbGroupList) {
try {
List<ClusterEntry<RawJson>> statusKVList = this.getKVBeanOfChildPath(ClusterChildMetaUtil.getHaStatusPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,21 @@ private static void reloadWithCluster(ManagerService service, int loadAllMode, C
try {
ReloadLogHelper.briefInfo("added configLock");
//step 2 reload the local config file
boolean reloadResult;
ReloadResult reloadResult;
if (confStatus.getStatus().equals(ConfStatus.Status.MANAGER_INSERT) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_UPDATE) ||
confStatus.getStatus().equals(ConfStatus.Status.MANAGER_DELETE)) {
reloadResult = reloadByConfig(loadAllMode, true);
} else {
reloadResult = reloadByLocalXml(loadAllMode);
}
if (!reloadResult) {
if (!reloadResult.isSuccess()) {
throw new ReloadException(ErrorCode.ER_RELOAD_INTERRUPUTED, "reload interruputed by others, config should be reload");
}
ReloadLogHelper.briefInfo("single instance(self) finished");
ClusterDelayProvider.delayAfterMasterLoad();

//step 3 if the reload with no error ,than write the config file into cluster center remote
ClusterHelper.writeConfToCluster();
ClusterHelper.writeConfToCluster(reloadResult);
ReloadLogHelper.briefInfo("sent config file to cluster center");

//step 4 write the reload flag and self reload result into cluster center,notify the other dble to reload
Expand Down Expand Up @@ -197,17 +197,17 @@ private static void reloadWithoutCluster(ManagerService service, final int loadA
if (!ReloadManager.startReload(TRIGGER_TYPE_COMMAND, confStatus)) {
throw new ReloadException(ErrorCode.ER_YES, "reload status error ,other client or cluster may in reload");
}
boolean reloadResult;
ReloadResult reloadResult;
if (confStatus.getStatus().equals(ConfStatus.Status.MANAGER_INSERT) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_UPDATE) ||
confStatus.getStatus().equals(ConfStatus.Status.MANAGER_DELETE)) {
reloadResult = reloadByConfig(loadAllMode, true);
} else {
reloadResult = reloadByLocalXml(loadAllMode);
}
if (reloadResult && returnFlag) {
if (reloadResult.isSuccess() && returnFlag) {
// ok package
return;
} else if (!reloadResult) {
} else if (!reloadResult.isSuccess()) {
throw new ReloadException(ErrorCode.ER_RELOAD_INTERRUPUTED, "reload interruputed by others,metadata should be reload");
}
} finally {
Expand All @@ -224,11 +224,11 @@ private static void writeErrorResult(ManagerService c, String errorMsg) {
c.writeErrMessage(ErrorCode.ER_YES, sb);
}

public static boolean reloadByLocalXml(final int loadAllMode) throws Exception {
public static ReloadResult reloadByLocalXml(final int loadAllMode) throws Exception {
return reload(loadAllMode, null, null, null, null);
}

public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLocal) throws Exception {
public static ReloadResult reloadByConfig(final int loadAllMode, boolean isWriteToLocal) throws Exception {
RawJson userConfig = DbleTempConfig.getInstance().getUserConfig();
userConfig = userConfig == null ? DbleServer.getInstance().getConfig().getUserConfig() : userConfig;
RawJson dbConfig = DbleTempConfig.getInstance().getDbConfig();
Expand All @@ -237,7 +237,7 @@ public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLoc
shardingConfig = shardingConfig == null ? DbleServer.getInstance().getConfig().getShardingConfig() : shardingConfig;
RawJson sequenceConfig = DbleTempConfig.getInstance().getSequenceConfig();
sequenceConfig = sequenceConfig == null ? DbleServer.getInstance().getConfig().getSequenceConfig() : sequenceConfig;
final boolean reloadResult = reload(loadAllMode, userConfig, dbConfig, shardingConfig, sequenceConfig);
final ReloadResult reloadResult = reload(loadAllMode, userConfig, dbConfig, shardingConfig, sequenceConfig);

ReloadLogHelper.briefInfo("clean temp config ...");
DbleTempConfig.getInstance().clean();
Expand All @@ -248,7 +248,7 @@ public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLoc
return reloadResult;
}

private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception {
private static ReloadResult reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-reload");
try {
// load configuration
Expand Down Expand Up @@ -318,7 +318,7 @@ private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson
if (!loader.isFullyConfigured()) {
recycleServerConnections();
}
return result;
return packReloadResult(result, changeItemList, forceAllReload, newDbGroups, oldConfig.getDbGroups());
} catch (Exception e) {
initFailed(newDbGroups);
throw e;
Expand All @@ -328,6 +328,35 @@ private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson
}
}

private static ReloadResult packReloadResult(boolean result, List<ChangeItem> changeItemList,
boolean forceAllReload,
Map<String, PhysicalDbGroup> newDbGroups,
Map<String, PhysicalDbGroup> oldDbGroups) {
if (forceAllReload) {
return new ReloadResult(result, newDbGroups, oldDbGroups);
} else {
Map<String, PhysicalDbGroup> addOrChangeMap0 = new HashMap<>();
Map<String, PhysicalDbGroup> recycleMap0 = new HashMap<>();
for (ChangeItem changeItem : changeItemList) {
if (changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_GROUP) {
PhysicalDbGroup dbGroup = ((PhysicalDbGroup) changeItem.getItem());
switch (changeItem.getType()) {
case ADD:
case UPDATE:
addOrChangeMap0.put(dbGroup.getGroupName(), dbGroup);
break;
case DELETE:
recycleMap0.put(dbGroup.getGroupName(), dbGroup);
break;
default:
break;
}
}
}
return new ReloadResult(result, addOrChangeMap0, recycleMap0);
}
}

/**
* check version/packetSize/lowerCase
* get system variables
Expand Down Expand Up @@ -656,4 +685,28 @@ private static void writePacket(boolean isSuccess, ManagerService service, Strin
service.writeErrMessage(errorCode, "Reload Failure, The reason is " + errorMsg);
}
}

public static class ReloadResult { // dbGroup
private final boolean success;
private final Map<String, PhysicalDbGroup> addOrChangeHostMap;
private final Map<String, PhysicalDbGroup> recycleHostMap;

public ReloadResult(boolean success, Map<String, PhysicalDbGroup> addOrChangeHostMap, Map<String, PhysicalDbGroup> recycleHostMap) {
this.success = success;
this.addOrChangeHostMap = addOrChangeHostMap;
this.recycleHostMap = recycleHostMap;
}

public boolean isSuccess() {
return success;
}

public Map<String, PhysicalDbGroup> getAddOrChangeHostMap() {
return addOrChangeHostMap;
}

public Map<String, PhysicalDbGroup> getRecycleHostMap() {
return recycleHostMap;
}
}
}
Loading