Skip to content

Commit

Permalink
Merge pull request #3731 from actiontech/inner-2218
Browse files Browse the repository at this point in the history
[inner-2218] Configuration support for HTAP
  • Loading branch information
wenyh1 authored Jun 29, 2023
2 parents da4593e + 1c9b6dd commit 409823f
Show file tree
Hide file tree
Showing 34 changed files with 552 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.actiontech.dble.backend.datasource;

public class ApNode extends ShardingNode {

public ApNode(String dbGroupName, String hostName, String database, PhysicalDbGroup dbGroup) {
super(dbGroupName, hostName, database, dbGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class PhysicalDbGroup {
private boolean shardingUseless = true;
private boolean rwSplitUseless = true;
private boolean analysisUseless = true;
private boolean hybridTAUseless = true;
private Set<Session> rwSplitSessionSet = Sets.newConcurrentHashSet();
private volatile Integer state = Integer.valueOf(INITIAL);

Expand Down Expand Up @@ -164,7 +165,7 @@ public int getRwSplitMode() {
}

public boolean isUseless() {
return shardingUseless && rwSplitUseless && analysisUseless;
return shardingUseless && rwSplitUseless && analysisUseless && hybridTAUseless;
}

public boolean isShardingUseless() {
Expand All @@ -191,12 +192,19 @@ public void setAnalysisUseless(boolean analysisUseless) {
this.analysisUseless = analysisUseless;
}

public boolean isHybridTAUseless() {
return hybridTAUseless;
}

public void setHybridTAUseless(boolean hybridTAUseless) {
this.hybridTAUseless = hybridTAUseless;
}

private boolean checkSlaveSynStatus(PhysicalDbInstance ds) {
return (dbGroupConfig.getDelayThreshold() != -1 &&
dbGroupConfig.isShowSlaveSql()) || ds.getDbGroup().isDelayDetectionStart();
}


public PhysicalDbInstance getWriteDbInstance() {
return writeDbInstance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.actiontech.dble.cluster.zkprocess.entity.sharding.function.Function;
import com.actiontech.dble.cluster.zkprocess.entity.sharding.schema.Schema;
import com.actiontech.dble.cluster.zkprocess.entity.sharding.schema.Table;
import com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode.ApNode;
import com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode.ShardingNode;
import com.actiontech.dble.cluster.zkprocess.entity.user.BlackList;
import com.actiontech.dble.cluster.zkprocess.entity.user.User;
Expand Down Expand Up @@ -423,6 +424,12 @@ public Shardings parseShardingJsonToBean(Gson gson, RawJson jsonContent) {
shardingBean.setShardingNode(shardingNodeList);
}

JsonElement apNodeJson = jsonObject.get(ClusterPathUtil.AP_NODE);
if (apNodeJson != null) {
List<ApNode> apNodeList = gson.fromJson(apNodeJson.toString(), new TypeToken<List<ApNode>>() {
}.getType());
shardingBean.setApNode(apNodeList);
}
JsonElement functionJson = jsonObject.get(ClusterPathUtil.FUNCTION);
if (functionJson != null) {
List<Function> functions = gson.fromJson(functionJson.toString(), new TypeToken<List<Function>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ private ClusterPathUtil() {
public static final String SCHEMA = "schema";
public static final String DB_GROUP = "dbGroup";
public static final String SHARDING_NODE = "shardingNode";
public static final String AP_NODE = "apNode";
public static final String BLACKLIST = "blacklist";
public static final String VERSION = "version";
public static final String FUNCTION = "function";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.actiontech.dble.cluster.zkprocess.entity.sharding.function.Function;
import com.actiontech.dble.cluster.zkprocess.entity.sharding.schema.Schema;
import com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode.ApNode;
import com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode.ShardingNode;
import com.actiontech.dble.config.Versions;

Expand All @@ -27,6 +28,8 @@ public class Shardings {

private List<ShardingNode> shardingNode;

private List<ApNode> apNode;

protected List<Function> function;

public List<Schema> getSchema() {
Expand All @@ -51,6 +54,16 @@ public void setShardingNode(List<ShardingNode> shardingNode) {
this.shardingNode = shardingNode;
}

public List<ApNode> getApNode() {
if (this.apNode == null) {
apNode = new ArrayList<>();
}
return apNode;
}

public void setApNode(List<ApNode> apNode) {
this.apNode = apNode;
}

public List<Function> getFunction() {
if (this.function == null) {
Expand All @@ -77,6 +90,8 @@ public String toString() {
schema +
", shardingNode=" +
shardingNode +
", apNode=" +
apNode +
", function=" +
function +
"]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public class Users {
@XmlElementRef(name = "ShardingUser", type = ShardingUser.class),
@XmlElementRef(name = "ManagerUser", type = ManagerUser.class),
@XmlElementRef(name = "RwSplitUser", type = RwSplitUser.class),
@XmlElementRef(name = "AnalysisUser", type = AnalysisUser.class)
@XmlElementRef(name = "AnalysisUser", type = AnalysisUser.class),
@XmlElementRef(name = "HybridTAUser", type = HybridTAUser.class)
})
protected List<Object> user;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class Schema implements Named {
@XmlAttribute
protected String shardingNode;

@XmlAttribute
protected String apNode;

@XmlAttribute
protected String function;

Expand Down Expand Up @@ -80,6 +83,14 @@ public void setFunction(String function) {
this.function = function;
}

public void setApNode(String apNode) {
this.apNode = apNode;
}

public String getApNode() {
return apNode;
}

public boolean isLogicalCreateADrop() {
return logicalCreateADrop;
}
Expand All @@ -92,6 +103,8 @@ public String toString() {
sqlMaxLimit +
", shardingNode=" +
shardingNode +
", apNode=" +
apNode +
", function=" +
function +
", table=" +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.actiontech.dble.cluster.zkprocess.entity.sharding.shardingnode;

import com.actiontech.dble.cluster.zkprocess.entity.Named;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlType;

@XmlAccessorType(XmlAccessType.FIELD)
@XmlType(name = "apNode")
public class ApNode implements Named {

@XmlAttribute(required = true)
private String name;

@XmlAttribute(required = true)
private String dbGroup;

@XmlAttribute(required = true)
private String database;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getDbGroup() {
return dbGroup;
}

public void setDbGroup(String dbGroup) {
this.dbGroup = dbGroup;
}

public String getDatabase() {
return database;
}

public void setDatabase(String database) {
this.database = database;
}

@Override
public String toString() {
String builder = "ApNode [name=" +
name +
", dbGroup=" +
dbGroup +
", database=" +
database +
"]";
return builder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.actiontech.dble.cluster.zkprocess.entity.user;

import javax.xml.bind.annotation.*;

@XmlAccessorType(XmlAccessType.FIELD)
@XmlType(name = "hybridTAUser")
@XmlRootElement
public class HybridTAUser extends ShardingUser {

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("HybridTAUser{").append(super.toString());
sb.append(", schemas=").append(schemas);
sb.append(", tenant=").append(tenant);
sb.append(", readOnly=").append(readOnly);
sb.append(", blacklist=").append(blacklist);

if (privileges != null) {
sb.append(", privileges=").append(privileges);
}
sb.append('}');
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import javax.xml.bind.annotation.XmlTransient;

@XmlTransient
@XmlSeeAlso({ShardingUser.class, ManagerUser.class, RwSplitUser.class, AnalysisUser.class})
@XmlSeeAlso({ShardingUser.class, ManagerUser.class, RwSplitUser.class, AnalysisUser.class, HybridTAUser.class})
public class User {

@XmlAttribute(required = true)
Expand Down
44 changes: 39 additions & 5 deletions src/main/java/com/actiontech/dble/config/ConfigInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package com.actiontech.dble.config;

import com.actiontech.dble.backend.datasource.ApNode;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
Expand All @@ -21,10 +22,7 @@
import com.actiontech.dble.config.model.db.type.DataBaseType;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.config.model.sharding.table.ERTable;
import com.actiontech.dble.config.model.user.AnalysisUserConfig;
import com.actiontech.dble.config.model.user.RwSplitUserConfig;
import com.actiontech.dble.config.model.user.UserConfig;
import com.actiontech.dble.config.model.user.UserName;
import com.actiontech.dble.config.model.user.*;
import com.actiontech.dble.config.util.ConfigException;
import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.plan.common.ptr.BoolPtr;
Expand Down Expand Up @@ -53,6 +51,7 @@ public class ConfigInitializer implements ProblemReporter {
private volatile Map<UserName, UserConfig> users;
private volatile Map<String, SchemaConfig> schemas = Maps.newHashMap();
private volatile Map<String, ShardingNode> shardingNodes = Maps.newHashMap();
private volatile Map<String, ApNode> apNodes = Maps.newHashMap();
private volatile Map<String, PhysicalDbGroup> dbGroups;
private volatile Map<ERTable, Set<ERTable>> erRelations = Maps.newHashMap();
private volatile Map<String, Set<ERTable>> funcNodeERMap = Maps.newHashMap();
Expand Down Expand Up @@ -147,12 +146,14 @@ private void init(RawJson userJson, RawJson dbJson, @Nullable RawJson shardingJs
this.funcNodeERMap = shardingConverter.getFuncNodeERMap();
this.shardingNodes = shardingConverter.getShardingNodeMap();
this.functions = shardingConverter.getFunctionMap();
this.apNodes = shardingConverter.getApNodeMap();
}
this.shardingConfig = shardingJson;

this.sequenceConfig = sequenceJson;
checkRwSplitDbGroup();
checkAnalysisDbGroup();
checkHybridTADbGroup();
checkWriteDbInstance();
}

Expand Down Expand Up @@ -240,6 +241,20 @@ private void checkRwSplitDbGroup() {
}
}

private void checkHybridTADbGroup() {
dbGroups.values().stream().filter(f -> !f.isHybridTAUseless()).forEach(g -> {
if (g.getDbGroupConfig().instanceDatabaseType() != DataBaseType.CLICKHOUSE) {
throw new ConfigException("The dbGroup[" + g.getGroupName() + "] database type must be " + DataBaseType.CLICKHOUSE);
} else {
g.getAllDbInstanceMap().values().stream().forEach(i -> {
if (i.getConfig().getDataBaseType() != DataBaseType.CLICKHOUSE) {
throw new ConfigException("The dbInstance[" + g.getGroupName() + "." + i.getName() + "] all dbInstance database type must be " + DataBaseType.CLICKHOUSE);
}
});
}
});
}

public void testConnection(List<ChangeItem> changeItemList) {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("test-connection");
try {
Expand Down Expand Up @@ -280,6 +295,14 @@ public void testConnection(List<ChangeItem> changeItemList) {
ShardingNode shardingNode = (ShardingNode) item;
dbGroup = shardingNode.getDbGroup();

boolean isDbInstanceConnected = testDbGroup(dbGroup, hostSchemaMap, dbGroupTested, errDbInstanceNames);
if (!isDbInstanceConnected) {
isAllDbInstanceConnected = false;
}
} else if (itemType == ChangeItemType.AP_NODE) {
ApNode apNode = (ApNode) item;
dbGroup = apNode.getDbGroup();

boolean isDbInstanceConnected = testDbGroup(dbGroup, hostSchemaMap, dbGroupTested, errDbInstanceNames);
if (!isDbInstanceConnected) {
isAllDbInstanceConnected = false;
Expand All @@ -303,6 +326,14 @@ public void testConnection(List<ChangeItem> changeItemList) {
ShardingNode shardingNode = (ShardingNode) item;
dbGroup = shardingNode.getDbGroup();

boolean isDbInstanceConnected = testDbGroup(dbGroup, hostSchemaMap, dbGroupTested, errDbInstanceNames);
if (!isDbInstanceConnected) {
isAllDbInstanceConnected = false;
}
} else if (itemType == ChangeItemType.AP_NODE) {
ApNode apNode = (ApNode) item;
dbGroup = apNode.getDbGroup();

boolean isDbInstanceConnected = testDbGroup(dbGroup, hostSchemaMap, dbGroupTested, errDbInstanceNames);
if (!isDbInstanceConnected) {
isAllDbInstanceConnected = false;
Expand All @@ -329,7 +360,6 @@ public void testConnection(List<ChangeItem> changeItemList) {
}
}


public void testConnection() {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("test-connection");
try {
Expand Down Expand Up @@ -518,6 +548,10 @@ public Map<String, ShardingNode> getShardingNodes() {
return shardingNodes;
}

public Map<String, ApNode> getApNodes() {
return apNodes;
}

public Map<String, PhysicalDbGroup> getDbGroups() {
return this.dbGroups;
}
Expand Down
Loading

0 comments on commit 409823f

Please sign in to comment.