From 539b3c76d6dc42a4eb2ca40b260c86979fc966b5 Mon Sep 17 00:00:00 2001 From: wenyh1 <2365151147@qq.com> Date: Sun, 24 Dec 2023 21:30:50 +0800 Subject: [PATCH] [inner-2256] fix: narrow down the scope of syncing dbGroup state --- .../dble/cluster/ClusterHelper.java | 5 +- .../cluster/logic/ConfigClusterLogic.java | 4 +- .../dble/cluster/logic/HAClusterLogic.java | 25 +++++++ .../manager/response/ReloadConfig.java | 75 ++++++++++++++++--- 4 files changed, 94 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/actiontech/dble/cluster/ClusterHelper.java b/src/main/java/com/actiontech/dble/cluster/ClusterHelper.java index b15ea5a3b6..409dba5b4e 100644 --- a/src/main/java/com/actiontech/dble/cluster/ClusterHelper.java +++ b/src/main/java/com/actiontech/dble/cluster/ClusterHelper.java @@ -11,6 +11,7 @@ import com.actiontech.dble.cluster.path.ChildPathMeta; import com.actiontech.dble.cluster.path.PathMeta; import com.actiontech.dble.cluster.values.*; +import com.actiontech.dble.services.manager.response.ReloadConfig; import org.apache.logging.log4j.util.Strings; import javax.annotation.Nonnull; @@ -114,12 +115,12 @@ public static Map getOnlineMap() { return ClusterGeneralConfig.getInstance().getClusterSender().getOnlineMap(); } - public static void writeConfToCluster() throws Exception { + public static void writeConfToCluster(ReloadConfig.ReloadResult reloadResult) throws Exception { ClusterLogic.forConfig().syncSequenceJsonToCluster(); ClusterLogic.forConfig().syncDbJsonToCluster(); ClusterLogic.forConfig().syncShardingJsonToCluster(); ClusterLogic.forConfig().syncUseJsonToCluster(); - ClusterLogic.forHA().syncDbGroupStatusToCluster(); + ClusterLogic.forHA().syncDbGroupStatusToCluster(reloadResult); } @Nullable diff --git a/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java b/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java index b7f30808a2..7a8918d104 100644 --- a/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java +++ b/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java @@ -87,8 +87,8 @@ public void reloadConfigEvent(ConfStatus value, String params) throws Exception return; } try { - boolean result = ReloadConfig.reloadByConfig(Integer.parseInt(params), false); - if (!checkLocalResult(result)) { + ReloadConfig.ReloadResult result = ReloadConfig.reloadByConfig(Integer.parseInt(params), false); + if (!checkLocalResult(result.isSuccess())) { return; } } catch (Exception e) { diff --git a/src/main/java/com/actiontech/dble/cluster/logic/HAClusterLogic.java b/src/main/java/com/actiontech/dble/cluster/logic/HAClusterLogic.java index e598c9b9b4..182bafeb11 100644 --- a/src/main/java/com/actiontech/dble/cluster/logic/HAClusterLogic.java +++ b/src/main/java/com/actiontech/dble/cluster/logic/HAClusterLogic.java @@ -16,6 +16,7 @@ import com.actiontech.dble.cluster.zkprocess.entity.dbGroups.DBGroup; import com.actiontech.dble.cluster.zkprocess.entity.dbGroups.DBInstance; import com.actiontech.dble.config.model.SystemConfig; +import com.actiontech.dble.services.manager.response.ReloadConfig; import com.actiontech.dble.singleton.HaConfigManager; import com.google.gson.Gson; import com.google.gson.JsonElement; @@ -86,6 +87,30 @@ public void syncDbGroupStatusToCluster() throws Exception { LOGGER.info("syncDbGroupStatusToCluster success"); } + public void syncDbGroupStatusToCluster(ReloadConfig.ReloadResult reloadResult) throws Exception { + LOGGER.info("syncDbGroupStatusToCluster start"); + HaConfigManager.getInstance().init(true); + Map dbGroupStatusMap = HaConfigManager.getInstance().getSourceJsonList(); + + Map recycleHostMap = reloadResult.getRecycleHostMap(); + if (recycleHostMap != null) { + for (Map.Entry groupEntry : recycleHostMap.entrySet()) { + String dbGroupName = groupEntry.getKey(); + LOGGER.debug("delete dbGroup_status:{}", dbGroupName); + clusterHelper.cleanKV(ClusterMetaUtil.getHaStatusPath(dbGroupName)); + } + } + Map addOrChangeHostMap = reloadResult.getAddOrChangeHostMap(); + if (addOrChangeHostMap != null) { + for (Map.Entry groupEntry : addOrChangeHostMap.entrySet()) { + RawJson dbGroupStatusJson = dbGroupStatusMap.get(groupEntry.getKey()); + LOGGER.debug("add dbGroup_status:{}---{}", groupEntry.getKey(), dbGroupStatusJson); + clusterHelper.setKV(ClusterMetaUtil.getHaStatusPath(groupEntry.getKey()), dbGroupStatusJson); + } + } + LOGGER.info("syncDbGroupStatusToCluster success"); + } + void syncHaStatusFromCluster(Gson gson, DbGroups dbs, List dbGroupList) { try { List> statusKVList = this.getKVBeanOfChildPath(ClusterChildMetaUtil.getHaStatusPath()); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java index 8075cb5e9a..f937fa99a5 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java @@ -139,21 +139,21 @@ private static void reloadWithCluster(ManagerService service, int loadAllMode, C try { ReloadLogHelper.briefInfo("added configLock"); //step 2 reload the local config file - boolean reloadResult; + ReloadResult reloadResult; if (confStatus.getStatus().equals(ConfStatus.Status.MANAGER_INSERT) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_UPDATE) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_DELETE)) { reloadResult = reloadByConfig(loadAllMode, true); } else { reloadResult = reloadByLocalXml(loadAllMode); } - if (!reloadResult) { + if (!reloadResult.isSuccess()) { throw new ReloadException(ErrorCode.ER_RELOAD_INTERRUPUTED, "reload interruputed by others, config should be reload"); } ReloadLogHelper.briefInfo("single instance(self) finished"); ClusterDelayProvider.delayAfterMasterLoad(); //step 3 if the reload with no error ,than write the config file into cluster center remote - ClusterHelper.writeConfToCluster(); + ClusterHelper.writeConfToCluster(reloadResult); ReloadLogHelper.briefInfo("sent config file to cluster center"); //step 4 write the reload flag and self reload result into cluster center,notify the other dble to reload @@ -197,17 +197,17 @@ private static void reloadWithoutCluster(ManagerService service, final int loadA if (!ReloadManager.startReload(TRIGGER_TYPE_COMMAND, confStatus)) { throw new ReloadException(ErrorCode.ER_YES, "reload status error ,other client or cluster may in reload"); } - boolean reloadResult; + ReloadResult reloadResult; if (confStatus.getStatus().equals(ConfStatus.Status.MANAGER_INSERT) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_UPDATE) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_DELETE)) { reloadResult = reloadByConfig(loadAllMode, true); } else { reloadResult = reloadByLocalXml(loadAllMode); } - if (reloadResult && returnFlag) { + if (reloadResult.isSuccess() && returnFlag) { // ok package return; - } else if (!reloadResult) { + } else if (!reloadResult.isSuccess()) { throw new ReloadException(ErrorCode.ER_RELOAD_INTERRUPUTED, "reload interruputed by others,metadata should be reload"); } } finally { @@ -224,11 +224,11 @@ private static void writeErrorResult(ManagerService c, String errorMsg) { c.writeErrMessage(ErrorCode.ER_YES, sb); } - public static boolean reloadByLocalXml(final int loadAllMode) throws Exception { + public static ReloadResult reloadByLocalXml(final int loadAllMode) throws Exception { return reload(loadAllMode, null, null, null, null); } - public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLocal) throws Exception { + public static ReloadResult reloadByConfig(final int loadAllMode, boolean isWriteToLocal) throws Exception { RawJson userConfig = DbleTempConfig.getInstance().getUserConfig(); userConfig = userConfig == null ? DbleServer.getInstance().getConfig().getUserConfig() : userConfig; RawJson dbConfig = DbleTempConfig.getInstance().getDbConfig(); @@ -237,7 +237,7 @@ public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLoc shardingConfig = shardingConfig == null ? DbleServer.getInstance().getConfig().getShardingConfig() : shardingConfig; RawJson sequenceConfig = DbleTempConfig.getInstance().getSequenceConfig(); sequenceConfig = sequenceConfig == null ? DbleServer.getInstance().getConfig().getSequenceConfig() : sequenceConfig; - final boolean reloadResult = reload(loadAllMode, userConfig, dbConfig, shardingConfig, sequenceConfig); + final ReloadResult reloadResult = reload(loadAllMode, userConfig, dbConfig, shardingConfig, sequenceConfig); ReloadLogHelper.briefInfo("clean temp config ..."); DbleTempConfig.getInstance().clean(); @@ -248,7 +248,7 @@ public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLoc return reloadResult; } - private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception { + private static ReloadResult reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception { TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-reload"); try { // load configuration @@ -318,7 +318,7 @@ private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson if (!loader.isFullyConfigured()) { recycleServerConnections(); } - return result; + return packReloadResult(result, changeItemList, forceAllReload, newDbGroups, oldConfig.getDbGroups()); } catch (Exception e) { initFailed(newDbGroups); throw e; @@ -328,6 +328,35 @@ private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson } } + private static ReloadResult packReloadResult(boolean result, List changeItemList, + boolean forceAllReload, + Map newDbGroups, + Map oldDbGroups) { + if (forceAllReload) { + return new ReloadResult(result, newDbGroups, oldDbGroups); + } else { + Map addOrChangeMap0 = new HashMap<>(); + Map recycleMap0 = new HashMap<>(); + for (ChangeItem changeItem : changeItemList) { + if (changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_GROUP) { + PhysicalDbGroup dbGroup = ((PhysicalDbGroup) changeItem.getItem()); + switch (changeItem.getType()) { + case ADD: + case UPDATE: + addOrChangeMap0.put(dbGroup.getGroupName(), dbGroup); + break; + case DELETE: + recycleMap0.put(dbGroup.getGroupName(), dbGroup); + break; + default: + break; + } + } + } + return new ReloadResult(result, addOrChangeMap0, recycleMap0); + } + } + /** * check version/packetSize/lowerCase * get system variables @@ -656,4 +685,28 @@ private static void writePacket(boolean isSuccess, ManagerService service, Strin service.writeErrMessage(errorCode, "Reload Failure, The reason is " + errorMsg); } } + + public static class ReloadResult { // dbGroup + private final boolean success; + private final Map addOrChangeHostMap; + private final Map recycleHostMap; + + public ReloadResult(boolean success, Map addOrChangeHostMap, Map recycleHostMap) { + this.success = success; + this.addOrChangeHostMap = addOrChangeHostMap; + this.recycleHostMap = recycleHostMap; + } + + public boolean isSuccess() { + return success; + } + + public Map getAddOrChangeHostMap() { + return addOrChangeHostMap; + } + + public Map getRecycleHostMap() { + return recycleHostMap; + } + } }