From c561fc85efa4fdf5ab6f12007cd9a8810b3c423c Mon Sep 17 00:00:00 2001 From: wenyh1 <2365151147@qq.com> Date: Sun, 24 Dec 2023 21:30:50 +0800 Subject: [PATCH] [inner-2256] fix: narrow down the scope of syncing dbGroup state (cherry picked from commit 539b3c76d6dc42a4eb2ca40b260c86979fc966b5) --- .../dble/cluster/ClusterHelper.java | 5 +- .../cluster/logic/ConfigClusterLogic.java | 4 +- .../dble/cluster/logic/HAClusterLogic.java | 25 +++++++++ .../manager/response/ReloadConfig.java | 55 ++++++++++++++----- 4 files changed, 70 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/actiontech/dble/cluster/ClusterHelper.java b/src/main/java/com/actiontech/dble/cluster/ClusterHelper.java index 8eea47b574..b5cf457244 100644 --- a/src/main/java/com/actiontech/dble/cluster/ClusterHelper.java +++ b/src/main/java/com/actiontech/dble/cluster/ClusterHelper.java @@ -6,6 +6,7 @@ import com.actiontech.dble.cluster.path.ChildPathMeta; import com.actiontech.dble.cluster.path.PathMeta; import com.actiontech.dble.cluster.values.*; +import com.actiontech.dble.services.manager.response.ReloadConfig; import org.apache.logging.log4j.util.Strings; import javax.annotation.Nonnull; @@ -109,12 +110,12 @@ public static Map getOnlineMap() { return ClusterGeneralConfig.getInstance().getClusterSender().getOnlineMap(); } - public static void writeConfToCluster() throws Exception { + public static void writeConfToCluster(ReloadConfig.ReloadResult reloadResult) throws Exception { ClusterLogic.forConfig().syncSequenceJsonToCluster(); ClusterLogic.forConfig().syncDbJsonToCluster(); ClusterLogic.forConfig().syncShardingJsonToCluster(); ClusterLogic.forConfig().syncUseJsonToCluster(); - ClusterLogic.forHA().syncDbGroupStatusToCluster(); + ClusterLogic.forHA().syncDbGroupStatusToCluster(reloadResult); } @Nullable diff --git a/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java b/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java index dc4e7973ca..a64f72e258 100644 --- a/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java +++ b/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java @@ -85,8 +85,8 @@ public void reloadConfigEvent(ConfStatus value, String params) throws Exception return; } try { - boolean result = ReloadConfig.reloadByConfig(Integer.parseInt(params), false); - if (!checkLocalResult(result)) { + ReloadConfig.ReloadResult result = ReloadConfig.reloadByConfig(Integer.parseInt(params), false); + if (!checkLocalResult(result.isSuccess())) { return; } } catch (Exception e) { diff --git a/src/main/java/com/actiontech/dble/cluster/logic/HAClusterLogic.java b/src/main/java/com/actiontech/dble/cluster/logic/HAClusterLogic.java index 09f1af5a2d..cd39a214f9 100644 --- a/src/main/java/com/actiontech/dble/cluster/logic/HAClusterLogic.java +++ b/src/main/java/com/actiontech/dble/cluster/logic/HAClusterLogic.java @@ -16,6 +16,7 @@ import com.actiontech.dble.cluster.zkprocess.entity.dbGroups.DBGroup; import com.actiontech.dble.cluster.zkprocess.entity.dbGroups.DBInstance; import com.actiontech.dble.config.model.SystemConfig; +import com.actiontech.dble.services.manager.response.ReloadConfig; import com.actiontech.dble.singleton.HaConfigManager; import com.google.gson.Gson; import com.google.gson.JsonElement; @@ -86,6 +87,30 @@ public void syncDbGroupStatusToCluster() throws Exception { LOGGER.info("syncDbGroupStatusToCluster success"); } + public void syncDbGroupStatusToCluster(ReloadConfig.ReloadResult reloadResult) throws Exception { + LOGGER.info("syncDbGroupStatusToCluster start"); + HaConfigManager.getInstance().init(true); + Map dbGroupStatusMap = HaConfigManager.getInstance().getSourceJsonList(); + + Map recycleHostMap = reloadResult.getRecycleHostMap(); + if (recycleHostMap != null) { + for (Map.Entry groupEntry : recycleHostMap.entrySet()) { + String dbGroupName = groupEntry.getKey(); + LOGGER.debug("delete dbGroup_status:{}", dbGroupName); + clusterHelper.cleanKV(ClusterMetaUtil.getHaStatusPath(dbGroupName)); + } + } + Map addOrChangeHostMap = reloadResult.getAddOrChangeHostMap(); + if (addOrChangeHostMap != null) { + for (Map.Entry groupEntry : addOrChangeHostMap.entrySet()) { + RawJson dbGroupStatusJson = dbGroupStatusMap.get(groupEntry.getKey()); + LOGGER.debug("add dbGroup_status:{}---{}", groupEntry.getKey(), dbGroupStatusJson); + clusterHelper.setKV(ClusterMetaUtil.getHaStatusPath(groupEntry.getKey()), dbGroupStatusJson); + } + } + LOGGER.info("syncDbGroupStatusToCluster success"); + } + void syncHaStatusFromCluster(Gson gson, DbGroups dbs, List dbGroupList) { try { List> statusKVList = this.getKVBeanOfChildPath(ClusterChildMetaUtil.getHaStatusPath()); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java index b4cf7460b3..c5ea3588a5 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java @@ -135,14 +135,14 @@ private static void reloadWithCluster(ManagerService service, int loadAllMode, C lock.writeLock().lock(); try { //step 2 reload the local config file - boolean reloadResult; + ReloadResult reloadResult; if (confStatus.getStatus().equals(ConfStatus.Status.MANAGER_INSERT) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_UPDATE) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_DELETE)) { reloadResult = reloadByConfig(loadAllMode, true); } else { reloadResult = reloadByLocalXml(loadAllMode); } - if (!reloadResult) { + if (!reloadResult.isSuccess()) { packetResult.setSuccess(false); packetResult.setErrorMsg("Reload Failure.The reason is reload interruputed by others,config should be reload"); packetResult.setErrorCode(ErrorCode.ER_RELOAD_INTERRUPUTED); @@ -152,7 +152,7 @@ private static void reloadWithCluster(ManagerService service, int loadAllMode, C ClusterDelayProvider.delayAfterMasterLoad(); //step 3 if the reload with no error ,than write the config file into cluster center remote - ClusterHelper.writeConfToCluster(); + ClusterHelper.writeConfToCluster(reloadResult); ReloadLogHelper.info("reload config: sent config file to cluster center", LOGGER); //step 4 write the reload flag and self reload result into cluster center,notify the other dble to reload @@ -203,17 +203,17 @@ private static void reloadWithoutCluster(ManagerService service, final int loadA packetResult.setErrorCode(ErrorCode.ER_YES); return; } - boolean reloadResult; + ReloadResult reloadResult; if (confStatus.getStatus().equals(ConfStatus.Status.MANAGER_INSERT) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_UPDATE) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_DELETE)) { reloadResult = reloadByConfig(loadAllMode, true); } else { reloadResult = reloadByLocalXml(loadAllMode); } - if (reloadResult && returnFlag) { + if (reloadResult.isSuccess() && returnFlag) { // ok package return; - } else if (!reloadResult) { + } else if (!reloadResult.isSuccess()) { packetResult.setSuccess(false); packetResult.setErrorMsg("Reload Failure.The reason is reload interruputed by others,metadata should be reload"); packetResult.setErrorCode(ErrorCode.ER_RELOAD_INTERRUPUTED); @@ -232,11 +232,11 @@ private static void writeErrorResult(ManagerService c, String errorMsg) { } @Deprecated - public static boolean reloadByLocalXml(final int loadAllMode) throws Exception { + public static ReloadResult reloadByLocalXml(final int loadAllMode) throws Exception { return reload(loadAllMode, null, null, null, null); } - public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLocal) throws Exception { + public static ReloadResult reloadByConfig(final int loadAllMode, boolean isWriteToLocal) throws Exception { RawJson userConfig = DbleTempConfig.getInstance().getUserConfig(); userConfig = userConfig == null ? DbleServer.getInstance().getConfig().getUserConfig() : userConfig; RawJson dbConfig = DbleTempConfig.getInstance().getDbConfig(); @@ -245,14 +245,14 @@ public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLoc shardingConfig = shardingConfig == null ? DbleServer.getInstance().getConfig().getShardingConfig() : shardingConfig; RawJson sequenceConfig = DbleTempConfig.getInstance().getSequenceConfig(); sequenceConfig = sequenceConfig == null ? DbleServer.getInstance().getConfig().getSequenceConfig() : sequenceConfig; - boolean reloadResult = reload(loadAllMode, userConfig, dbConfig, shardingConfig, sequenceConfig); + ReloadResult reloadResult = reload(loadAllMode, userConfig, dbConfig, shardingConfig, sequenceConfig); DbleTempConfig.getInstance().clean(); //sync json to local DbleServer.getInstance().getConfig().syncJsonToLocal(isWriteToLocal); return reloadResult; } - private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception { + private static ReloadResult reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception { TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-reload"); try { /* @@ -299,7 +299,7 @@ private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson } } - private static boolean intelligentReloadAll(int loadAllMode, ConfigInitializer loader) throws Exception { + private static ReloadResult intelligentReloadAll(int loadAllMode, ConfigInitializer loader) throws Exception { TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-intelligent-reload"); try { /* 2.1.1 get diff of dbGroups */ @@ -372,7 +372,7 @@ private static boolean intelligentReloadAll(int loadAllMode, ConfigInitializer l if (!loader.isFullyConfigured()) { recycleServerConnections(); } - return result; + return new ReloadResult(result, addOrChangeHosts, recycleHosts); } catch (Exception e) { initFailed(newDbGroups); throw e; @@ -407,7 +407,7 @@ private static void initFailed(Map newDbGroups) { } } - private static boolean forceReloadAll(final int loadAllMode, ConfigInitializer loader) throws Exception { + private static ReloadResult forceReloadAll(final int loadAllMode, ConfigInitializer loader) throws Exception { TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-force-reload"); try { ServerConfig config = DbleServer.getInstance().getConfig(); @@ -457,7 +457,8 @@ private static boolean forceReloadAll(final int loadAllMode, ConfigInitializer l ReloadLogHelper.info("reload config: apply new config start", LOGGER); boolean result; try { - result = config.reload(newUsers, newSchemas, newShardingNodes, newDbGroups, config.getDbGroups(), newErRelations, newFuncNodeERMap, + Map oldDbGroupMap = config.getDbGroups(); + result = config.reload(newUsers, newSchemas, newShardingNodes, newDbGroups, oldDbGroupMap, newErRelations, newFuncNodeERMap, newSystemVariables, loader.isFullyConfigured(), loadAllMode, newBlacklistConfig, newFunctions, loader.getUserConfig(), loader.getSequenceConfig(), loader.getShardingConfig(), loader.getDbConfig()); CronScheduler.getInstance().init(config.getSchemas()); @@ -469,7 +470,7 @@ private static boolean forceReloadAll(final int loadAllMode, ConfigInitializer l if (!loader.isFullyConfigured()) { recycleServerConnections(); } - return result; + return new ReloadResult(result, newDbGroups, oldDbGroupMap); } catch (Exception e) { initFailed(newDbGroups); throw e; @@ -612,4 +613,28 @@ private static void writePacket(boolean isSuccess, ManagerService service, Strin service.writeErrMessage(errorCode, errorMsg); } } + + public static class ReloadResult { // dbGroup + private final boolean success; + private final Map addOrChangeHostMap; + private final Map recycleHostMap; + + public ReloadResult(boolean success, Map addOrChangeHostMap, Map recycleHostMap) { + this.success = success; + this.addOrChangeHostMap = addOrChangeHostMap; + this.recycleHostMap = recycleHostMap; + } + + public boolean isSuccess() { + return success; + } + + public Map getAddOrChangeHostMap() { + return addOrChangeHostMap; + } + + public Map getRecycleHostMap() { + return recycleHostMap; + } + } }