Skip to content

Commit

Permalink
gray mode migrate accerate (#12957)
Browse files Browse the repository at this point in the history
* fix type search on mysql model

* 灰度模型迁移程序并发&迁移不落历史表
  • Loading branch information
shiyiyue1102 authored Dec 17, 2024
1 parent 98c9760 commit 80b44bc
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@
import com.alibaba.nacos.config.server.service.repository.ConfigInfoBetaPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoGrayPersistService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoTagPersistService;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.persistence.model.Page;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.alibaba.nacos.config.server.utils.LogUtil.DEFAULT_LOG;
import static com.alibaba.nacos.config.server.utils.PropertyUtil.GRAY_MIGRATE_FLAG;

/**
* migrate beta and tag to gray model. should only invoked from config sync notify.
Expand Down Expand Up @@ -60,8 +65,10 @@ public ConfigGrayModelMigrateService(ConfigInfoBetaPersistService configInfoBeta
* migrate beta&tag to gray .
*/
@PostConstruct
public void migrate() {
doCheckMigrate();
public void migrate() throws Exception {
if (PropertyUtil.isGrayCompatibleModel()) {
doCheckMigrate();
}
}

/**
Expand All @@ -74,8 +81,8 @@ public void migrate() {
public void checkMigrateBeta(String dataId, String group, String tenant) {
ConfigInfoBetaWrapper configInfo4Beta = configInfoBetaPersistService.findConfigInfo4Beta(dataId, group, tenant);
if (configInfo4Beta == null) {
ConfigInfoGrayWrapper configInfoGrayWrapper = configInfoGrayPersistService.findConfigInfo4Gray(dataId, group,
tenant, BetaGrayRule.TYPE_BETA);
ConfigInfoGrayWrapper configInfoGrayWrapper = configInfoGrayPersistService.findConfigInfo4Gray(dataId,
group, tenant, BetaGrayRule.TYPE_BETA);
if (configInfoGrayWrapper == null) {
return;
}
Expand Down Expand Up @@ -133,7 +140,12 @@ public void checkMigrateTag(String dataId, String group, String tenant, String t
}
}

private void doCheckMigrate() {
private void doCheckMigrate() throws Exception {

ThreadPoolExecutor executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(PropertyUtil.getAllDumpPageSize() * 2),
r -> new Thread(r, "gray migrate worker"), new ThreadPoolExecutor.CallerRunsPolicy());
int pageSize = 100;
int rowCount = configInfoBetaPersistService.configInfoBetaCount();
int pageCount = (int) Math.ceil(rowCount * 1.0 / pageSize);
Expand All @@ -144,21 +156,40 @@ private void doCheckMigrate() {
if (page != null) {
for (ConfigInfoBetaWrapper cf : page.getPageItems()) {

ConfigInfoGrayWrapper configInfo4Gray = configInfoGrayPersistService.findConfigInfo4Gray(
cf.getDataId(), cf.getGroup(), cf.getTenant(), BetaGrayRule.TYPE_BETA);
if (configInfo4Gray == null || configInfo4Gray.getLastModified() < cf.getLastModified()) {
DEFAULT_LOG.info("[migrate beta to gray] dataId={}, group={}, tenant={}, md5={}",
cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getMd5());
ConfigGrayPersistInfo localConfigGrayPersistInfo = new ConfigGrayPersistInfo(
BetaGrayRule.TYPE_BETA, BetaGrayRule.VERSION, cf.getBetaIps(), BetaGrayRule.PRIORITY);
configInfoGrayPersistService.insertOrUpdateGray(cf, BetaGrayRule.TYPE_BETA,
GrayRuleManager.serializeConfigGrayPersistInfo(localConfigGrayPersistInfo),
NetUtils.localIP(), "nacos_auto_migrate");
}
executorService.execute(() -> {
GRAY_MIGRATE_FLAG.set(true);
ConfigInfoGrayWrapper configInfo4Gray = configInfoGrayPersistService.findConfigInfo4Gray(
cf.getDataId(), cf.getGroup(), cf.getTenant(), BetaGrayRule.TYPE_BETA);
if (configInfo4Gray == null || configInfo4Gray.getLastModified() < cf.getLastModified()) {
DEFAULT_LOG.info("[migrate beta to gray] dataId={}, group={}, tenant={}, md5={}",
cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getMd5());
ConfigGrayPersistInfo localConfigGrayPersistInfo = new ConfigGrayPersistInfo(
BetaGrayRule.TYPE_BETA, BetaGrayRule.VERSION, cf.getBetaIps(),
BetaGrayRule.PRIORITY);
configInfoGrayPersistService.insertOrUpdateGray(cf, BetaGrayRule.TYPE_BETA,
GrayRuleManager.serializeConfigGrayPersistInfo(localConfigGrayPersistInfo),
NetUtils.localIP(), "nacos_auto_migrate");
GRAY_MIGRATE_FLAG.set(false);
}
});

}
actualRowCount += page.getPageItems().size();
DEFAULT_LOG.info("[gray-migrate-beta] submit gray task {} / {}", actualRowCount, rowCount);

}
}

try {
int unfinishedTaskCount = 0;
while ((unfinishedTaskCount = executorService.getQueue().size() + executorService.getActiveCount()) > 0) {
DEFAULT_LOG.info("[gray-migrate-beta] wait {} migrate tasks to be finished", unfinishedTaskCount);
Thread.sleep(1000L);
}

} catch (Exception e) {
DEFAULT_LOG.error("[gray-migrate-beta] wait dump tasks to be finished error", e);
throw e;
}

rowCount = configInfoTagPersistService.configInfoTagCount();
Expand All @@ -169,23 +200,46 @@ private void doCheckMigrate() {
pageSize);
if (page != null) {
for (ConfigInfoTagWrapper cf : page.getPageItems()) {
ConfigInfoGrayWrapper configInfo4Gray = configInfoGrayPersistService.findConfigInfo4Gray(
cf.getDataId(), cf.getGroup(), cf.getTenant(), TagGrayRule.TYPE_TAG + "_" + cf.getTag());
if (configInfo4Gray == null || configInfo4Gray.getLastModified() < cf.getLastModified()) {
DEFAULT_LOG.info("[migrate tag to gray] dataId={}, group={}, tenant={}, md5={}",
cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getMd5());
ConfigGrayPersistInfo localConfigGrayPersistInfo = new ConfigGrayPersistInfo(
TagGrayRule.TYPE_TAG, TagGrayRule.VERSION, cf.getTag(), TagGrayRule.PRIORITY);
configInfoGrayPersistService.insertOrUpdateGray(cf, TagGrayRule.TYPE_TAG + "_" + cf.getTag(),
GrayRuleManager.serializeConfigGrayPersistInfo(localConfigGrayPersistInfo),
NetUtils.localIP(), "nacos_auto_migrate");
}

executorService.execute(() -> {
GRAY_MIGRATE_FLAG.set(true);
ConfigInfoGrayWrapper configInfo4Gray = configInfoGrayPersistService.findConfigInfo4Gray(
cf.getDataId(), cf.getGroup(), cf.getTenant(),
TagGrayRule.TYPE_TAG + "_" + cf.getTag());
if (configInfo4Gray == null || configInfo4Gray.getLastModified() < cf.getLastModified()) {
DEFAULT_LOG.info("[migrate tag to gray] dataId={}, group={}, tenant={}, md5={}",
cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getMd5());
ConfigGrayPersistInfo localConfigGrayPersistInfo = new ConfigGrayPersistInfo(
TagGrayRule.TYPE_TAG, TagGrayRule.VERSION, cf.getTag(), TagGrayRule.PRIORITY);
configInfoGrayPersistService.insertOrUpdateGray(cf,
TagGrayRule.TYPE_TAG + "_" + cf.getTag(),
GrayRuleManager.serializeConfigGrayPersistInfo(localConfigGrayPersistInfo),
NetUtils.localIP(), "nacos_auto_migrate");
GRAY_MIGRATE_FLAG.set(false);
}
});

}

actualRowCount += page.getPageItems().size();
DEFAULT_LOG.info("[-tag] {} / {}", actualRowCount, rowCount);
DEFAULT_LOG.info("[gray-migrate-tag] submit gray task {} / {}", actualRowCount, rowCount);
}
}

try {
int unfinishedTaskCount = 0;
while ((unfinishedTaskCount = executorService.getQueue().size() + executorService.getActiveCount()) > 0) {
DEFAULT_LOG.info("[gray-migrate-tag] wait {} migrate tasks to be finished", unfinishedTaskCount);
Thread.sleep(1000L);
}

} catch (Exception e) {
DEFAULT_LOG.error("[gray-migrate-tag] wait migrate tasks to be finished error", e);
throw e;
}
//shut down migrate executor
executorService.shutdown();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import static com.alibaba.nacos.config.server.service.repository.ConfigRowMapperInjector.CONFIG_INFO_GRAY_WRAPPER_ROW_MAPPER;
import static com.alibaba.nacos.config.server.service.repository.ConfigRowMapperInjector.CONFIG_INFO_STATE_WRAPPER_ROW_MAPPER;
import static com.alibaba.nacos.config.server.utils.PropertyUtil.GRAY_MIGRATE_FLAG;

/**
* EmbeddedConfigInfoGrayPersistServiceImpl.
Expand Down Expand Up @@ -147,15 +148,16 @@ public ConfigOperateResult addConfigInfo4Gray(ConfigInfo configInfo, String gray
long hisId = idGeneratorManager.nextId(RESOURCE_CONFIG_HISTORY_ID);

addConfigInfoGrayAtomic(configGrayId, configInfo, grayNameTmp, grayRuleTmp, srcIp, srcUser);

Timestamp now = new Timestamp(System.currentTimeMillis());
historyConfigInfoPersistService.insertConfigHistoryAtomic(hisId, configInfo, srcIp, srcUser, now, "I",
Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(grayNameTmp, grayRuleTmp, srcUser));

if (!GRAY_MIGRATE_FLAG.get()) {
historyConfigInfoPersistService.insertConfigHistoryAtomic(hisId, configInfo, srcIp, srcUser, now, "I",
Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(grayNameTmp, grayRuleTmp, srcUser));
}

EmbeddedStorageContextUtils.onModifyConfigGrayInfo(configInfo, grayNameTmp, grayRuleTmp, srcIp, now);
databaseOperate.blockUpdate();

return getGrayOperateResult(configInfo.getDataId(), configInfo.getGroup(), tenantTmp, grayNameTmp);
} finally {
EmbeddedStorageContextHolder.cleanAllContext();
Expand Down Expand Up @@ -270,13 +272,12 @@ public ConfigOperateResult updateConfigInfo4Gray(ConfigInfo configInfo, String g
Arrays.asList("data_id", "group_id", "tenant_id", "gray_name"));
final Object[] args = new Object[] {configInfo.getContent(), md5, srcIp, srcUser, time, appNameTmp,
grayRuleTmp, configInfo.getDataId(), configInfo.getGroup(), tenantTmp, grayNameTmp};

Timestamp now = new Timestamp(System.currentTimeMillis());
historyConfigInfoPersistService.insertConfigHistoryAtomic(oldConfigAllInfo4Gray.getId(),
oldConfigAllInfo4Gray, srcIp, srcUser, now, "U", Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(oldConfigAllInfo4Gray.getGrayName(),
oldConfigAllInfo4Gray.getGrayRule(), oldConfigAllInfo4Gray.getSrcUser()));

if (!GRAY_MIGRATE_FLAG.get()) {
historyConfigInfoPersistService.insertConfigHistoryAtomic(oldConfigAllInfo4Gray.getId(),
oldConfigAllInfo4Gray, srcIp, srcUser, time, "U", Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(oldConfigAllInfo4Gray.getGrayName(),
oldConfigAllInfo4Gray.getGrayRule(), oldConfigAllInfo4Gray.getSrcUser()));
}
EmbeddedStorageContextUtils.onModifyConfigGrayInfo(configInfo, grayNameTmp, grayRuleTmp, srcIp, time);
EmbeddedStorageContextHolder.addSqlContext(sql, args);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import static com.alibaba.nacos.config.server.service.repository.ConfigRowMapperInjector.CONFIG_INFO_GRAY_WRAPPER_ROW_MAPPER;
import static com.alibaba.nacos.config.server.service.repository.ConfigRowMapperInjector.CONFIG_INFO_STATE_WRAPPER_ROW_MAPPER;
import static com.alibaba.nacos.config.server.utils.PropertyUtil.GRAY_MIGRATE_FLAG;


/**
Expand Down Expand Up @@ -136,11 +137,12 @@ public ConfigOperateResult addConfigInfo4Gray(ConfigInfo configInfo, String gray
try {
addConfigInfoGrayAtomic(-1, configInfo, grayNameTmp, grayRuleTmp, srcIp, srcUser);

Timestamp now = new Timestamp(System.currentTimeMillis());
historyConfigInfoPersistService.insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, now, "I",
Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(grayNameTmp, grayRuleTmp, srcUser));

if (!GRAY_MIGRATE_FLAG.get()) {
Timestamp now = new Timestamp(System.currentTimeMillis());
historyConfigInfoPersistService.insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, now, "I",
Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(grayNameTmp, grayRuleTmp, srcUser));
}
return getGrayOperateResult(configInfo.getDataId(), configInfo.getGroup(), tenantTmp, grayNameTmp);
} catch (Exception e) {
LogUtil.FATAL_LOG.error("[db-error] " + e, e);
Expand Down Expand Up @@ -251,11 +253,12 @@ public ConfigOperateResult updateConfigInfo4Gray(ConfigInfo configInfo, String g
configInfo.getDataId(), configInfo.getGroup(), tenantTmp, grayNameTmp);

Timestamp now = new Timestamp(System.currentTimeMillis());
historyConfigInfoPersistService.insertConfigHistoryAtomic(oldConfigAllInfo4Gray.getId(),
oldConfigAllInfo4Gray, srcIp, srcUser, now, "U", Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(oldConfigAllInfo4Gray.getGrayName(),
oldConfigAllInfo4Gray.getGrayRule(), oldConfigAllInfo4Gray.getSrcUser()));

if (!GRAY_MIGRATE_FLAG.get()) {
historyConfigInfoPersistService.insertConfigHistoryAtomic(oldConfigAllInfo4Gray.getId(),
oldConfigAllInfo4Gray, srcIp, srcUser, now, "U", Constants.GRAY, grayNameTmp,
ConfigExtInfoUtil.getExtInfoFromGrayInfo(oldConfigAllInfo4Gray.getGrayName(),
oldConfigAllInfo4Gray.getGrayRule(), oldConfigAllInfo4Gray.getSrcUser()));
}
return getGrayOperateResult(configInfo.getDataId(), configInfo.getGroup(), tenantTmp, grayNameTmp);
} catch (CannotGetJdbcConnectionException e) {
LogUtil.FATAL_LOG.error("[db-error] " + e, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class PropertyUtil implements ApplicationContextInitializer<ConfigurableA
*/
private static boolean grayCompatibleModel = true;

public static final ThreadLocal<Boolean> GRAY_MIGRATE_FLAG = ThreadLocal.withInitial(() -> false);

/**
* Whether to enable the limit check function of capacity management, including the upper limit of configuration
* number, configuration content size limit, etc.
Expand Down Expand Up @@ -107,12 +109,12 @@ public class PropertyUtil implements ApplicationContextInitializer<ConfigurableA
private static int correctUsageDelay = 10 * 60;

private static boolean dumpChangeOn = true;

/**
* The number of days to retain the configuration history, the default is 30 days.
*/
private static int configRententionDays = 30;

/**
* dumpChangeWorkerInterval, default 30 seconds.
*/
Expand Down Expand Up @@ -240,6 +242,7 @@ public static void setDefaultMaxAggrCount(int defaultMaxAggrCount) {

/**
* control whether persist beta and tag to old model.
*
* @return
*/
public static boolean isGrayCompatibleModel() {
Expand All @@ -265,13 +268,13 @@ public static int getCorrectUsageDelay() {
public static void setCorrectUsageDelay(int correctUsageDelay) {
PropertyUtil.correctUsageDelay = correctUsageDelay;
}

public static int getConfigRententionDays() {
return configRententionDays;
}

private void setConfigRententionDays() {
String val = getProperty(PropertiesConstant.CONFIG_RENTENTION_DAYS);
String val = getProperty(PropertiesConstant.CONFIG_RENTENTION_DAYS);
if (null != val) {
int tmp = 0;
try {
Expand All @@ -284,7 +287,7 @@ private void setConfigRententionDays() {
}
}
}

public static boolean isStandaloneMode() {
return EnvUtil.getStandaloneMode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ public class ConfigInfoGrayMapperByMySql extends AbstractMapperByMysql implement

@Override
public MapperResult findAllConfigInfoGrayForDumpAllFetchRows(MapperContext context) {
String sql = " SELECT t.id,data_id,group_id,tenant_id,gray_name,gray_rule,app_name,content,md5,gmt_modified "
+ " FROM ( SELECT id FROM config_info_gray ORDER BY id LIMIT " + context.getStartRow() + ","
+ context.getPageSize() + " ) " + "g, config_info_gray t WHERE g.id = t.id ";
String sql = " SELECT id,data_id,group_id,tenant_id,gray_name,gray_rule,app_name,content,md5,gmt_modified "
+ " FROM config_info_gray ORDER BY id LIMIT " + context.getStartRow() + "," + context.getPageSize();
return new MapperResult(sql, Collections.emptyList());
}

Expand Down

0 comments on commit 80b44bc

Please sign in to comment.