Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inner-2383-supplement:support delete&&support cluster #3873

Merged
merged 2 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/com/actiontech/dble/cluster/ClusterLogic.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public static void dbGroupResponseEvent(String value, String dbGroupName) throws
}
}

public static void reloadConfigEvent(String value, String params) throws Exception {
public static void reloadConfigEvent(String value, String params, ReloadContext reloadContext) throws Exception {
try {
ClusterDelayProvider.delayBeforeSlaveReload();
LOGGER.info("reload_all " + ClusterPathUtil.getConfStatusOperatorPath() + " " + value);
Expand All @@ -284,7 +284,7 @@ public static void reloadConfigEvent(String value, String params) throws Excepti
return;
}
try {
boolean result = ReloadConfig.reloadAll(Integer.parseInt(params), new ReloadContext());
boolean result = ReloadConfig.reloadAll(Integer.parseInt(params), reloadContext);
if (!checkLocalResult(result)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.actiontech.dble.cluster.general.listener.ClusterClearKeyListener;
import com.actiontech.dble.cluster.values.ConfStatus;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.services.manager.response.ReloadContext;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -54,12 +56,17 @@ public void notifyProcess(KvBean configValue) throws Exception {

//step 1 check if the change is from itself
ConfStatus status = new ConfStatus(value);
ReloadContext reloadContext = new ReloadContext();
if (status.getExtraInfo() != null) {
Gson gson = new Gson();
reloadContext = gson.fromJson(status.getExtraInfo(), ReloadContext.class);
}
if (status.getFrom().equals(SystemConfig.getInstance().getInstanceName())) {
//self node
return;
}
//step 2reload the config and set the self config status
ClusterLogic.reloadConfigEvent(value, status.getParams());
ClusterLogic.reloadConfigEvent(value, status.getParams(), reloadContext);
}


Expand Down
19 changes: 18 additions & 1 deletion src/main/java/com/actiontech/dble/cluster/values/ConfStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package com.actiontech.dble.cluster.values;


import java.util.Base64;

/**
* Created by huqing.yan on 2017/7/10.
*/
Expand Down Expand Up @@ -35,8 +38,14 @@ public ConfStatus(String info) {
this.status = Status.valueOf(infoDetail[1]);
if (infoDetail.length == 3)
this.params = infoDetail[2];
else
else if (infoDetail.length == 4) {
this.params = infoDetail[2];
this.extraInfo = new String(Base64.getDecoder().decode(infoDetail[3]));
} else {
this.params = null;
this.extraInfo = null;
}

}

public ConfStatus(Status status, String extraInfo) {
Expand All @@ -56,6 +65,10 @@ public String toString() {
if (params != null) {
ss.append(split);
ss.append(params);
if (extraInfo != null) {
ss.append(split);
ss.append((Base64.getEncoder().encodeToString(extraInfo.getBytes())));
}
}

return ss.toString();
Expand Down Expand Up @@ -86,4 +99,8 @@ public String getParams() {
public String getExtraInfo() {
return extraInfo;
}

public void setExtraInfo(String extraInfo) {
this.extraInfo = extraInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.actiontech.dble.cluster.values.ConfStatus;
import com.actiontech.dble.cluster.zkprocess.comm.NotifyService;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.services.manager.response.ReloadContext;
import com.google.gson.Gson;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
Expand Down Expand Up @@ -61,6 +63,11 @@ private void executeStatusChange(ChildData childData) throws Exception {
String value = new String(childData.getData(), StandardCharsets.UTF_8);

ConfStatus status = new ConfStatus(value);
ReloadContext reloadContext = new ReloadContext();
if (status.getExtraInfo() != null) {
Gson gson = new Gson();
reloadContext = gson.fromJson(status.getExtraInfo(), ReloadContext.class);
}
if (status.getFrom().equals(SystemConfig.getInstance().getInstanceName())) {
return; //self node
}
Expand All @@ -72,6 +79,6 @@ private void executeStatusChange(ChildData childData) throws Exception {
LOGGER.warn("ConfigStatusListener notify error :" + service + " ,Exception info:", e);
}
}
ClusterLogic.reloadConfigEvent(value, status.getParams());
ClusterLogic.reloadConfigEvent(value, status.getParams(), reloadContext);
}
}
35 changes: 23 additions & 12 deletions src/main/java/com/actiontech/dble/config/ConfigInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.cluster.values.ConfStatus;
import com.actiontech.dble.config.helper.TestSchemasTask;
import com.actiontech.dble.config.helper.TestTask;
import com.actiontech.dble.config.loader.xml.XMLDbLoader;
Expand Down Expand Up @@ -203,11 +204,17 @@ public void testConnection() {
boolean skipTestConnectionOnUpdate = false;
if (SystemConfig.getInstance().isSkipTestConOnUpdate()) {
if (reloadContext != null && !reloadContext.getAffectDbInstanceList().isEmpty()) {
boolean useSharding = reloadContext.getAffectDbInstanceList().stream().map(ele -> dbGroups.get(ele.getGroupName())).anyMatch((ele) -> ele != null && !ele.isShardingUseless());

//not support for sharding db group
if (!useSharding) {
if (reloadContext.getConfStatus() == ConfStatus.Status.MANAGER_DELETE) {
skipTestConnectionOnUpdate = true;
LOGGER.info("will skip all test connection.");
} else {
boolean useSharding = reloadContext.getAffectDbInstanceList().stream().map(ele -> dbGroups.get(ele.getGroupName())).anyMatch((ele) -> ele != null && !ele.isShardingUseless());

//not support for sharding db group
if (!useSharding) {
skipTestConnectionOnUpdate = true;
LOGGER.info("will skip test connection exclude self.");
}
}
}
}
Expand All @@ -223,14 +230,6 @@ public void testConnection() {
}

for (PhysicalDbInstance ds : dbGroup.getDbInstances(true)) {
if (skipTestConnectionOnUpdate) {
String finalDbGroupName = dbGroupName;
boolean find = reloadContext.getAffectDbInstanceList().stream().anyMatch((ele) -> ele.getGroupName().equals(finalDbGroupName) && ele.getInstanceName().equals(ds.getName()));
if (!find) {
//skip test connection on this dbInstance
continue;
}
}
if (ds.getConfig().isDisabled()) {
errorInfos.add(new ErrorInfo("Backend", "WARNING", "dbGroup[" + dbGroupName + "," + ds.getName() + "] is disabled"));
LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] is disabled,just mark testing failed and skip it");
Expand All @@ -242,6 +241,18 @@ public void testConnection() {
ds.setTestConnSuccess(false);
continue;
}
if (skipTestConnectionOnUpdate) {
String finalDbGroupName = dbGroupName;
boolean find = reloadContext.getAffectDbInstanceList().stream().anyMatch((ele) -> ele.getGroupName().equals(finalDbGroupName) && ele.getInstanceName().equals(ds.getName()));
if (!find) {
//skip test connection on this dbInstance
ds.setTestConnSuccess(true);
LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] instance " + ds.getName() + " is skiped,because of option skipTestConOnUpdate");
continue;
} else {

}
}
if (!testDbInstance(dbGroupName, ds, schemaList)) {
isAllDbInstanceConnected = false;
errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import com.actiontech.dble.services.manager.information.ManagerSchemaInfo;
import com.actiontech.dble.services.manager.information.ManagerTableUtil;
import com.actiontech.dble.services.manager.information.ManagerWritableTable;
import com.actiontech.dble.services.manager.information.tables.DbleDbInstance;
import com.actiontech.dble.services.manager.response.ReloadConfig;
import com.actiontech.dble.services.manager.response.ReloadContext;
import com.actiontech.dble.services.manager.response.UniqueDbInstance;
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
import com.alibaba.druid.sql.ast.statement.SQLJoinTableSource;
import com.alibaba.druid.sql.ast.statement.SQLTableSource;
Expand Down Expand Up @@ -101,7 +104,16 @@ public void handle(String stmt, ManagerService service) {
Set<LinkedHashMap<String, String>> affectPks = ManagerTableUtil.getAffectPks(service, managerTable, foundRows, null);
rowSize = managerTable.deleteRows(affectPks);
if (rowSize != 0) {
ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_DELETE, managerTable.getTableName()));
ReloadContext reloadContext = new ReloadContext();
reloadContext.setConfStatus(ConfStatus.Status.MANAGER_DELETE);
if (managerTable instanceof DbleDbInstance) {
for (LinkedHashMap<String, String> affectPk : affectPks) {
String instanceName = affectPk.get("name");
String dbGroup = affectPk.get("db_group");
reloadContext.addAffectDbInstance(new UniqueDbInstance(dbGroup, instanceName));
}
}
ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_DELETE, managerTable.getTableName()), reloadContext);
}
} catch (SQLException e) {
isSuccess = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import com.actiontech.dble.services.manager.information.ManagerBaseTable;
import com.actiontech.dble.services.manager.information.ManagerSchemaInfo;
import com.actiontech.dble.services.manager.information.ManagerWritableTable;
import com.actiontech.dble.services.manager.information.tables.DbleDbInstance;
import com.actiontech.dble.services.manager.response.ReloadConfig;
import com.actiontech.dble.services.manager.response.ReloadContext;
import com.actiontech.dble.services.manager.response.UniqueDbInstance;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLNullExpr;
Expand Down Expand Up @@ -67,7 +70,16 @@ public void handle(String stmt, ManagerService service) {
managerTable.checkPrimaryKeyDuplicate(rows);
rowSize = managerTable.insertRows(rows);
if (rowSize != 0) {
ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_INSERT, managerTable.getTableName()));
ReloadContext reloadContext = new ReloadContext();
reloadContext.setConfStatus(ConfStatus.Status.MANAGER_INSERT);
if (managerTable instanceof DbleDbInstance) {
for (LinkedHashMap<String, String> affectPk : rows) {
String instanceName = affectPk.get("name");
String dbGroup = affectPk.get("db_group");
reloadContext.addAffectDbInstance(new UniqueDbInstance(dbGroup, instanceName));
}
}
ReloadConfig.execute(service, 0, false, new ConfStatus(ConfStatus.Status.MANAGER_INSERT, managerTable.getTableName()), reloadContext);
}
managerTable.afterExecute();
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ private int updateRows(ManagerService service, ManagerWritableTable managerTable
if (rowSize != 0) {

ReloadContext reloadContext = new ReloadContext();
reloadContext.setConfStatus(ConfStatus.Status.MANAGER_UPDATE);
if (managerTable instanceof DbleDbInstance) {
for (LinkedHashMap<String, String> affectPk : affectPks) {
String instanceName = affectPk.get("name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import com.actiontech.dble.singleton.CronScheduler;
import com.actiontech.dble.singleton.FrontendUserManager;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.StringUtil;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -121,6 +123,10 @@ private static void reloadWithCluster(ManagerService service, int loadAllMode, b
//step 4 write the reload flag and self reload result into cluster center,notify the other dble to reload
ConfStatus status = new ConfStatus(SystemConfig.getInstance().getInstanceName(),
ConfStatus.Status.RELOAD_ALL, String.valueOf(loadAllMode));
if (reloadContext != null && !reloadContext.getAffectDbInstanceList().isEmpty()) {
Gson gson = new Gson();
status.setExtraInfo(gson.toJson(reloadContext));
}
ClusterHelper.setKV(ClusterPathUtil.getConfStatusOperatorPath(), status.toString());
ReloadLogHelper.info("reload config: sent config status to cluster center", LOGGER);
//step 5 start a loop to check if all the dble in cluster is reload finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,26 @@

package com.actiontech.dble.services.manager.response;

import com.actiontech.dble.cluster.values.ConfStatus;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ReloadContext {
private final List<UniqueDbInstance> affectDbInstanceList = new ArrayList<>();

private ConfStatus.Status confStatus = null;


public ConfStatus.Status getConfStatus() {
return confStatus;
}

public void setConfStatus(ConfStatus.Status confStatus) {
this.confStatus = confStatus;
}

public List<UniqueDbInstance> getAffectDbInstanceList() {
return Collections.unmodifiableList(affectDbInstanceList);
}
Expand Down
Loading