Skip to content

Commit

Permalink
[inner-2256] fix: narrow down the scope of syncing dbGroup state
Browse files Browse the repository at this point in the history
  • Loading branch information
wenyh1 committed Dec 24, 2023
1 parent c8f6435 commit 539b3c7
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 15 deletions.
5 changes: 3 additions & 2 deletions src/main/java/com/actiontech/dble/cluster/ClusterHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.actiontech.dble.cluster.path.ChildPathMeta;
import com.actiontech.dble.cluster.path.PathMeta;
import com.actiontech.dble.cluster.values.*;
import com.actiontech.dble.services.manager.response.ReloadConfig;
import org.apache.logging.log4j.util.Strings;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -114,12 +115,12 @@ public static Map<String, OnlineType> getOnlineMap() {
return ClusterGeneralConfig.getInstance().getClusterSender().getOnlineMap();
}

public static void writeConfToCluster() throws Exception {
public static void writeConfToCluster(ReloadConfig.ReloadResult reloadResult) throws Exception {
ClusterLogic.forConfig().syncSequenceJsonToCluster();
ClusterLogic.forConfig().syncDbJsonToCluster();
ClusterLogic.forConfig().syncShardingJsonToCluster();
ClusterLogic.forConfig().syncUseJsonToCluster();
ClusterLogic.forHA().syncDbGroupStatusToCluster();
ClusterLogic.forHA().syncDbGroupStatusToCluster(reloadResult);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public void reloadConfigEvent(ConfStatus value, String params) throws Exception
return;
}
try {
boolean result = ReloadConfig.reloadByConfig(Integer.parseInt(params), false);
if (!checkLocalResult(result)) {
ReloadConfig.ReloadResult result = ReloadConfig.reloadByConfig(Integer.parseInt(params), false);
if (!checkLocalResult(result.isSuccess())) {
return;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.actiontech.dble.cluster.zkprocess.entity.dbGroups.DBGroup;
import com.actiontech.dble.cluster.zkprocess.entity.dbGroups.DBInstance;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.services.manager.response.ReloadConfig;
import com.actiontech.dble.singleton.HaConfigManager;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
Expand Down Expand Up @@ -86,6 +87,30 @@ public void syncDbGroupStatusToCluster() throws Exception {
LOGGER.info("syncDbGroupStatusToCluster success");
}

public void syncDbGroupStatusToCluster(ReloadConfig.ReloadResult reloadResult) throws Exception {
LOGGER.info("syncDbGroupStatusToCluster start");
HaConfigManager.getInstance().init(true);
Map<String, RawJson> dbGroupStatusMap = HaConfigManager.getInstance().getSourceJsonList();

Map<String, PhysicalDbGroup> recycleHostMap = reloadResult.getRecycleHostMap();
if (recycleHostMap != null) {
for (Map.Entry<String, PhysicalDbGroup> groupEntry : recycleHostMap.entrySet()) {
String dbGroupName = groupEntry.getKey();
LOGGER.debug("delete dbGroup_status:{}", dbGroupName);
clusterHelper.cleanKV(ClusterMetaUtil.getHaStatusPath(dbGroupName));
}
}
Map<String, PhysicalDbGroup> addOrChangeHostMap = reloadResult.getAddOrChangeHostMap();
if (addOrChangeHostMap != null) {
for (Map.Entry<String, PhysicalDbGroup> groupEntry : addOrChangeHostMap.entrySet()) {
RawJson dbGroupStatusJson = dbGroupStatusMap.get(groupEntry.getKey());
LOGGER.debug("add dbGroup_status:{}---{}", groupEntry.getKey(), dbGroupStatusJson);
clusterHelper.setKV(ClusterMetaUtil.getHaStatusPath(groupEntry.getKey()), dbGroupStatusJson);
}
}
LOGGER.info("syncDbGroupStatusToCluster success");
}

void syncHaStatusFromCluster(Gson gson, DbGroups dbs, List<DBGroup> dbGroupList) {
try {
List<ClusterEntry<RawJson>> statusKVList = this.getKVBeanOfChildPath(ClusterChildMetaUtil.getHaStatusPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,21 @@ private static void reloadWithCluster(ManagerService service, int loadAllMode, C
try {
ReloadLogHelper.briefInfo("added configLock");
//step 2 reload the local config file
boolean reloadResult;
ReloadResult reloadResult;
if (confStatus.getStatus().equals(ConfStatus.Status.MANAGER_INSERT) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_UPDATE) ||
confStatus.getStatus().equals(ConfStatus.Status.MANAGER_DELETE)) {
reloadResult = reloadByConfig(loadAllMode, true);
} else {
reloadResult = reloadByLocalXml(loadAllMode);
}
if (!reloadResult) {
if (!reloadResult.isSuccess()) {
throw new ReloadException(ErrorCode.ER_RELOAD_INTERRUPUTED, "reload interruputed by others, config should be reload");
}
ReloadLogHelper.briefInfo("single instance(self) finished");
ClusterDelayProvider.delayAfterMasterLoad();

//step 3 if the reload with no error ,than write the config file into cluster center remote
ClusterHelper.writeConfToCluster();
ClusterHelper.writeConfToCluster(reloadResult);
ReloadLogHelper.briefInfo("sent config file to cluster center");

//step 4 write the reload flag and self reload result into cluster center,notify the other dble to reload
Expand Down Expand Up @@ -197,17 +197,17 @@ private static void reloadWithoutCluster(ManagerService service, final int loadA
if (!ReloadManager.startReload(TRIGGER_TYPE_COMMAND, confStatus)) {
throw new ReloadException(ErrorCode.ER_YES, "reload status error ,other client or cluster may in reload");
}
boolean reloadResult;
ReloadResult reloadResult;
if (confStatus.getStatus().equals(ConfStatus.Status.MANAGER_INSERT) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_UPDATE) ||
confStatus.getStatus().equals(ConfStatus.Status.MANAGER_DELETE)) {
reloadResult = reloadByConfig(loadAllMode, true);
} else {
reloadResult = reloadByLocalXml(loadAllMode);
}
if (reloadResult && returnFlag) {
if (reloadResult.isSuccess() && returnFlag) {
// ok package
return;
} else if (!reloadResult) {
} else if (!reloadResult.isSuccess()) {
throw new ReloadException(ErrorCode.ER_RELOAD_INTERRUPUTED, "reload interruputed by others,metadata should be reload");
}
} finally {
Expand All @@ -224,11 +224,11 @@ private static void writeErrorResult(ManagerService c, String errorMsg) {
c.writeErrMessage(ErrorCode.ER_YES, sb);
}

public static boolean reloadByLocalXml(final int loadAllMode) throws Exception {
public static ReloadResult reloadByLocalXml(final int loadAllMode) throws Exception {
return reload(loadAllMode, null, null, null, null);
}

public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLocal) throws Exception {
public static ReloadResult reloadByConfig(final int loadAllMode, boolean isWriteToLocal) throws Exception {
RawJson userConfig = DbleTempConfig.getInstance().getUserConfig();
userConfig = userConfig == null ? DbleServer.getInstance().getConfig().getUserConfig() : userConfig;
RawJson dbConfig = DbleTempConfig.getInstance().getDbConfig();
Expand All @@ -237,7 +237,7 @@ public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLoc
shardingConfig = shardingConfig == null ? DbleServer.getInstance().getConfig().getShardingConfig() : shardingConfig;
RawJson sequenceConfig = DbleTempConfig.getInstance().getSequenceConfig();
sequenceConfig = sequenceConfig == null ? DbleServer.getInstance().getConfig().getSequenceConfig() : sequenceConfig;
final boolean reloadResult = reload(loadAllMode, userConfig, dbConfig, shardingConfig, sequenceConfig);
final ReloadResult reloadResult = reload(loadAllMode, userConfig, dbConfig, shardingConfig, sequenceConfig);

ReloadLogHelper.briefInfo("clean temp config ...");
DbleTempConfig.getInstance().clean();
Expand All @@ -248,7 +248,7 @@ public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLoc
return reloadResult;
}

private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception {
private static ReloadResult reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-reload");
try {
// load configuration
Expand Down Expand Up @@ -318,7 +318,7 @@ private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson
if (!loader.isFullyConfigured()) {
recycleServerConnections();
}
return result;
return packReloadResult(result, changeItemList, forceAllReload, newDbGroups, oldConfig.getDbGroups());
} catch (Exception e) {
initFailed(newDbGroups);
throw e;
Expand All @@ -328,6 +328,35 @@ private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson
}
}

private static ReloadResult packReloadResult(boolean result, List<ChangeItem> changeItemList,
boolean forceAllReload,
Map<String, PhysicalDbGroup> newDbGroups,
Map<String, PhysicalDbGroup> oldDbGroups) {
if (forceAllReload) {
return new ReloadResult(result, newDbGroups, oldDbGroups);
} else {
Map<String, PhysicalDbGroup> addOrChangeMap0 = new HashMap<>();
Map<String, PhysicalDbGroup> recycleMap0 = new HashMap<>();
for (ChangeItem changeItem : changeItemList) {
if (changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_GROUP) {
PhysicalDbGroup dbGroup = ((PhysicalDbGroup) changeItem.getItem());
switch (changeItem.getType()) {
case ADD:
case UPDATE:
addOrChangeMap0.put(dbGroup.getGroupName(), dbGroup);
break;
case DELETE:
recycleMap0.put(dbGroup.getGroupName(), dbGroup);
break;
default:
break;
}
}
}
return new ReloadResult(result, addOrChangeMap0, recycleMap0);
}
}

/**
* check version/packetSize/lowerCase
* get system variables
Expand Down Expand Up @@ -656,4 +685,28 @@ private static void writePacket(boolean isSuccess, ManagerService service, Strin
service.writeErrMessage(errorCode, "Reload Failure, The reason is " + errorMsg);
}
}

public static class ReloadResult { // dbGroup
private final boolean success;
private final Map<String, PhysicalDbGroup> addOrChangeHostMap;
private final Map<String, PhysicalDbGroup> recycleHostMap;

public ReloadResult(boolean success, Map<String, PhysicalDbGroup> addOrChangeHostMap, Map<String, PhysicalDbGroup> recycleHostMap) {
this.success = success;
this.addOrChangeHostMap = addOrChangeHostMap;
this.recycleHostMap = recycleHostMap;
}

public boolean isSuccess() {
return success;
}

public Map<String, PhysicalDbGroup> getAddOrChangeHostMap() {
return addOrChangeHostMap;
}

public Map<String, PhysicalDbGroup> getRecycleHostMap() {
return recycleHostMap;
}
}
}

0 comments on commit 539b3c7

Please sign in to comment.