From e45711c5474a97e113fb2ec1c258111589203fc3 Mon Sep 17 00:00:00 2001 From: wenyh1 <2365151147@qq.com> Date: Fri, 9 Jun 2023 17:03:26 +0800 Subject: [PATCH] [inner-2218] Configuration support for HTAP --- .../dble/backend/datasource/ApNode.java | 8 + .../backend/datasource/PhysicalDbGroup.java | 12 +- .../cluster/logic/ConfigClusterLogic.java | 7 + .../dble/cluster/path/ClusterPathUtil.java | 1 + .../cluster/zkprocess/entity/Shardings.java | 15 ++ .../dble/cluster/zkprocess/entity/Users.java | 3 +- .../entity/sharding/schema/Schema.java | 13 ++ .../entity/sharding/shardingnode/ApNode.java | 59 +++++++ .../zkprocess/entity/user/HybridTAUser.java | 25 +++ .../cluster/zkprocess/entity/user/User.java | 2 +- .../dble/config/ConfigInitializer.java | 44 ++++- .../actiontech/dble/config/ServerConfig.java | 55 +++++- .../config/converter/ShardingConverter.java | 164 ++++++++++++++++-- .../dble/config/converter/UserConverter.java | 28 +++ .../config/model/sharding/ApNodeConfig.java | 8 + .../config/model/sharding/SchemaConfig.java | 10 +- .../model/sharding/ShardingNodeConfig.java | 2 +- .../config/model/user/HybridTAUserConfig.java | 12 ++ .../config/privileges/ShardingPrivileges.java | 2 +- .../dble/config/util/ConfigUtil.java | 11 +- .../factorys/BusinessServiceFactory.java | 2 +- .../information/tables/DbleBlacklist.java | 2 +- .../manager/information/tables/DbleEntry.java | 17 ++ .../information/tables/DbleEntrySchema.java | 2 +- .../tables/DbleEntryTablePrivilege.java | 2 +- .../manager/response/ChangeItemType.java | 2 +- .../services/manager/response/DryRun.java | 2 +- .../manager/response/ReloadConfig.java | 30 +++- .../services/manager/response/ShowUser.java | 2 +- src/main/resources/sharding.xsd | 1 + src/main/resources/sharding_detail.xsd | 8 + src/main/resources/user.xsd | 1 + src/main/resources/user_detail.xsd | 37 ++++ .../dble/cluster/ClusterHelpTest.java | 17 +- 34 files changed, 552 insertions(+), 54 deletions(-) create mode 100644 src/main/java/com/actiontech/dble/backend/datasource/ApNode.java create mode 100644 src/main/java/com/actiontech/dble/cluster/zkprocess/entity/sharding/shardingnode/ApNode.java create mode 100644 src/main/java/com/actiontech/dble/cluster/zkprocess/entity/user/HybridTAUser.java create mode 100644 src/main/java/com/actiontech/dble/config/model/sharding/ApNodeConfig.java create mode 100644 src/main/java/com/actiontech/dble/config/model/user/HybridTAUserConfig.java diff --git a/src/main/java/com/actiontech/dble/backend/datasource/ApNode.java b/src/main/java/com/actiontech/dble/backend/datasource/ApNode.java new file mode 100644 index 0000000000..c1f9a9403f --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/datasource/ApNode.java @@ -0,0 +1,8 @@ +package com.actiontech.dble.backend.datasource; + +public class ApNode extends ShardingNode { + + public ApNode(String dbGroupName, String hostName, String database, PhysicalDbGroup dbGroup) { + super(dbGroupName, hostName, database, dbGroup); + } +} diff --git a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java index 9042d9873e..ea0156954d 100644 --- a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java +++ b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java @@ -70,6 +70,7 @@ public class PhysicalDbGroup { private boolean shardingUseless = true; private boolean rwSplitUseless = true; private boolean analysisUseless = true; + private boolean hybridTAUseless = 
true; private Set rwSplitSessionSet = Sets.newConcurrentHashSet(); private volatile Integer state = Integer.valueOf(INITIAL); @@ -164,7 +165,7 @@ public int getRwSplitMode() { } public boolean isUseless() { - return shardingUseless && rwSplitUseless && analysisUseless; + return shardingUseless && rwSplitUseless && analysisUseless && hybridTAUseless; } public boolean isShardingUseless() { @@ -191,12 +192,19 @@ public void setAnalysisUseless(boolean analysisUseless) { this.analysisUseless = analysisUseless; } + public boolean isHybridTAUseless() { + return hybridTAUseless; + } + + public void setHybridTAUseless(boolean hybridTAUseless) { + this.hybridTAUseless = hybridTAUseless; + } + private boolean checkSlaveSynStatus(PhysicalDbInstance ds) { return (dbGroupConfig.getDelayThreshold() != -1 && dbGroupConfig.isShowSlaveSql()) || ds.getDbGroup().isDelayDetectionStart(); } - public PhysicalDbInstance getWriteDbInstance() { return writeDbInstance; } diff --git a/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java b/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java index 1c31f3409e..b7f30808a2 100644 --- a/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java +++ b/src/main/java/com/actiontech/dble/cluster/logic/ConfigClusterLogic.java @@ -25,6 +25,7 @@ import com.actiontech.dble.cluster.zkprocess.entity.sharding.function.Function; import com.actiontech.dble.cluster.zkprocess.entity.sharding.schema.Schema; import com.actiontech.dble.cluster.zkprocess.entity.sharding.schema.Table; +import com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode.ApNode; import com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode.ShardingNode; import com.actiontech.dble.cluster.zkprocess.entity.user.BlackList; import com.actiontech.dble.cluster.zkprocess.entity.user.User; @@ -423,6 +424,12 @@ public Shardings parseShardingJsonToBean(Gson gson, RawJson jsonContent) { shardingBean.setShardingNode(shardingNodeList); } + JsonElement apNodeJson = jsonObject.get(ClusterPathUtil.AP_NODE); + if (apNodeJson != null) { + List apNodeList = gson.fromJson(apNodeJson.toString(), new TypeToken>() { + }.getType()); + shardingBean.setApNode(apNodeList); + } JsonElement functionJson = jsonObject.get(ClusterPathUtil.FUNCTION); if (functionJson != null) { List functions = gson.fromJson(functionJson.toString(), new TypeToken>() { diff --git a/src/main/java/com/actiontech/dble/cluster/path/ClusterPathUtil.java b/src/main/java/com/actiontech/dble/cluster/path/ClusterPathUtil.java index 22e6bb7786..10448288b6 100644 --- a/src/main/java/com/actiontech/dble/cluster/path/ClusterPathUtil.java +++ b/src/main/java/com/actiontech/dble/cluster/path/ClusterPathUtil.java @@ -33,6 +33,7 @@ private ClusterPathUtil() { public static final String SCHEMA = "schema"; public static final String DB_GROUP = "dbGroup"; public static final String SHARDING_NODE = "shardingNode"; + public static final String AP_NODE = "apNode"; public static final String BLACKLIST = "blacklist"; public static final String VERSION = "version"; public static final String FUNCTION = "function"; diff --git a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/Shardings.java b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/Shardings.java index e8376862f7..8c469976b0 100644 --- a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/Shardings.java +++ b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/Shardings.java @@ -7,6 +7,7 @@ import 
com.actiontech.dble.cluster.zkprocess.entity.sharding.function.Function; import com.actiontech.dble.cluster.zkprocess.entity.sharding.schema.Schema; +import com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode.ApNode; import com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode.ShardingNode; import com.actiontech.dble.config.Versions; @@ -27,6 +28,8 @@ public class Shardings { private List shardingNode; + private List apNode; + protected List function; public List getSchema() { @@ -51,6 +54,16 @@ public void setShardingNode(List shardingNode) { this.shardingNode = shardingNode; } + public List getApNode() { + if (this.apNode == null) { + apNode = new ArrayList<>(); + } + return apNode; + } + + public void setApNode(List apNode) { + this.apNode = apNode; + } public List getFunction() { if (this.function == null) { @@ -77,6 +90,8 @@ public String toString() { schema + ", shardingNode=" + shardingNode + + ", apNode=" + + apNode + ", function=" + function + "]"; diff --git a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/Users.java b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/Users.java index 901416a496..ec7b7b0841 100644 --- a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/Users.java +++ b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/Users.java @@ -20,7 +20,8 @@ public class Users { @XmlElementRef(name = "ShardingUser", type = ShardingUser.class), @XmlElementRef(name = "ManagerUser", type = ManagerUser.class), @XmlElementRef(name = "RwSplitUser", type = RwSplitUser.class), - @XmlElementRef(name = "AnalysisUser", type = AnalysisUser.class) + @XmlElementRef(name = "AnalysisUser", type = AnalysisUser.class), + @XmlElementRef(name = "HybridTAUser", type = HybridTAUser.class) }) protected List user; diff --git a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/sharding/schema/Schema.java b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/sharding/schema/Schema.java index fb35fabdfc..2884ccb127 100644 --- a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/sharding/schema/Schema.java +++ b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/sharding/schema/Schema.java @@ -30,6 +30,9 @@ public class Schema implements Named { @XmlAttribute protected String shardingNode; + @XmlAttribute + protected String apNode; + @XmlAttribute protected String function; @@ -80,6 +83,14 @@ public void setFunction(String function) { this.function = function; } + public void setApNode(String apNode) { + this.apNode = apNode; + } + + public String getApNode() { + return apNode; + } + public boolean isLogicalCreateADrop() { return logicalCreateADrop; } @@ -92,6 +103,8 @@ public String toString() { sqlMaxLimit + ", shardingNode=" + shardingNode + + ", apNode=" + + apNode + ", function=" + function + ", table=" + diff --git a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/sharding/shardingnode/ApNode.java b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/sharding/shardingnode/ApNode.java new file mode 100644 index 0000000000..06ec34a27e --- /dev/null +++ b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/sharding/shardingnode/ApNode.java @@ -0,0 +1,59 @@ +package com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode; + +import com.actiontech.dble.cluster.zkprocess.entity.Named; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import 
javax.xml.bind.annotation.XmlType; + +@XmlAccessorType(XmlAccessType.FIELD) +@XmlType(name = "apNode") +public class ApNode implements Named { + + @XmlAttribute(required = true) + private String name; + + @XmlAttribute(required = true) + private String dbGroup; + + @XmlAttribute(required = true) + private String database; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getDbGroup() { + return dbGroup; + } + + public void setDbGroup(String dbGroup) { + this.dbGroup = dbGroup; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + @Override + public String toString() { + String builder = "ApNode [name=" + + name + + ", dbGroup=" + + dbGroup + + ", database=" + + database + + "]"; + return builder; + } + +} diff --git a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/user/HybridTAUser.java b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/user/HybridTAUser.java new file mode 100644 index 0000000000..a6bbe3c6ff --- /dev/null +++ b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/user/HybridTAUser.java @@ -0,0 +1,25 @@ +package com.actiontech.dble.cluster.zkprocess.entity.user; + +import javax.xml.bind.annotation.*; + +@XmlAccessorType(XmlAccessType.FIELD) +@XmlType(name = "hybridTAUser") +@XmlRootElement +public class HybridTAUser extends ShardingUser { + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("HybridTAUser{").append(super.toString()); + sb.append(", schemas=").append(schemas); + sb.append(", tenant=").append(tenant); + sb.append(", readOnly=").append(readOnly); + sb.append(", blacklist=").append(blacklist); + + if (privileges != null) { + sb.append(", privileges=").append(privileges); + } + sb.append('}'); + return sb.toString(); + } +} diff --git a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/user/User.java b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/user/User.java index 854e259ed3..d27e1c10a6 100644 --- a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/user/User.java +++ b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/user/User.java @@ -10,7 +10,7 @@ import javax.xml.bind.annotation.XmlTransient; @XmlTransient -@XmlSeeAlso({ShardingUser.class, ManagerUser.class, RwSplitUser.class, AnalysisUser.class}) +@XmlSeeAlso({ShardingUser.class, ManagerUser.class, RwSplitUser.class, AnalysisUser.class, HybridTAUser.class}) public class User { @XmlAttribute(required = true) diff --git a/src/main/java/com/actiontech/dble/config/ConfigInitializer.java b/src/main/java/com/actiontech/dble/config/ConfigInitializer.java index 81f66cb0be..5e80aa9ac8 100644 --- a/src/main/java/com/actiontech/dble/config/ConfigInitializer.java +++ b/src/main/java/com/actiontech/dble/config/ConfigInitializer.java @@ -5,6 +5,7 @@ */ package com.actiontech.dble.config; +import com.actiontech.dble.backend.datasource.ApNode; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.datasource.ShardingNode; @@ -21,10 +22,7 @@ import com.actiontech.dble.config.model.db.type.DataBaseType; import com.actiontech.dble.config.model.sharding.SchemaConfig; import com.actiontech.dble.config.model.sharding.table.ERTable; -import com.actiontech.dble.config.model.user.AnalysisUserConfig; -import 
com.actiontech.dble.config.model.user.RwSplitUserConfig; -import com.actiontech.dble.config.model.user.UserConfig; -import com.actiontech.dble.config.model.user.UserName; +import com.actiontech.dble.config.model.user.*; import com.actiontech.dble.config.util.ConfigException; import com.actiontech.dble.meta.ReloadLogHelper; import com.actiontech.dble.plan.common.ptr.BoolPtr; @@ -53,6 +51,7 @@ public class ConfigInitializer implements ProblemReporter { private volatile Map users; private volatile Map schemas = Maps.newHashMap(); private volatile Map shardingNodes = Maps.newHashMap(); + private volatile Map apNodes = Maps.newHashMap(); private volatile Map dbGroups; private volatile Map> erRelations = Maps.newHashMap(); private volatile Map> funcNodeERMap = Maps.newHashMap(); @@ -147,12 +146,14 @@ private void init(RawJson userJson, RawJson dbJson, @Nullable RawJson shardingJs this.funcNodeERMap = shardingConverter.getFuncNodeERMap(); this.shardingNodes = shardingConverter.getShardingNodeMap(); this.functions = shardingConverter.getFunctionMap(); + this.apNodes = shardingConverter.getApNodeMap(); } this.shardingConfig = shardingJson; this.sequenceConfig = sequenceJson; checkRwSplitDbGroup(); checkAnalysisDbGroup(); + checkHybridTADbGroup(); checkWriteDbInstance(); } @@ -240,6 +241,20 @@ private void checkRwSplitDbGroup() { } } + private void checkHybridTADbGroup() { + dbGroups.values().stream().filter(f -> !f.isHybridTAUseless()).forEach(g -> { + if (g.getDbGroupConfig().instanceDatabaseType() != DataBaseType.CLICKHOUSE) { + throw new ConfigException("The dbGroup[" + g.getGroupName() + "] database type must be " + DataBaseType.CLICKHOUSE); + } else { + g.getAllDbInstanceMap().values().stream().forEach(i -> { + if (i.getConfig().getDataBaseType() != DataBaseType.CLICKHOUSE) { + throw new ConfigException("The dbInstance[" + g.getGroupName() + "." 
+ i.getName() + "] all dbInstance database type must be " + DataBaseType.CLICKHOUSE); + } + }); + } + }); + } + public void testConnection(List changeItemList) { TraceManager.TraceObject traceObject = TraceManager.threadTrace("test-connection"); try { @@ -280,6 +295,14 @@ public void testConnection(List changeItemList) { ShardingNode shardingNode = (ShardingNode) item; dbGroup = shardingNode.getDbGroup(); + boolean isDbInstanceConnected = testDbGroup(dbGroup, hostSchemaMap, dbGroupTested, errDbInstanceNames); + if (!isDbInstanceConnected) { + isAllDbInstanceConnected = false; + } + } else if (itemType == ChangeItemType.AP_NODE) { + ApNode apNode = (ApNode) item; + dbGroup = apNode.getDbGroup(); + boolean isDbInstanceConnected = testDbGroup(dbGroup, hostSchemaMap, dbGroupTested, errDbInstanceNames); if (!isDbInstanceConnected) { isAllDbInstanceConnected = false; @@ -303,6 +326,14 @@ public void testConnection(List changeItemList) { ShardingNode shardingNode = (ShardingNode) item; dbGroup = shardingNode.getDbGroup(); + boolean isDbInstanceConnected = testDbGroup(dbGroup, hostSchemaMap, dbGroupTested, errDbInstanceNames); + if (!isDbInstanceConnected) { + isAllDbInstanceConnected = false; + } + } else if (itemType == ChangeItemType.AP_NODE) { + ApNode apNode = (ApNode) item; + dbGroup = apNode.getDbGroup(); + boolean isDbInstanceConnected = testDbGroup(dbGroup, hostSchemaMap, dbGroupTested, errDbInstanceNames); if (!isDbInstanceConnected) { isAllDbInstanceConnected = false; @@ -329,7 +360,6 @@ public void testConnection(List changeItemList) { } } - public void testConnection() { TraceManager.TraceObject traceObject = TraceManager.threadTrace("test-connection"); try { @@ -518,6 +548,10 @@ public Map getShardingNodes() { return shardingNodes; } + public Map getApNodes() { + return apNodes; + } + public Map getDbGroups() { return this.dbGroups; } diff --git a/src/main/java/com/actiontech/dble/config/ServerConfig.java b/src/main/java/com/actiontech/dble/config/ServerConfig.java index 5c1fe6b8cc..5087326bfd 100644 --- a/src/main/java/com/actiontech/dble/config/ServerConfig.java +++ b/src/main/java/com/actiontech/dble/config/ServerConfig.java @@ -9,10 +9,7 @@ import com.actiontech.dble.alarm.AlarmCode; import com.actiontech.dble.alarm.Alert; import com.actiontech.dble.alarm.AlertUtil; -import com.actiontech.dble.backend.datasource.PhysicalDbGroup; -import com.actiontech.dble.backend.datasource.PhysicalDbGroupDiff; -import com.actiontech.dble.backend.datasource.PhysicalDbInstance; -import com.actiontech.dble.backend.datasource.ShardingNode; +import com.actiontech.dble.backend.datasource.*; import com.actiontech.dble.cluster.JsonFactory; import com.actiontech.dble.cluster.logic.ClusterLogic; import com.actiontech.dble.cluster.values.RawJson; @@ -24,6 +21,7 @@ import com.actiontech.dble.config.model.sharding.table.BaseTableConfig; import com.actiontech.dble.config.model.sharding.table.ERTable; import com.actiontech.dble.config.model.sharding.table.ShardingTableFakeConfig; +import com.actiontech.dble.config.model.user.HybridTAUserConfig; import com.actiontech.dble.config.model.user.ShardingUserConfig; import com.actiontech.dble.config.model.user.UserConfig; import com.actiontech.dble.config.model.user.UserName; @@ -62,6 +60,7 @@ public class ServerConfig { private volatile Map users; private volatile Map schemas; private volatile Map shardingNodes; + private volatile Map apNodes; private volatile Map dbGroups; private volatile Map> erRelations; private volatile Map> funcNodeERMap; @@ -86,6 +85,7 @@ 
public ServerConfig() { this.schemas = confInitNew.getSchemas(); this.shardingNodes = confInitNew.getShardingNodes(); + this.apNodes = confInitNew.getApNodes(); this.erRelations = confInitNew.getErRelations(); this.funcNodeERMap = confInitNew.getFuncNodeERMap(); this.functions = confInitNew.getFunctions(); @@ -109,6 +109,7 @@ public ServerConfig(ConfigInitializer confInit) { this.dbGroups = confInit.getDbGroups(); this.schemas = confInit.getSchemas(); this.shardingNodes = confInit.getShardingNodes(); + this.apNodes = confInit.getApNodes(); this.erRelations = confInit.getErRelations(); this.funcNodeERMap = confInit.getFuncNodeERMap(); this.functions = confInit.getFunctions(); @@ -128,6 +129,7 @@ public ServerConfig(RawJson userConfig, RawJson dbConfig, RawJson shardingConfig this.schemas = confInitNew.getSchemas(); this.shardingNodes = confInitNew.getShardingNodes(); + this.apNodes = confInitNew.getApNodes(); this.erRelations = confInitNew.getErRelations(); this.funcNodeERMap = confInitNew.getFuncNodeERMap(); this.functions = confInitNew.getFunctions(); @@ -200,6 +202,11 @@ public Map getShardingNodes() { return shardingNodes; } + public Map getApNodes() { + waitIfChanging(); + return apNodes; + } + public Map getDbGroups() { waitIfChanging(); return dbGroups; @@ -546,6 +553,11 @@ private void deleteItem(Object item, ChangeItemType itemType, Map authSchemas = shardingUser.getSchemas(); for (String schema : authSchemas) { - if (!schemas.containsKey(schema)) { - String errMsg = "SelfCheck### User[name:" + shardingUser.getName() + (shardingUser.getTenant() == null ? "" : ",tenant:" + shardingUser.getTenant()) + "]'s schema [" + schema + "] is not exist!"; - throw new ConfigException(errMsg); - } + checkSchemaByUser(schema, shardingUser); } if (shardingUser.getPrivilegesConfig() != null) { @@ -847,6 +870,20 @@ public void selfChecking0() throws ConfigException { } } + private void checkSchemaByUser(String schema, ShardingUserConfig shardingUser) { + if (!schemas.containsKey(schema)) { + String errMsg = "SelfCheck### User[name:" + shardingUser.getName() + (shardingUser.getTenant() == null ? "" : ",tenant:" + shardingUser.getTenant()) + "]'s schema [" + schema + "] is not exist!"; + throw new ConfigException(errMsg); + } + if (shardingUser instanceof HybridTAUserConfig && schemas.get(schema).getDefaultApNode() == null) { + String errMsg = "SelfCheck### User[name:" + shardingUser.getName() + (shardingUser.getTenant() == null ? "" : ",tenant:" + shardingUser.getTenant()) + "]'s schema [" + schema + "] must contain apNode!"; + throw new ConfigException(errMsg); + } else if (!(shardingUser instanceof HybridTAUserConfig) && schemas.get(schema).getDefaultApNode() != null) { // ShardingUserConfig + String errMsg = "SelfCheck### User[name:" + shardingUser.getName() + (shardingUser.getTenant() == null ? 
"" : ",tenant:" + shardingUser.getTenant()) + "]'s schema [" + schema + "] can not contain apNode!"; + throw new ConfigException(errMsg); + } + } + public void syncJsonToLocal(boolean isWriteToLocal) throws Exception { XmlProcessBase xmlProcess = new XmlProcessBase(); xmlProcess.addParseClass(Shardings.class); diff --git a/src/main/java/com/actiontech/dble/config/converter/ShardingConverter.java b/src/main/java/com/actiontech/dble/config/converter/ShardingConverter.java index 71f5d92a2b..02fa79ce74 100644 --- a/src/main/java/com/actiontech/dble/config/converter/ShardingConverter.java +++ b/src/main/java/com/actiontech/dble/config/converter/ShardingConverter.java @@ -16,6 +16,7 @@ import com.actiontech.dble.cluster.zkprocess.entity.Shardings; import com.actiontech.dble.cluster.zkprocess.entity.sharding.function.Function; import com.actiontech.dble.cluster.zkprocess.entity.sharding.schema.*; +import com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode.ApNode; import com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode.ShardingNode; import com.actiontech.dble.cluster.zkprocess.parse.XmlProcessBase; import com.actiontech.dble.config.ConfigFileName; @@ -23,6 +24,7 @@ import com.actiontech.dble.config.ProblemReporter; import com.actiontech.dble.config.Versions; import com.actiontech.dble.config.model.db.type.DataBaseType; +import com.actiontech.dble.config.model.sharding.ApNodeConfig; import com.actiontech.dble.config.model.sharding.SchemaConfig; import com.actiontech.dble.config.model.sharding.ShardingNodeConfig; import com.actiontech.dble.config.model.sharding.table.*; @@ -54,6 +56,7 @@ public class ShardingConverter { private static final Logger LOGGER = LoggerFactory.getLogger(ShardingConverter.class); private Map shardingNodeMap = Maps.newLinkedHashMap(); + private Map apNodeMap = Maps.newLinkedHashMap(); private final Map functionMap = Maps.newLinkedHashMap(); private final Map schemaConfigMap = Maps.newLinkedHashMap(); private final Map> erRelations = Maps.newLinkedHashMap(); @@ -105,6 +108,7 @@ public RawJson shardingBeanToJson(Shardings shardings) { } jsonObj.add(ClusterPathUtil.SCHEMA, gson.toJsonTree(schemaArray)); jsonObj.add(ClusterPathUtil.SHARDING_NODE, gson.toJsonTree(shardings.getShardingNode())); + jsonObj.add(ClusterPathUtil.AP_NODE, gson.toJsonTree(shardings.getApNode())); List functionList = shardings.getFunction(); readMapFileAddFunction(functionList); jsonObj.add(ClusterPathUtil.FUNCTION, gson.toJsonTree(functionList)); @@ -115,9 +119,11 @@ public void shardingJsonToMap(RawJson shardingJson, Map Shardings shardings = shardingJsonToBean(shardingJson); List shardingNodeList = shardings.getShardingNode(); List functionList = shardings.getFunction(); + List apNodeList = shardings.getApNode(); removeFileContent(functionList); List schemaList = shardings.getSchema(); Map shardingNodeConfigMap = Maps.newLinkedHashMap(); + Map apNodeConfigMap = Maps.newLinkedHashMap(); List errorInfos = new ArrayList<>(); if (shardings.getVersion() != null && !Versions.CONFIG_VERSION.equals(shardings.getVersion())) { if (problemReporter != null) { @@ -132,9 +138,11 @@ public void shardingJsonToMap(RawJson shardingJson, Map } try { shardingNodeListToMap(shardingNodeList, dbGroupMap, shardingNodeConfigMap); + apNodeListToMap(apNodeList, dbGroupMap, apNodeConfigMap); functionListToMap(functionList, problemReporter); - schemaListToMap(schemaList, shardingNodeConfigMap, problemReporter); + schemaListToMap(schemaList, shardingNodeConfigMap, apNodeConfigMap, problemReporter); 
deleteUselessShardingNode(errorInfos, sequenceJson); + deleteUselessApNode(errorInfos); } catch (Exception e) { String message = e.getMessage() == null ? e.toString() : e.getMessage(); throw new ConfigException("sharding json to map occurred parse errors, The detailed errors are as follows. " + message, e); @@ -185,15 +193,13 @@ private static void readMapFileAddFunction(List functionList) { } } - private void schemaListToMap(List schemaList, Map shardingNodeConfigMap, ProblemReporter problemReporter) { + private void schemaListToMap(List schemaList, Map shardingNodeConfigMap, Map apNodeConfigMap, ProblemReporter problemReporter) { for (com.actiontech.dble.cluster.zkprocess.entity.sharding.schema.Schema schema : schemaList) { String schemaName = schema.getName(); String schemaShardingNode = schema.getShardingNode(); String schemaFunction = schema.getFunction(); - String schemaSqlMaxLimitStr = null == schema.getSqlMaxLimit() ? null : String.valueOf(schema.getSqlMaxLimit()); - List tableList = Optional.ofNullable(schema.getTable()).orElse(Lists.newArrayList()); + String schemaApNode = schema.getApNode(); - int schemaSqlMaxLimit = getSqlMaxLimit(schemaSqlMaxLimitStr, -1); List shardingNodeList = null; AbstractPartitionAlgorithm algorithm = null; //check and add shardingNode @@ -226,28 +232,31 @@ private void schemaListToMap(List schemaList, Map tableConfigMap = Maps.newLinkedHashMap(); if (this.schemaConfigMap.containsKey(schemaName)) { throw new ConfigException("schema " + schemaName + " duplicated!"); } - for (Object tableObj : tableList) { - if (tableObj instanceof ShardingTable) { - fillShardingTable((ShardingTable) tableObj, schemaSqlMaxLimit, tableConfigMap, shardingNodeConfigMap, problemReporter); - } else if (tableObj instanceof GlobalTable) { - fillGlobalTable((GlobalTable) tableObj, schemaSqlMaxLimit, tableConfigMap, shardingNodeConfigMap); - } else if (tableObj instanceof SingleTable) { - fillSingleTable((SingleTable) tableObj, schemaSqlMaxLimit, tableConfigMap, shardingNodeConfigMap); - } - } + // check&load tables + String schemaSqlMaxLimitStr = null == schema.getSqlMaxLimit() ? 
null : String.valueOf(schema.getSqlMaxLimit()); + int schemaSqlMaxLimit = getSqlMaxLimit(schemaSqlMaxLimitStr, -1); + tableListToSchema(schema, schemaSqlMaxLimit, tableConfigMap, shardingNodeConfigMap, problemReporter); // if sharding has no default shardingNode,it must contains at least one table if (shardingNodeList == null && tableConfigMap.size() == 0) { throw new ConfigException( "sharding " + schemaName + " didn't config tables,so you must set shardingNode property!"); } - SchemaConfig schemaConfig = new SchemaConfig(schemaName, shardingNodeList, algorithm, tableConfigMap, schemaSqlMaxLimit, schema.isLogicalCreateADrop()); + SchemaConfig schemaConfig = new SchemaConfig(schemaName, shardingNodeList, algorithm, apNode, tableConfigMap, schemaSqlMaxLimit, schema.isLogicalCreateADrop()); mergeFuncNodeERMap(schemaConfig); mergeFkERMap(schemaConfig); this.schemaConfigMap.put(schemaName, schemaConfig); @@ -255,6 +264,20 @@ private void schemaListToMap(List schemaList, Map tableConfigMap, Map shardingNodeConfigMap, ProblemReporter problemReporter) { + final List tableList = Optional.ofNullable(schema.getTable()).orElse(Lists.newArrayList()); + for (Object tableObj : tableList) { + if (tableObj instanceof ShardingTable) { + fillShardingTable((ShardingTable) tableObj, schemaSqlMaxLimit, tableConfigMap, shardingNodeConfigMap, problemReporter); + } else if (tableObj instanceof GlobalTable) { + fillGlobalTable((GlobalTable) tableObj, schemaSqlMaxLimit, tableConfigMap, shardingNodeConfigMap); + } else if (tableObj instanceof SingleTable) { + fillSingleTable((SingleTable) tableObj, schemaSqlMaxLimit, tableConfigMap, shardingNodeConfigMap); + } + } + } + public static void removeFileContent(List functionList) { if (functionList == null) { return; @@ -483,8 +506,9 @@ private void deleteUselessShardingNode(List errorInfos, RawJson seque } else { throw new ConfigException("The dbGroup[" + entry.getValue().getDbGroupName() + "] associated with ShardingNode[" + entry.getKey() + "] does not exist"); } - if (shardingNodeGroup.getDbGroupConfig().getWriteInstanceConfig().getDataBaseType() != DataBaseType.MYSQL) { - throw new ConfigException("The dbGroup[" + entry.getValue().getDbGroupName() + "] not support database type [" + entry.getKey() + "]"); + DataBaseType dataBaseType = shardingNodeGroup.getDbGroupConfig().getWriteInstanceConfig().getDataBaseType(); + if (dataBaseType != DataBaseType.MYSQL) { + throw new ConfigException("The dbGroup[" + entry.getValue().getDbGroupName() + "] not support database type [" + dataBaseType + "]"); } } else { errorInfos.add(new ErrorInfo("Xml", "WARNING", "shardingNode " + shardingNodeName + " is useless")); @@ -493,6 +517,39 @@ private void deleteUselessShardingNode(List errorInfos, RawJson seque } } + private void deleteUselessApNode(List errorInfos) { + Set allUseApNode = new HashSet<>(); + for (SchemaConfig sc : this.schemaConfigMap.values()) { + // check apNode / dbGroup + if (sc.getDefaultApNode() != null) { + allUseApNode.add(sc.getDefaultApNode()); + } + } + + //delete redundancy apNode + Iterator> iterator = this.apNodeMap.entrySet().iterator(); + PhysicalDbGroup apNodeGroup; + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + String apNodeName = entry.getKey(); + if (allUseApNode.contains(apNodeName)) { + apNodeGroup = entry.getValue().getDbGroup(); + if (apNodeGroup != null) { + apNodeGroup.setHybridTAUseless(false); + } else { + throw new ConfigException("The dbGroup[" + entry.getValue().getDbGroupName() + "] associated with ApNode[" + 
entry.getKey() + "] does not exist"); + } + DataBaseType dataBaseType = apNodeGroup.getDbGroupConfig().getWriteInstanceConfig().getDataBaseType(); + if (dataBaseType != DataBaseType.CLICKHOUSE) { + throw new ConfigException("The dbGroup[" + entry.getValue().getDbGroupName() + "] not support database type [" + dataBaseType + "]"); + } + } else { + errorInfos.add(new ErrorInfo("Xml", "WARNING", "apNode " + apNodeName + " is useless")); + iterator.remove(); + } + } + } + private void mergeFkERMap(SchemaConfig schemaConfig) { Map> schemaFkERMap = schemaConfig.getFkErRelations(); if (schemaFkERMap == null) { @@ -703,8 +760,49 @@ private void shardingNodeListToMap(List shardingNodeList, Map checkSet, Map shardingNodeConfigMap) { + private void apNodeListToMap(List apNodeList, Map dbGroupMap, Map apNodeConfigMap) { + Set checkSet = new HashSet<>(); + for (ApNode apNode : apNodeList) { + String apNodeName = apNode.getName(); + String apNodeDatabase = apNode.getDatabase(); + String apNodeDbGroup = apNode.getDbGroup(); + PhysicalDbGroup dbGroup = dbGroupMap.get(apNodeDbGroup); + if (StringUtils.isBlank(apNodeName) || StringUtils.isBlank(apNodeDatabase) || StringUtils.isBlank(apNodeDbGroup)) { + throw new ConfigException("apNode " + apNodeName + " define error ,attribute can't be empty"); + } + if (Objects.nonNull(dbGroup) && dbGroup.getDbGroupConfig().instanceDatabaseType() != DataBaseType.CLICKHOUSE) { + throw new ConfigException("apNodeDbGroup [" + apNodeDbGroup + "] define error ,all dbInstance database type must be " + DataBaseType.CLICKHOUSE); + } + //dnNamePre(name),databaseStr(database),host(dbGroup) can use ',', '$', '-' to configure multi nodes + // but the database size *dbGroup size must equal the size of name + // every dbGroup has all database in its tag + //eg: + //means:localhost1 has database of db$0-75,localhost2 has database of db$0-75(name is dn$76-151) + String[] dnNames = SplitUtil.split(apNodeName, ',', '$', '-'); + String[] databases = SplitUtil.split(apNodeDatabase, ',', '$', '-'); + String[] hostStrings = SplitUtil.split(apNodeDbGroup, ',', '$', '-'); + + if (dnNames.length != databases.length * hostStrings.length) { + throw new ConfigException("apNode " + apNodeName + + " define error ,Number of apNode name must be = Number of database * Number of dbGroup"); + } + if (dnNames.length > 1) { + List mhdList = mergerHostDatabase(hostStrings, databases); + for (int k = 0; k < dnNames.length; k++) { + String[] hd = mhdList.get(k); + String dnName = dnNames[k]; + String databaseName = hd[1]; + String hostName = hd[0]; + createApNode(dnName, databaseName, hostName, checkSet, apNodeConfigMap); + } + } else { + createApNode(apNodeName, apNodeDatabase, apNodeDbGroup, checkSet, apNodeConfigMap); + } + } + this.apNodeMap = initApNodes(apNodeConfigMap, dbGroupMap); + } + private void createShardingNode(String dnName, String database, String host, Set checkSet, Map shardingNodeConfigMap) { ShardingNodeConfig conf = new ShardingNodeConfig(dnName, database, host); if (checkSet.contains(host + "#" + database)) { throw new ConfigException("shardingNode " + conf.getName() + " use the same dbGroup&database with other shardingNode"); @@ -717,6 +815,22 @@ private void createShardingNode(String dnName, String database, String host, Set shardingNodeConfigMap.put(conf.getName(), conf); } + private void createApNode(String dnName, String database, String host, Set checkSet, Map apNodeConfigMap) { + if (shardingNodeMap.containsKey(dnName)) { + throw new ConfigException("apNode " + dnName + " repeat, 
because shardingNode already exists has that name"); + } + ApNodeConfig conf = new ApNodeConfig(dnName, database, host); + if (checkSet.contains(host + "#" + database)) { + throw new ConfigException("apNode " + conf.getName() + " use the same dbGroup&database with other apNode"); + } else { + checkSet.add(host + "#" + database); + } + if (apNodeConfigMap.containsKey(conf.getName())) { + throw new ConfigException("apNode " + conf.getName() + " duplicated!"); + } + apNodeConfigMap.put(conf.getName(), conf); + } + private Map initShardingNodes(Map nodeConf, Map dbGroupMap) { Map nodes = new HashMap<>(nodeConf.size()); for (ShardingNodeConfig conf : nodeConf.values()) { @@ -727,11 +841,25 @@ private Map initSha return nodes; } + private Map initApNodes(Map nodeConf, Map dbGroupMap) { + Map nodes = new HashMap<>(nodeConf.size()); + for (ApNodeConfig conf : nodeConf.values()) { + PhysicalDbGroup pool = dbGroupMap.get(conf.getDbGroupName()); + com.actiontech.dble.backend.datasource.ApNode apNode = new com.actiontech.dble.backend.datasource.ApNode(conf.getDbGroupName(), conf.getName(), conf.getDatabase(), pool); + nodes.put(apNode.getName(), apNode); + } + return nodes; + } + public Map getShardingNodeMap() { return shardingNodeMap; } + public Map getApNodeMap() { + return apNodeMap; + } + public Map getFunctionMap() { return functionMap; } diff --git a/src/main/java/com/actiontech/dble/config/converter/UserConverter.java b/src/main/java/com/actiontech/dble/config/converter/UserConverter.java index 08c1328fe7..4215cac343 100644 --- a/src/main/java/com/actiontech/dble/config/converter/UserConverter.java +++ b/src/main/java/com/actiontech/dble/config/converter/UserConverter.java @@ -48,6 +48,7 @@ public class UserConverter { public static final String TYPE_SHARDING_USER = "shardingUser"; public static final String TYPE_RWSPLIT_USER = "rwSplitUser"; public static final String TYPE_ANALYSIS_USER = "analysisUser"; + public static final String TYPE_HYBRIDTA_USER = "hybridTAUser"; private final Gson gson = JsonFactory.getJson(); private final Map userConfigMap = Maps.newLinkedHashMap(); private final Map blackListConfigMap = Maps.newLinkedHashMap(); @@ -155,6 +156,8 @@ private void userListToMap(List userList, Map blac if (user instanceof ManagerUser) { fillManagerUser(userConfig, (ManagerUser) user); + } else if (user instanceof HybridTAUser) { + fillHybridTAUser(userConfig, (HybridTAUser) user, blackListMap, problemReporter); } else if (user instanceof ShardingUser) { fillShardingUser(userConfig, (ShardingUser) user, blackListMap, problemReporter); } else if (user instanceof RwSplitUser) { @@ -203,6 +206,31 @@ private void fillRwSplitUser(UserConfig userConfig, RwSplitUser rwSplitUser, Map this.userConfigMap.put(userName, rwSplitUserConfig); } + private void fillHybridTAUser(UserConfig userConfig, HybridTAUser hybridTAUser, Map blackListMap, ProblemReporter problemReporter) { + String tenant = hybridTAUser.getTenant(); + final boolean readOnly = Optional.ofNullable(hybridTAUser.getReadOnly()).orElse(false); + String schemas = hybridTAUser.getSchemas(); + String blacklistStr = hybridTAUser.getBlacklist(); + + UserName userName = new UserName(userConfig.getName(), tenant); + if (this.userConfigMap.containsKey(userName)) { + throw new ConfigException("User [" + userName.getFullName() + "] has already existed"); + } + if (StringUtil.isEmpty(schemas)) { + throw new ConfigException("User [" + userName.getFullName() + "]'s schemas is empty"); + } + String[] strArray = SplitUtil.split(schemas, ',', true); + + 
WallProvider wallProvider = getWallProvider(blackListMap, problemReporter, blacklistStr, userName); + // load DML Privileges + Privileges privileges = hybridTAUser.getPrivileges(); + UserPrivilegesConfig privilegesConfig = loadPrivilegesConfig(privileges, userConfig); + + HybridTAUserConfig hybridTAUserConfig = new HybridTAUserConfig(userConfig, userName.getTenant(), wallProvider, readOnly, new HashSet<>(Arrays.asList(strArray)), privilegesConfig); + hybridTAUserConfig.setId(this.userId.incrementAndGet()); + this.userConfigMap.put(userName, hybridTAUserConfig); + } + private void fillShardingUser(UserConfig userConfig, ShardingUser shardingUser, Map blackListMap, ProblemReporter problemReporter) { String tenant = shardingUser.getTenant(); final boolean readOnly = Optional.ofNullable(shardingUser.getReadOnly()).orElse(false); diff --git a/src/main/java/com/actiontech/dble/config/model/sharding/ApNodeConfig.java b/src/main/java/com/actiontech/dble/config/model/sharding/ApNodeConfig.java new file mode 100644 index 0000000000..0ec12cdb1d --- /dev/null +++ b/src/main/java/com/actiontech/dble/config/model/sharding/ApNodeConfig.java @@ -0,0 +1,8 @@ +package com.actiontech.dble.config.model.sharding; + +public class ApNodeConfig extends ShardingNodeConfig { + + public ApNodeConfig(String name, String database, String dbGroupName) { + super(name, database, dbGroupName); + } +} diff --git a/src/main/java/com/actiontech/dble/config/model/sharding/SchemaConfig.java b/src/main/java/com/actiontech/dble/config/model/sharding/SchemaConfig.java index ec6f0bc5bb..89169f6955 100644 --- a/src/main/java/com/actiontech/dble/config/model/sharding/SchemaConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/sharding/SchemaConfig.java @@ -24,6 +24,7 @@ public class SchemaConfig { private final boolean noSharding; private final List defaultShardingNodes; private final AbstractPartitionAlgorithm function; + private final String defaultApNode; private final String metaShardingNode; private final Set allShardingNodes; /** @@ -36,11 +37,12 @@ public class SchemaConfig { private Map> funcNodeERMap; private final boolean logicalCreateADrop; - public SchemaConfig(String name, List defaultShardingNodes, AbstractPartitionAlgorithm function, + public SchemaConfig(String name, List defaultShardingNodes, AbstractPartitionAlgorithm function, String defaultApNode, Map tables, int defaultMaxLimit, boolean logicalCreateADrop) { this.name = name; this.defaultShardingNodes = defaultShardingNodes; this.function = function; + this.defaultApNode = defaultApNode; this.tables = tables; this.defaultMaxLimit = defaultMaxLimit; this.logicalCreateADrop = logicalCreateADrop; @@ -64,6 +66,7 @@ public SchemaConfig(SchemaConfig oldSchemaConfig) { this.name = oldSchemaConfig.getName().toLowerCase(); this.defaultShardingNodes = oldSchemaConfig.getDefaultShardingNodes(); this.function = oldSchemaConfig.getFunction(); + this.defaultApNode = oldSchemaConfig.getDefaultApNode(); this.tables = oldSchemaConfig.getLowerCaseTables(); this.defaultMaxLimit = oldSchemaConfig.getDefaultMaxLimit(); this.logicalCreateADrop = oldSchemaConfig.isLogicalCreateADrop(); @@ -217,6 +220,10 @@ public AbstractPartitionAlgorithm getFunction() { return function; } + public String getDefaultApNode() { + return defaultApNode; + } + public Map> getFkErRelations() { return fkErRelations; } @@ -280,6 +287,7 @@ public boolean equalsBaseInfo(SchemaConfig schemaConfig) { public boolean equalsConfigInfo(SchemaConfig schemaConfig) { boolean isEquals = 
CollectionUtil.equalsWithEmpty(this.defaultShardingNodes, schemaConfig.getDefaultShardingNodes()); + isEquals = isEquals && StringUtil.equalsWithEmpty(this.defaultApNode, schemaConfig.getDefaultApNode()); if (isEquals && isDefaultShardingNode()) { return this.function.equals(schemaConfig.getFunction()) && this.defaultMaxLimit == schemaConfig.getDefaultMaxLimit(); diff --git a/src/main/java/com/actiontech/dble/config/model/sharding/ShardingNodeConfig.java b/src/main/java/com/actiontech/dble/config/model/sharding/ShardingNodeConfig.java index b4f55b028e..b32c7c7296 100644 --- a/src/main/java/com/actiontech/dble/config/model/sharding/ShardingNodeConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/sharding/ShardingNodeConfig.java @@ -4,7 +4,7 @@ */ package com.actiontech.dble.config.model.sharding; -public final class ShardingNodeConfig { +public class ShardingNodeConfig { private final String name; private final String database; diff --git a/src/main/java/com/actiontech/dble/config/model/user/HybridTAUserConfig.java b/src/main/java/com/actiontech/dble/config/model/user/HybridTAUserConfig.java new file mode 100644 index 0000000000..08d7a035bf --- /dev/null +++ b/src/main/java/com/actiontech/dble/config/model/user/HybridTAUserConfig.java @@ -0,0 +1,12 @@ +package com.actiontech.dble.config.model.user; + +import com.alibaba.druid.wall.WallProvider; + +import java.util.Set; + +public class HybridTAUserConfig extends ShardingUserConfig { + + public HybridTAUserConfig(UserConfig user, String tenant, WallProvider blacklist, boolean readOnly, Set schemas, UserPrivilegesConfig privilegesConfig) { + super(user, tenant, blacklist, readOnly, schemas, privilegesConfig); + } +} diff --git a/src/main/java/com/actiontech/dble/config/privileges/ShardingPrivileges.java b/src/main/java/com/actiontech/dble/config/privileges/ShardingPrivileges.java index 096857d49c..721d2ab72f 100644 --- a/src/main/java/com/actiontech/dble/config/privileges/ShardingPrivileges.java +++ b/src/main/java/com/actiontech/dble/config/privileges/ShardingPrivileges.java @@ -22,7 +22,7 @@ public enum CheckType { // check SQL Privilege public static boolean checkPrivilege(ServerUserConfig userConfig, String schema, String tableName, CheckType chekcType) { - if (!(userConfig instanceof ShardingUserConfig)) { + if (!(userConfig instanceof ShardingUserConfig)) { // contains HybridTAUserConfig return true; } UserPrivilegesConfig userPrivilege = ((ShardingUserConfig) userConfig).getPrivilegesConfig(); diff --git a/src/main/java/com/actiontech/dble/config/util/ConfigUtil.java b/src/main/java/com/actiontech/dble/config/util/ConfigUtil.java index d3dc5f6bf7..393b456f4a 100644 --- a/src/main/java/com/actiontech/dble/config/util/ConfigUtil.java +++ b/src/main/java/com/actiontech/dble/config/util/ConfigUtil.java @@ -6,6 +6,7 @@ package com.actiontech.dble.config.util; import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.datasource.ApNode; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.datasource.ShardingNode; @@ -258,8 +259,8 @@ public static boolean checkMysqlVersion(String version, PhysicalDbInstance insta if (!instance.getDbGroup().isRwSplitUseless()) { //rw-split return checkVersionWithRwSplit(version, instance, isThrowException, type); - } else if (!instance.getDbGroup().isShardingUseless() || !instance.getDbGroup().isAnalysisUseless()) { - //sharding or analysis + } else if 
(!instance.getDbGroup().isShardingUseless() || !instance.getDbGroup().isAnalysisUseless() || !instance.getDbGroup().isHybridTAUseless()) { + //sharding or analysis or hybridTA(apNode) boolean isMatch = majorVersion >= VersionUtil.getMajorVersion(SystemConfig.getInstance().getFakeMySQLVersion()); if (!isMatch && isThrowException) { throw new ConfigException("this dbInstance[=" + instance.getConfig().getUrl() + "]'s version[=" + version + "] cannot be lower than the dble version[=" + SystemConfig.getInstance().getFakeMySQLVersion() + "],pls check the backend " + type + " node."); @@ -448,7 +449,7 @@ public static void checkDbleAndMysqlVersion(List changeItemList, Con .filter(changeItem -> (changeItem.getType() == ChangeType.ADD) || (changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_INSTANCE && changeItem.getType() == ChangeType.UPDATE && changeItem.isAffectTestConn()) || (changeItem.getItemType() == ChangeItemType.USERNAME && changeItem.getType() == ChangeType.UPDATE && changeItem.isAffectEntryDbGroup()) || - (changeItem.getItemType() == ChangeItemType.SHARDING_NODE && changeItem.getType() == ChangeType.UPDATE)) + ((changeItem.getItemType() == ChangeItemType.SHARDING_NODE || changeItem.getItemType() == ChangeItemType.AP_NODE) && changeItem.getType() == ChangeType.UPDATE)) .collect(Collectors.toList()); if (changeItemList.size() == 0 || needCheckVersionList.isEmpty()) { @@ -471,6 +472,10 @@ public static void checkDbleAndMysqlVersion(List changeItemList, Con ShardingNode shardingNode = (ShardingNode) item; PhysicalDbGroup dbGroup = shardingNode.getDbGroup(); checkDbGroupVersion(dbGroup); + } else if (changeItem.getItemType() == ChangeItemType.AP_NODE) { + ApNode apNode = (ApNode) item; + PhysicalDbGroup dbGroup = apNode.getDbGroup(); + checkDbGroupVersion(dbGroup); } else if (changeItem.getItemType() == ChangeItemType.USERNAME) { UserConfig userConfig = newConfigLoader.getUsers().get(item); if (userConfig instanceof RwSplitUserConfig) { diff --git a/src/main/java/com/actiontech/dble/services/factorys/BusinessServiceFactory.java b/src/main/java/com/actiontech/dble/services/factorys/BusinessServiceFactory.java index 8fb80d18d9..50ceb02e4d 100644 --- a/src/main/java/com/actiontech/dble/services/factorys/BusinessServiceFactory.java +++ b/src/main/java/com/actiontech/dble/services/factorys/BusinessServiceFactory.java @@ -28,7 +28,7 @@ private BusinessServiceFactory() { public static FrontendService getBusinessService(AuthResultInfo info, AbstractConnection connection) { UserConfig userConfig = info.getUserConfig(); - if (userConfig instanceof ShardingUserConfig) { + if (userConfig instanceof ShardingUserConfig) { // contains HybridTAUserConfig return new ShardingService(connection, info); } else if (userConfig instanceof ManagerUserConfig) { return new ManagerService(connection, info); diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleBlacklist.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleBlacklist.java index 1f93df5617..d184ef115a 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleBlacklist.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleBlacklist.java @@ -60,7 +60,7 @@ protected List> getRows() { List blackListNames = new ArrayList<>(); for (UserConfig userConfig : userConfigList) { WallProvider blackList = null; - if (userConfig instanceof ShardingUserConfig) { + if (userConfig instanceof ShardingUserConfig) { // contains HybridTAUserConfig 
blackList = ((ShardingUserConfig) userConfig).getBlacklist(); } else if (userConfig instanceof RwSplitUserConfig) { blackList = ((RwSplitUserConfig) userConfig).getBlacklist(); diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntry.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntry.java index 1d0caeb595..d0dd349681 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntry.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntry.java @@ -89,6 +89,8 @@ protected List> getRows() { map.put(COLUMN_ID, userConfig.getId() + ""); if (userConfig instanceof ManagerUserConfig) { getManagerUserConfig(map, (ManagerUserConfig) userConfig); + } else if (userConfig instanceof HybridTAUserConfig) { + getHybridTAUserConfig(map, (HybridTAUserConfig) userConfig); } else if (userConfig instanceof ShardingUserConfig) { getShardingUserConfig(map, (ShardingUserConfig) userConfig); } else if (userConfig instanceof RwSplitUserConfig) { @@ -157,6 +159,21 @@ private void getAnalysisUserConfig(LinkedHashMap map, AnalysisUs map.put(COLUMN_BLACKLIST, userConfig.getBlacklist() == null ? null : userConfig.getBlacklist().getName()); } + private void getHybridTAUserConfig(LinkedHashMap map, HybridTAUserConfig userConfig) { + map.put(COLUMN_TYPE, userConfig.getTenant() != null ? "conn_attr" : "username"); + map.put(COLUMN_USER_TYPE, UserConverter.TYPE_HYBRIDTA_USER); + map.put(COLUMN_USERNAME, userConfig.getName()); + map.put(COLUMN_PASSWORD_ENCRYPT, getPasswordEncrypt(userConfig)); + map.put(COLUMN_ENCRYPT_CONFIGURED, userConfig.isEncrypt() + ""); + map.put(COLUMN_CONN_ATTR_KEY, userConfig.getTenant() != null ? "tenant" : null); + map.put(COLUMN_CONN_ATTR_VALUE, userConfig.getTenant()); + map.put(COLUMN_WHITE_IPS, getWhiteIps(userConfig.getWhiteIPs())); + map.put(COLUMN_READONLY, "-"); + map.put(COLUMN_MAX_CONN_COUNT, userConfig.getMaxCon() == 0 ? "no limit" : userConfig.getMaxCon() + ""); + map.put(COLUMN_BLACKLIST, userConfig.getBlacklist() == null ? null : userConfig.getBlacklist().getName()); + } + + public static String getPasswordEncrypt(UserConfig userConfig) { try { return DecryptUtil.encrypt("0:" + userConfig.getName() + ":" + userConfig.getPassword()); diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntrySchema.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntrySchema.java index 198cbb4f46..b851539ee7 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntrySchema.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntrySchema.java @@ -43,7 +43,7 @@ protected List> getRows() { sorted((a, b) -> Integer.compare(a.getValue().getId(), b.getValue().getId())). 
forEach(v -> { UserConfig userConfig = v.getValue(); - if (userConfig instanceof ShardingUserConfig) { + if (userConfig instanceof ShardingUserConfig) { // contains HybridTAUserConfig for (String schema : ((ShardingUserConfig) userConfig).getSchemas()) { LinkedHashMap map = Maps.newLinkedHashMap(); map.put(COLUMN_ID, userConfig.getId() + ""); diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntryTablePrivilege.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntryTablePrivilege.java index 68437be75e..b7e6d78761 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntryTablePrivilege.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleEntryTablePrivilege.java @@ -76,7 +76,7 @@ protected List> getRows() { sorted((a, b) -> Integer.compare(a.getValue().getId(), b.getValue().getId())). forEach(v -> { UserConfig userConfig = v.getValue(); - if (userConfig instanceof ShardingUserConfig) { + if (userConfig instanceof ShardingUserConfig) { // contains HybridTAUserConfig ShardingUserConfig shardingUserConfig = ((ShardingUserConfig) userConfig); UserPrivilegesConfig userPrivilegesConfig = shardingUserConfig.getPrivilegesConfig(); if (userPrivilegesConfig != null) { diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ChangeItemType.java b/src/main/java/com/actiontech/dble/services/manager/response/ChangeItemType.java index b8dfa59da7..698c22b1b1 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ChangeItemType.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ChangeItemType.java @@ -6,5 +6,5 @@ package com.actiontech.dble.services.manager.response; public enum ChangeItemType { - PHYSICAL_DB_GROUP(), PHYSICAL_DB_INSTANCE(), SHARDING_NODE(), USERNAME(); + PHYSICAL_DB_GROUP(), PHYSICAL_DB_INSTANCE(), SHARDING_NODE(), AP_NODE(), USERNAME(); } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java b/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java index 6dc9a30c50..d384d3837c 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java @@ -357,7 +357,7 @@ private static void userCheck(List list, ServerConfig serverConfig) { for (UserConfig user : userMap.values()) { if (user instanceof ManagerUserConfig) { hasManagerUser = true; - } else if (user instanceof ShardingUserConfig) { + } else if (user instanceof ShardingUserConfig) { // contains HybridTAUserConfig hasShardingUser = true; schema.addAll(((ShardingUserConfig) user).getSchemas()); } else { diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java index b93153d405..6c57782b9e 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java @@ -6,6 +6,7 @@ package com.actiontech.dble.services.manager.response; import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.datasource.ApNode; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.datasource.ShardingNode; @@ -361,7 +362,7 @@ private static void testConnection(ConfigInitializer loader, List ch if 
(forceAllReload && loader.isFullyConfigured()) { loader.testConnection(); } else { - syncShardingNode(loader); + syncShardingAndApNode(loader); loader.testConnection(changeItemList); } } catch (Exception e) { @@ -413,16 +414,24 @@ private static ConfigInitializer loadConfig(RawJson userConfig, RawJson dbConfig } } - private static void syncShardingNode(ConfigInitializer loader) { + private static void syncShardingAndApNode(ConfigInitializer loader) { + // sync schema_exists(only testConn can update schema_exists) Map oldShardingNodeMap = DbleServer.getInstance().getConfig().getShardingNodes(); Map newShardingNodeMap = loader.getShardingNodes(); for (Map.Entry shardingNodeEntry : newShardingNodeMap.entrySet()) { - //sync schema_exists,only testConn can update schema_exists if (oldShardingNodeMap.containsKey(shardingNodeEntry.getKey())) { shardingNodeEntry.getValue().setSchemaExists(oldShardingNodeMap.get(shardingNodeEntry.getKey()).isSchemaExists()); } } + Map oldApNodeMap = DbleServer.getInstance().getConfig().getApNodes(); + Map newApNodeMap = loader.getApNodes(); + for (Map.Entry apNodeEntry : newApNodeMap.entrySet()) { + if (oldApNodeMap.containsKey(apNodeEntry.getKey())) { + apNodeEntry.getValue().setSchemaExists(oldApNodeMap.get(apNodeEntry.getKey()).isSchemaExists()); + } + } + } private static List differentiateChanges(ConfigInitializer newLoader) { @@ -466,6 +475,21 @@ private static List differentiateChanges(ConfigInitializer newLoader return changeItem; }).forEach(changeItemList::add); + //apNode + Map oldApNodeMap = oldServerConfig.getApNodes(); + Map newApNodeMap = newLoader.getApNodes(); + MapDifference apNodeMapDiff = Maps.difference(newApNodeMap, oldApNodeMap); + //delete + apNodeMapDiff.entriesOnlyOnRight().values().stream().map(apNode -> new ChangeItem(ChangeType.DELETE, apNode, ChangeItemType.AP_NODE)).forEach(changeItemList::add); + //add + apNodeMapDiff.entriesOnlyOnLeft().values().stream().map(apNode -> new ChangeItem(ChangeType.ADD, apNode, ChangeItemType.AP_NODE)).forEach(changeItemList::add); + //update + apNodeMapDiff.entriesDiffering().entrySet().stream().map(differenceEntry -> { + ApNode newApNode = differenceEntry.getValue().leftValue(); + ChangeItem changeItem = new ChangeItem(ChangeType.UPDATE, newApNode, ChangeItemType.AP_NODE); + return changeItem; + }).forEach(changeItemList::add); + //dbGroup Map oldDbGroupMap = oldServerConfig.getDbGroups(); Map newDbGroupMap = newLoader.getDbGroups(); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowUser.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowUser.java index 1d8a11d3c3..f58642ab3a 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowUser.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowUser.java @@ -79,7 +79,7 @@ public static void execute(ManagerService service) { private static RowDataPacket getRow(UserConfig user, String charset) { RowDataPacket row = new RowDataPacket(FIELD_COUNT); - if (user instanceof ShardingUserConfig) { + if (user instanceof ShardingUserConfig) { // contains HybridTAUserConfig ShardingUserConfig shardingUser = (ShardingUserConfig) user; if (shardingUser.getTenant() != null) { row.add(StringUtil.encode(shardingUser.getName() + ":" + shardingUser.getTenant(), charset)); diff --git a/src/main/resources/sharding.xsd b/src/main/resources/sharding.xsd index 34488577c3..910c9c525f 100644 --- a/src/main/resources/sharding.xsd +++ b/src/main/resources/sharding.xsd @@ -12,6 +12,7 @@ + diff --git 
a/src/main/resources/sharding_detail.xsd b/src/main/resources/sharding_detail.xsd index 65422f50f1..730fe4b5e4 100644 --- a/src/main/resources/sharding_detail.xsd +++ b/src/main/resources/sharding_detail.xsd @@ -70,6 +70,7 @@ + @@ -79,6 +80,13 @@ + + + + + + + diff --git a/src/main/resources/user.xsd b/src/main/resources/user.xsd index 4a0e4f90f7..52b6c53553 100644 --- a/src/main/resources/user.xsd +++ b/src/main/resources/user.xsd @@ -14,6 +14,7 @@ + diff --git a/src/main/resources/user_detail.xsd b/src/main/resources/user_detail.xsd index 100a3046ff..756f2e1605 100644 --- a/src/main/resources/user_detail.xsd +++ b/src/main/resources/user_detail.xsd @@ -78,6 +78,43 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/test/java/com/actiontech/dble/cluster/ClusterHelpTest.java b/src/test/java/com/actiontech/dble/cluster/ClusterHelpTest.java index 06031f065b..fab8904771 100644 --- a/src/test/java/com/actiontech/dble/cluster/ClusterHelpTest.java +++ b/src/test/java/com/actiontech/dble/cluster/ClusterHelpTest.java @@ -5,6 +5,7 @@ package com.actiontech.dble.cluster; +import com.actiontech.dble.backend.datasource.ApNode; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.datasource.ShardingNode; @@ -51,10 +52,12 @@ public void testShardingXml() { Map functionMapByXml = configInitializerByXml.getFunctions(); Map schemaConfigMapByXml = configInitializerByXml.getSchemas(); Map shardingNodeMapByXml = configInitializerByXml.getShardingNodes(); + Map apNodeMapByXml = configInitializerByXml.getApNodes(); Map> erTableSetMapByJson = configInitializerByJson.getErRelations(); Map functionMapByJson = configInitializerByJson.getFunctions(); Map schemaConfigMapByJson = configInitializerByJson.getSchemas(); Map shardingNodeMapByJson = configInitializerByJson.getShardingNodes(); + Map apNodeMapByJson = configInitializerByJson.getApNodes(); Assert.assertEquals(erTableSetMapByXml.size(), erTableSetMapByJson.size()); for (Map.Entry> erTableSetEntry : erTableSetMapByXml.entrySet()) { @@ -75,6 +78,12 @@ public void testShardingXml() { Assert.assertTrue(shardingNodeJson.equalsBaseInfo(shardingNodeConfigEntry.getValue())); } + Assert.assertEquals(apNodeMapByXml.size(), apNodeMapByJson.size()); + for (Map.Entry apNodeConfigEntry : apNodeMapByXml.entrySet()) { + ApNode apNodeJson = apNodeMapByJson.get(apNodeConfigEntry.getKey()); + Assert.assertTrue(apNodeJson.equalsBaseInfo(apNodeConfigEntry.getValue())); + } + Assert.assertEquals(schemaConfigMapByXml.size(), schemaConfigMapByJson.size()); for (Map.Entry schemaConfigEntry : schemaConfigMapByXml.entrySet()) { SchemaConfig schemaConfigJson = schemaConfigMapByJson.get(schemaConfigEntry.getKey()); @@ -134,7 +143,9 @@ public void testUserXml() { for (Map.Entry userConfigEntry : users.entrySet()) { UserConfig userConfig = userConfigMap.get(userConfigEntry.getKey()); - if (userConfig instanceof ShardingUserConfig) { + if (userConfig instanceof HybridTAUserConfig) { + Assert.assertTrue(((HybridTAUserConfig) userConfigEntry.getValue()).equalsBaseInfo((HybridTAUserConfig) userConfig)); + } else if (userConfig instanceof ShardingUserConfig) { Assert.assertTrue(((ShardingUserConfig) userConfigEntry.getValue()).equalsBaseInfo((ShardingUserConfig) userConfig)); } else if (userConfig instanceof RwSplitUserConfig) { Assert.assertTrue(((RwSplitUserConfig) userConfigEntry.getValue()).equalsBaseInfo((RwSplitUserConfig) userConfig)); @@ 
-146,7 +157,9 @@ public void testUserXml() { } for (Map.Entry userConfigEntry : userConfigMap.entrySet()) { UserConfig userConfig = users.get(userConfigEntry.getKey()); - if (userConfig instanceof ShardingUserConfig) { + if (userConfig instanceof HybridTAUserConfig) { + Assert.assertTrue(((HybridTAUserConfig) userConfigEntry.getValue()).equalsBaseInfo((HybridTAUserConfig) userConfig)); + } else if (userConfig instanceof ShardingUserConfig) { Assert.assertTrue(((ShardingUserConfig) userConfigEntry.getValue()).equalsBaseInfo((ShardingUserConfig) userConfig)); } else if (userConfig instanceof RwSplitUserConfig) { Assert.assertTrue(((RwSplitUserConfig) userConfigEntry.getValue()).equalsBaseInfo((RwSplitUserConfig) userConfig));
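
Example (sketch, not part of the diff above): with this patch applied, an HTAP setup could be described roughly as follows. The element and attribute names (apNode with name/dbGroup/database, the apNode attribute on schema, and hybridTAUser) are taken from the JAXB entities and XSD entries added in this patch; the concrete node, group, schema, user, and password values are hypothetical. Per the checks added here (checkHybridTADbGroup / deleteUselessApNode), the dbGroup referenced by an apNode must consist of CLICKHOUSE instances, and per checkSchemaByUser, a schema used by a hybridTAUser must define an apNode while a schema used by a plain shardingUser must not.

sharding.xml (sketch):

    <!-- analytical node backed by a ClickHouse dbGroup -->
    <apNode name="an1" dbGroup="chGroup1" database="ap_db"/>
    <!-- HTAP schema: TP traffic on shardingNode dn1, AP traffic on apNode an1 -->
    <schema name="htap_db" shardingNode="dn1" apNode="an1"/>

user.xml (sketch):

    <!-- only a hybridTAUser may be granted a schema that defines an apNode -->
    <hybridTAUser name="htap_user" password="123456" schemas="htap_db"/>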