Skip to content

Commit

Permalink
inner-2383:support delete&&support cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
dcy10000 committed Apr 28, 2024
1 parent 4734ded commit c5dabda
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 19 deletions.
4 changes: 2 additions & 2 deletions src/main/java/com/actiontech/dble/cluster/ClusterLogic.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public static void dbGroupResponseEvent(String value, String dbGroupName) throws
}
}

public static void reloadConfigEvent(String value, String params) throws Exception {
public static void reloadConfigEvent(String value, String params, ReloadContext reloadContext) throws Exception {
try {
ClusterDelayProvider.delayBeforeSlaveReload();
LOGGER.info("reload_all " + ClusterPathUtil.getConfStatusOperatorPath() + " " + value);
Expand All @@ -284,7 +284,7 @@ public static void reloadConfigEvent(String value, String params) throws Excepti
return;
}
try {
boolean result = ReloadConfig.reloadAll(Integer.parseInt(params), new ReloadContext());
boolean result = ReloadConfig.reloadAll(Integer.parseInt(params), reloadContext);
if (!checkLocalResult(result)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.actiontech.dble.cluster.general.listener.ClusterClearKeyListener;
import com.actiontech.dble.cluster.values.ConfStatus;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.services.manager.response.ReloadContext;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -54,12 +56,17 @@ public void notifyProcess(KvBean configValue) throws Exception {

//step 1 check if the change is from itself
ConfStatus status = new ConfStatus(value);
ReloadContext reloadContext = new ReloadContext();
if (status.getExtraInfo() != null) {
Gson gson = new Gson();
reloadContext = gson.fromJson(status.getExtraInfo(), ReloadContext.class);
}
if (status.getFrom().equals(SystemConfig.getInstance().getInstanceName())) {
//self node
return;
}
//step 2reload the config and set the self config status
ClusterLogic.reloadConfigEvent(value, status.getParams());
ClusterLogic.reloadConfigEvent(value, status.getParams(), reloadContext);
}


Expand Down
18 changes: 17 additions & 1 deletion src/main/java/com/actiontech/dble/cluster/values/ConfStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package com.actiontech.dble.cluster.values;

import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;

/**
* Created by huqing.yan on 2017/7/10.
*/
Expand Down Expand Up @@ -35,8 +37,14 @@ public ConfStatus(String info) {
this.status = Status.valueOf(infoDetail[1]);
if (infoDetail.length == 3)
this.params = infoDetail[2];
else
else if (infoDetail.length == 4) {
this.params = infoDetail[2];
this.extraInfo = new String(Base64.decode(infoDetail[3]));
} else {
this.params = null;
this.extraInfo = null;
}

}

public ConfStatus(Status status, String extraInfo) {
Expand All @@ -56,6 +64,10 @@ public String toString() {
if (params != null) {
ss.append(split);
ss.append(params);
if (extraInfo != null) {
ss.append(split);
ss.append(Base64.encode(extraInfo.getBytes()));
}
}

return ss.toString();
Expand Down Expand Up @@ -86,4 +98,8 @@ public String getParams() {
public String getExtraInfo() {
return extraInfo;
}

public void setExtraInfo(String extraInfo) {
this.extraInfo = extraInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.actiontech.dble.cluster.values.ConfStatus;
import com.actiontech.dble.cluster.zkprocess.comm.NotifyService;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.services.manager.response.ReloadContext;
import com.google.gson.Gson;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
Expand Down Expand Up @@ -61,6 +63,11 @@ private void executeStatusChange(ChildData childData) throws Exception {
String value = new String(childData.getData(), StandardCharsets.UTF_8);

ConfStatus status = new ConfStatus(value);
ReloadContext reloadContext = new ReloadContext();
if (status.getExtraInfo() != null) {
Gson gson = new Gson();
reloadContext = gson.fromJson(status.getExtraInfo(), ReloadContext.class);
}
if (status.getFrom().equals(SystemConfig.getInstance().getInstanceName())) {
return; //self node
}
Expand All @@ -72,6 +79,6 @@ private void executeStatusChange(ChildData childData) throws Exception {
LOGGER.warn("ConfigStatusListener notify error :" + service + " ,Exception info:", e);
}
}
ClusterLogic.reloadConfigEvent(value, status.getParams());
ClusterLogic.reloadConfigEvent(value, status.getParams(), reloadContext);
}
}
35 changes: 23 additions & 12 deletions src/main/java/com/actiontech/dble/config/ConfigInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.cluster.values.ConfStatus;
import com.actiontech.dble.config.helper.TestSchemasTask;
import com.actiontech.dble.config.helper.TestTask;
import com.actiontech.dble.config.loader.xml.XMLDbLoader;
Expand Down Expand Up @@ -203,11 +204,17 @@ public void testConnection() {
boolean skipTestConnectionOnUpdate = false;
if (SystemConfig.getInstance().isSkipTestConOnUpdate()) {
if (reloadContext != null && !reloadContext.getAffectDbInstanceList().isEmpty()) {
boolean useSharding = reloadContext.getAffectDbInstanceList().stream().map(ele -> dbGroups.get(ele.getGroupName())).anyMatch((ele) -> ele != null && !ele.isShardingUseless());

//not support for sharding db group
if (!useSharding) {
if (reloadContext.getConfStatus() == ConfStatus.Status.MANAGER_DELETE) {
skipTestConnectionOnUpdate = true;
LOGGER.info("will skip all test connection.");
} else {
boolean useSharding = reloadContext.getAffectDbInstanceList().stream().map(ele -> dbGroups.get(ele.getGroupName())).anyMatch((ele) -> ele != null && !ele.isShardingUseless());

//not support for sharding db group
if (!useSharding) {
skipTestConnectionOnUpdate = true;
LOGGER.info("will skip test connection exclude self.");
}
}
}
}
Expand All @@ -223,14 +230,6 @@ public void testConnection() {
}

for (PhysicalDbInstance ds : dbGroup.getDbInstances(true)) {
if (skipTestConnectionOnUpdate) {
String finalDbGroupName = dbGroupName;
boolean find = reloadContext.getAffectDbInstanceList().stream().anyMatch((ele) -> ele.getGroupName().equals(finalDbGroupName) && ele.getInstanceName().equals(ds.getName()));
if (!find) {
//skip test connection on this dbInstance
continue;
}
}
if (ds.getConfig().isDisabled()) {
errorInfos.add(new ErrorInfo("Backend", "WARNING", "dbGroup[" + dbGroupName + "," + ds.getName() + "] is disabled"));
LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] is disabled,just mark testing failed and skip it");
Expand All @@ -242,6 +241,18 @@ public void testConnection() {
ds.setTestConnSuccess(false);
continue;
}
if (skipTestConnectionOnUpdate) {
String finalDbGroupName = dbGroupName;
boolean find = reloadContext.getAffectDbInstanceList().stream().anyMatch((ele) -> ele.getGroupName().equals(finalDbGroupName) && ele.getInstanceName().equals(ds.getName()));
if (!find) {
//skip test connection on this dbInstance
ds.setTestConnSuccess(true);
LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] instance " + ds.getName() + " is skiped,because of option skipTestConOnUpdate");
continue;
} else {

}
}
if (!testDbInstance(dbGroupName, ds, schemaList)) {
isAllDbInstanceConnected = false;
errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import com.actiontech.dble.services.manager.information.ManagerSchemaInfo;
import com.actiontech.dble.services.manager.information.ManagerTableUtil;
import com.actiontech.dble.services.manager.information.ManagerWritableTable;
import com.actiontech.dble.services.manager.information.tables.DbleDbInstance;
import com.actiontech.dble.services.manager.response.ReloadConfig;
import com.actiontech.dble.services.manager.response.ReloadContext;
import com.actiontech.dble.services.manager.response.UniqueDbInstance;
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
import com.alibaba.druid.sql.ast.statement.SQLJoinTableSource;
import com.alibaba.druid.sql.ast.statement.SQLTableSource;
Expand Down Expand Up @@ -101,7 +104,16 @@ public void handle(String stmt, ManagerService service) {
Set<LinkedHashMap<String, String>> affectPks = ManagerTableUtil.getAffectPks(service, managerTable, foundRows, null);
rowSize = managerTable.deleteRows(affectPks);
if (rowSize != 0) {
ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_DELETE, managerTable.getTableName()));
ReloadContext reloadContext = new ReloadContext();
reloadContext.setConfStatus(ConfStatus.Status.MANAGER_DELETE);
if (managerTable instanceof DbleDbInstance) {
for (LinkedHashMap<String, String> affectPk : affectPks) {
String instanceName = affectPk.get("name");
String dbGroup = affectPk.get("db_group");
reloadContext.addAffectDbInstance(new UniqueDbInstance(dbGroup, instanceName));
}
}
ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_DELETE, managerTable.getTableName()), reloadContext);
}
} catch (SQLException e) {
isSuccess = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import com.actiontech.dble.services.manager.information.ManagerBaseTable;
import com.actiontech.dble.services.manager.information.ManagerSchemaInfo;
import com.actiontech.dble.services.manager.information.ManagerWritableTable;
import com.actiontech.dble.services.manager.information.tables.DbleDbInstance;
import com.actiontech.dble.services.manager.response.ReloadConfig;
import com.actiontech.dble.services.manager.response.ReloadContext;
import com.actiontech.dble.services.manager.response.UniqueDbInstance;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLNullExpr;
Expand Down Expand Up @@ -67,7 +70,16 @@ public void handle(String stmt, ManagerService service) {
managerTable.checkPrimaryKeyDuplicate(rows);
rowSize = managerTable.insertRows(rows);
if (rowSize != 0) {
ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_INSERT, managerTable.getTableName()));
ReloadContext reloadContext = new ReloadContext();
reloadContext.setConfStatus(ConfStatus.Status.MANAGER_INSERT);
if (managerTable instanceof DbleDbInstance) {
for (LinkedHashMap<String, String> affectPk : rows) {
String instanceName = affectPk.get("name");
String dbGroup = affectPk.get("db_group");
reloadContext.addAffectDbInstance(new UniqueDbInstance(dbGroup, instanceName));
}
}
ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_INSERT, managerTable.getTableName()), reloadContext);
}
managerTable.afterExecute();
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ private int updateRows(ManagerService service, ManagerWritableTable managerTable
if (rowSize != 0) {

ReloadContext reloadContext = new ReloadContext();
reloadContext.setConfStatus(ConfStatus.Status.MANAGER_UPDATE);
if (managerTable instanceof DbleDbInstance) {
for (LinkedHashMap<String, String> affectPk : affectPks) {
String instanceName = affectPk.get("name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import com.actiontech.dble.singleton.CronScheduler;
import com.actiontech.dble.singleton.FrontendUserManager;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.StringUtil;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -121,6 +123,10 @@ private static void reloadWithCluster(ManagerService service, int loadAllMode, b
//step 4 write the reload flag and self reload result into cluster center,notify the other dble to reload
ConfStatus status = new ConfStatus(SystemConfig.getInstance().getInstanceName(),
ConfStatus.Status.RELOAD_ALL, String.valueOf(loadAllMode));
if (reloadContext != null && !reloadContext.getAffectDbInstanceList().isEmpty()) {
Gson gson = new Gson();
status.setExtraInfo(gson.toJson(reloadContext));
}
ClusterHelper.setKV(ClusterPathUtil.getConfStatusOperatorPath(), status.toString());
ReloadLogHelper.info("reload config: sent config status to cluster center", LOGGER);
//step 5 start a loop to check if all the dble in cluster is reload finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,26 @@

package com.actiontech.dble.services.manager.response;

import com.actiontech.dble.cluster.values.ConfStatus;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ReloadContext {
private final List<UniqueDbInstance> affectDbInstanceList = new ArrayList<>();

private ConfStatus.Status confStatus = null;


public ConfStatus.Status getConfStatus() {
return confStatus;
}

public void setConfStatus(ConfStatus.Status confStatus) {
this.confStatus = confStatus;
}

public List<UniqueDbInstance> getAffectDbInstanceList() {
return Collections.unmodifiableList(affectDbInstanceList);
}
Expand Down

0 comments on commit c5dabda

Please sign in to comment.