From 7e7bb550acef9f5c310fa2d84fb1312ed12d95ff Mon Sep 17 00:00:00 2001 From: dcy Date: Fri, 26 Apr 2024 10:11:24 +0800 Subject: [PATCH] inner-2383:support delete&&support cluster --- .../actiontech/dble/cluster/ClusterLogic.java | 4 +-- .../response/ConfigStatusResponse.java | 9 ++++- .../dble/cluster/values/ConfStatus.java | 18 +++++++++- .../zktoxml/listen/ConfigStatusListener.java | 9 ++++- .../dble/config/ConfigInitializer.java | 35 ++++++++++++------- .../manager/handler/DeleteHandler.java | 14 +++++++- .../manager/handler/InsertHandler.java | 14 +++++++- .../manager/handler/UpdateHandler.java | 1 + .../manager/response/ReloadConfig.java | 6 ++++ .../manager/response/ReloadContext.java | 13 +++++++ 10 files changed, 104 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/actiontech/dble/cluster/ClusterLogic.java b/src/main/java/com/actiontech/dble/cluster/ClusterLogic.java index e9f48f2c55..c9beced6ed 100644 --- a/src/main/java/com/actiontech/dble/cluster/ClusterLogic.java +++ b/src/main/java/com/actiontech/dble/cluster/ClusterLogic.java @@ -270,7 +270,7 @@ public static void dbGroupResponseEvent(String value, String dbGroupName) throws } } - public static void reloadConfigEvent(String value, String params) throws Exception { + public static void reloadConfigEvent(String value, String params, ReloadContext reloadContext) throws Exception { try { ClusterDelayProvider.delayBeforeSlaveReload(); LOGGER.info("reload_all " + ClusterPathUtil.getConfStatusOperatorPath() + " " + value); @@ -284,7 +284,7 @@ public static void reloadConfigEvent(String value, String params) throws Excepti return; } try { - boolean result = ReloadConfig.reloadAll(Integer.parseInt(params), new ReloadContext()); + boolean result = ReloadConfig.reloadAll(Integer.parseInt(params), reloadContext); if (!checkLocalResult(result)) { return; } diff --git a/src/main/java/com/actiontech/dble/cluster/general/response/ConfigStatusResponse.java b/src/main/java/com/actiontech/dble/cluster/general/response/ConfigStatusResponse.java index fc47de8e46..9a1731059c 100644 --- a/src/main/java/com/actiontech/dble/cluster/general/response/ConfigStatusResponse.java +++ b/src/main/java/com/actiontech/dble/cluster/general/response/ConfigStatusResponse.java @@ -13,6 +13,8 @@ import com.actiontech.dble.cluster.general.listener.ClusterClearKeyListener; import com.actiontech.dble.cluster.values.ConfStatus; import com.actiontech.dble.config.model.SystemConfig; +import com.actiontech.dble.services.manager.response.ReloadContext; +import com.google.gson.Gson; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,12 +56,17 @@ public void notifyProcess(KvBean configValue) throws Exception { //step 1 check if the change is from itself ConfStatus status = new ConfStatus(value); + ReloadContext reloadContext = new ReloadContext(); + if (status.getExtraInfo() != null) { + Gson gson = new Gson(); + reloadContext = gson.fromJson(status.getExtraInfo(), ReloadContext.class); + } if (status.getFrom().equals(SystemConfig.getInstance().getInstanceName())) { //self node return; } //step 2reload the config and set the self config status - ClusterLogic.reloadConfigEvent(value, status.getParams()); + ClusterLogic.reloadConfigEvent(value, status.getParams(), reloadContext); } diff --git a/src/main/java/com/actiontech/dble/cluster/values/ConfStatus.java b/src/main/java/com/actiontech/dble/cluster/values/ConfStatus.java index d1b54b71a5..b6d921d4a7 100644 --- a/src/main/java/com/actiontech/dble/cluster/values/ConfStatus.java +++ b/src/main/java/com/actiontech/dble/cluster/values/ConfStatus.java @@ -5,6 +5,8 @@ package com.actiontech.dble.cluster.values; +import com.sun.org.apache.xerces.internal.impl.dv.util.Base64; + /** * Created by huqing.yan on 2017/7/10. */ @@ -35,8 +37,14 @@ public ConfStatus(String info) { this.status = Status.valueOf(infoDetail[1]); if (infoDetail.length == 3) this.params = infoDetail[2]; - else + else if (infoDetail.length == 4) { + this.params = infoDetail[2]; + this.extraInfo = new String(Base64.decode(infoDetail[3])); + } else { this.params = null; + this.extraInfo = null; + } + } public ConfStatus(Status status, String extraInfo) { @@ -56,6 +64,10 @@ public String toString() { if (params != null) { ss.append(split); ss.append(params); + if (extraInfo != null) { + ss.append(split); + ss.append(Base64.encode(extraInfo.getBytes())); + } } return ss.toString(); @@ -86,4 +98,8 @@ public String getParams() { public String getExtraInfo() { return extraInfo; } + + public void setExtraInfo(String extraInfo) { + this.extraInfo = extraInfo; + } } diff --git a/src/main/java/com/actiontech/dble/cluster/zkprocess/zktoxml/listen/ConfigStatusListener.java b/src/main/java/com/actiontech/dble/cluster/zkprocess/zktoxml/listen/ConfigStatusListener.java index dccc477e49..3678e09c64 100644 --- a/src/main/java/com/actiontech/dble/cluster/zkprocess/zktoxml/listen/ConfigStatusListener.java +++ b/src/main/java/com/actiontech/dble/cluster/zkprocess/zktoxml/listen/ConfigStatusListener.java @@ -11,6 +11,8 @@ import com.actiontech.dble.cluster.values.ConfStatus; import com.actiontech.dble.cluster.zkprocess.comm.NotifyService; import com.actiontech.dble.config.model.SystemConfig; +import com.actiontech.dble.services.manager.response.ReloadContext; +import com.google.gson.Gson; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -61,6 +63,11 @@ private void executeStatusChange(ChildData childData) throws Exception { String value = new String(childData.getData(), StandardCharsets.UTF_8); ConfStatus status = new ConfStatus(value); + ReloadContext reloadContext = new ReloadContext(); + if (status.getExtraInfo() != null) { + Gson gson = new Gson(); + reloadContext = gson.fromJson(status.getExtraInfo(), ReloadContext.class); + } if (status.getFrom().equals(SystemConfig.getInstance().getInstanceName())) { return; //self node } @@ -72,6 +79,6 @@ private void executeStatusChange(ChildData childData) throws Exception { LOGGER.warn("ConfigStatusListener notify error :" + service + " ,Exception info:", e); } } - ClusterLogic.reloadConfigEvent(value, status.getParams()); + ClusterLogic.reloadConfigEvent(value, status.getParams(), reloadContext); } } diff --git a/src/main/java/com/actiontech/dble/config/ConfigInitializer.java b/src/main/java/com/actiontech/dble/config/ConfigInitializer.java index 98c382ac54..e8be0cbd36 100644 --- a/src/main/java/com/actiontech/dble/config/ConfigInitializer.java +++ b/src/main/java/com/actiontech/dble/config/ConfigInitializer.java @@ -8,6 +8,7 @@ import com.actiontech.dble.backend.datasource.PhysicalDbGroup; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.datasource.ShardingNode; +import com.actiontech.dble.cluster.values.ConfStatus; import com.actiontech.dble.config.helper.TestSchemasTask; import com.actiontech.dble.config.helper.TestTask; import com.actiontech.dble.config.loader.xml.XMLDbLoader; @@ -203,11 +204,17 @@ public void testConnection() { boolean skipTestConnectionOnUpdate = false; if (SystemConfig.getInstance().isSkipTestConOnUpdate()) { if (reloadContext != null && !reloadContext.getAffectDbInstanceList().isEmpty()) { - boolean useSharding = reloadContext.getAffectDbInstanceList().stream().map(ele -> dbGroups.get(ele.getGroupName())).anyMatch((ele) -> ele != null && !ele.isShardingUseless()); - - //not support for sharding db group - if (!useSharding) { + if (reloadContext.getConfStatus() == ConfStatus.Status.MANAGER_DELETE) { skipTestConnectionOnUpdate = true; + LOGGER.info("will skip all test connection."); + } else { + boolean useSharding = reloadContext.getAffectDbInstanceList().stream().map(ele -> dbGroups.get(ele.getGroupName())).anyMatch((ele) -> ele != null && !ele.isShardingUseless()); + + //not support for sharding db group + if (!useSharding) { + skipTestConnectionOnUpdate = true; + LOGGER.info("will skip test connection exclude self."); + } } } } @@ -223,14 +230,6 @@ public void testConnection() { } for (PhysicalDbInstance ds : dbGroup.getDbInstances(true)) { - if (skipTestConnectionOnUpdate) { - String finalDbGroupName = dbGroupName; - boolean find = reloadContext.getAffectDbInstanceList().stream().anyMatch((ele) -> ele.getGroupName().equals(finalDbGroupName) && ele.getInstanceName().equals(ds.getName())); - if (!find) { - //skip test connection on this dbInstance - continue; - } - } if (ds.getConfig().isDisabled()) { errorInfos.add(new ErrorInfo("Backend", "WARNING", "dbGroup[" + dbGroupName + "," + ds.getName() + "] is disabled")); LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] is disabled,just mark testing failed and skip it"); @@ -242,6 +241,18 @@ public void testConnection() { ds.setTestConnSuccess(false); continue; } + if (skipTestConnectionOnUpdate) { + String finalDbGroupName = dbGroupName; + boolean find = reloadContext.getAffectDbInstanceList().stream().anyMatch((ele) -> ele.getGroupName().equals(finalDbGroupName) && ele.getInstanceName().equals(ds.getName())); + if (!find) { + //skip test connection on this dbInstance + ds.setTestConnSuccess(true); + LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] instance " + ds.getName() + " is skiped,because of option skipTestConOnUpdate"); + continue; + } else { + + } + } if (!testDbInstance(dbGroupName, ds, schemaList)) { isAllDbInstanceConnected = false; errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]"); diff --git a/src/main/java/com/actiontech/dble/services/manager/handler/DeleteHandler.java b/src/main/java/com/actiontech/dble/services/manager/handler/DeleteHandler.java index ee8c406a79..8dad968cfb 100644 --- a/src/main/java/com/actiontech/dble/services/manager/handler/DeleteHandler.java +++ b/src/main/java/com/actiontech/dble/services/manager/handler/DeleteHandler.java @@ -18,7 +18,10 @@ import com.actiontech.dble.services.manager.information.ManagerSchemaInfo; import com.actiontech.dble.services.manager.information.ManagerTableUtil; import com.actiontech.dble.services.manager.information.ManagerWritableTable; +import com.actiontech.dble.services.manager.information.tables.DbleDbInstance; import com.actiontech.dble.services.manager.response.ReloadConfig; +import com.actiontech.dble.services.manager.response.ReloadContext; +import com.actiontech.dble.services.manager.response.UniqueDbInstance; import com.alibaba.druid.sql.ast.statement.SQLExprTableSource; import com.alibaba.druid.sql.ast.statement.SQLJoinTableSource; import com.alibaba.druid.sql.ast.statement.SQLTableSource; @@ -101,7 +104,16 @@ public void handle(String stmt, ManagerService service) { Set> affectPks = ManagerTableUtil.getAffectPks(service, managerTable, foundRows, null); rowSize = managerTable.deleteRows(affectPks); if (rowSize != 0) { - ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_DELETE, managerTable.getTableName())); + ReloadContext reloadContext = new ReloadContext(); + reloadContext.setConfStatus(ConfStatus.Status.MANAGER_DELETE); + if (managerTable instanceof DbleDbInstance) { + for (LinkedHashMap affectPk : affectPks) { + String instanceName = affectPk.get("name"); + String dbGroup = affectPk.get("db_group"); + reloadContext.addAffectDbInstance(new UniqueDbInstance(dbGroup, instanceName)); + } + } + ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_DELETE, managerTable.getTableName()), reloadContext); } } catch (SQLException e) { isSuccess = false; diff --git a/src/main/java/com/actiontech/dble/services/manager/handler/InsertHandler.java b/src/main/java/com/actiontech/dble/services/manager/handler/InsertHandler.java index c76b60d624..41db60305f 100644 --- a/src/main/java/com/actiontech/dble/services/manager/handler/InsertHandler.java +++ b/src/main/java/com/actiontech/dble/services/manager/handler/InsertHandler.java @@ -16,7 +16,10 @@ import com.actiontech.dble.services.manager.information.ManagerBaseTable; import com.actiontech.dble.services.manager.information.ManagerSchemaInfo; import com.actiontech.dble.services.manager.information.ManagerWritableTable; +import com.actiontech.dble.services.manager.information.tables.DbleDbInstance; import com.actiontech.dble.services.manager.response.ReloadConfig; +import com.actiontech.dble.services.manager.response.ReloadContext; +import com.actiontech.dble.services.manager.response.UniqueDbInstance; import com.actiontech.dble.util.StringUtil; import com.alibaba.druid.sql.ast.SQLExpr; import com.alibaba.druid.sql.ast.expr.SQLNullExpr; @@ -67,7 +70,16 @@ public void handle(String stmt, ManagerService service) { managerTable.checkPrimaryKeyDuplicate(rows); rowSize = managerTable.insertRows(rows); if (rowSize != 0) { - ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_INSERT, managerTable.getTableName())); + ReloadContext reloadContext = new ReloadContext(); + reloadContext.setConfStatus(ConfStatus.Status.MANAGER_INSERT); + if (managerTable instanceof DbleDbInstance) { + for (LinkedHashMap affectPk : rows) { + String instanceName = affectPk.get("name"); + String dbGroup = affectPk.get("db_group"); + reloadContext.addAffectDbInstance(new UniqueDbInstance(dbGroup, instanceName)); + } + } + ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_INSERT, managerTable.getTableName()), reloadContext); } managerTable.afterExecute(); } catch (SQLException e) { diff --git a/src/main/java/com/actiontech/dble/services/manager/handler/UpdateHandler.java b/src/main/java/com/actiontech/dble/services/manager/handler/UpdateHandler.java index 6fe539f620..8e432cea09 100644 --- a/src/main/java/com/actiontech/dble/services/manager/handler/UpdateHandler.java +++ b/src/main/java/com/actiontech/dble/services/manager/handler/UpdateHandler.java @@ -158,6 +158,7 @@ private int updateRows(ManagerService service, ManagerWritableTable managerTable if (rowSize != 0) { ReloadContext reloadContext = new ReloadContext(); + reloadContext.setConfStatus(ConfStatus.Status.MANAGER_UPDATE); if (managerTable instanceof DbleDbInstance) { for (LinkedHashMap affectPk : affectPks) { String instanceName = affectPk.get("name"); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java index e82f03a30b..be5a8e6fce 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java @@ -39,6 +39,8 @@ import com.actiontech.dble.singleton.CronScheduler; import com.actiontech.dble.singleton.FrontendUserManager; import com.actiontech.dble.singleton.TraceManager; +import com.actiontech.dble.util.StringUtil; +import com.google.gson.Gson; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,6 +123,10 @@ private static void reloadWithCluster(ManagerService service, int loadAllMode, b //step 4 write the reload flag and self reload result into cluster center,notify the other dble to reload ConfStatus status = new ConfStatus(SystemConfig.getInstance().getInstanceName(), ConfStatus.Status.RELOAD_ALL, String.valueOf(loadAllMode)); + if (reloadContext != null && !reloadContext.getAffectDbInstanceList().isEmpty()) { + Gson gson = new Gson(); + status.setExtraInfo(gson.toJson(reloadContext)); + } ClusterHelper.setKV(ClusterPathUtil.getConfStatusOperatorPath(), status.toString()); ReloadLogHelper.info("reload config: sent config status to cluster center", LOGGER); //step 5 start a loop to check if all the dble in cluster is reload finished diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadContext.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadContext.java index 3511c32f54..7fe562c4b8 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadContext.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadContext.java @@ -6,6 +6,8 @@ package com.actiontech.dble.services.manager.response; +import com.actiontech.dble.cluster.values.ConfStatus; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -13,6 +15,17 @@ public class ReloadContext { private final List affectDbInstanceList = new ArrayList<>(); + private ConfStatus.Status confStatus = null; + + + public ConfStatus.Status getConfStatus() { + return confStatus; + } + + public void setConfStatus(ConfStatus.Status confStatus) { + this.confStatus = confStatus; + } + public List getAffectDbInstanceList() { return Collections.unmodifiableList(affectDbInstanceList); }