Skip to content

Commit

Permalink
优化增量对账,减少开销 (#11248)
Browse files Browse the repository at this point in the history
* 优化增量dump,减少开销

* check style fix

* pmd fix

* testcase fix

* testcase fix

* checkstyle fix
  • Loading branch information
shiyiyue1102 authored Dec 18, 2023
1 parent ab2ddac commit 790bb19
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ public class PropertiesConstant {

public static final String SEARCH_WAIT_TIMEOUT = "nacos.config.search.wait_timeout";

public static final String DUMP_CHANGE_ON = "dumpChangeOn";

public static final String DUMP_CHANGE_WORKER_INTERVAL = "dumpChangeWorkerInterval";


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.config.server.service.repository.HistoryConfigInfoPersistService;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.PropertyUtil;

import java.sql.Timestamp;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Dump change processor.
Expand Down Expand Up @@ -55,6 +58,11 @@ public DumpChangeConfigWorker(DumpService dumpService, Timestamp startTime) {
public void run() {

try {

if (!PropertyUtil.isDumpChangeOn()) {
LogUtil.DEFAULT_LOG.info("DumpChange task is not open");
return;
}
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
LogUtil.DEFAULT_LOG.info("DumpChange start ,from time {},current time {}", startTime, currentTime);

Expand Down Expand Up @@ -96,28 +104,47 @@ public void run() {
List<ConfigInfoWrapper> changeConfigs = configInfoPersistService.findChangeConfig(startTime,
changeCursorId, pageSize);
for (ConfigInfoWrapper cf : changeConfigs) {
ConfigCacheService.dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(),
cf.getLastModified(), cf.getEncryptedDataKey());
final String content = cf.getContent();
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE_UTF8);

LogUtil.DEFAULT_LOG.info("[dump-change-check-ok] {}, {}, length={}, md5={}",
new Object[] {GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(),
content.length(), md5});
final String groupKey = GroupKey2.getKey(cf.getDataId(), cf.getGroup(), cf.getTenant());
//check md5 & localtimestamp update local disk cache.
boolean newLastModified = cf.getLastModified() > ConfigCacheService.getLastModifiedTs(groupKey);
String localContentMd5 = ConfigCacheService.getContentMd5(groupKey);
boolean md5Update = !localContentMd5.equals(cf.getMd5());
if (newLastModified || md5Update) {
LogUtil.DEFAULT_LOG.info("[dump-change] find change config {}, {}, md5={}",
new Object[] {groupKey, cf.getLastModified(), cf.getMd5()});
ConfigInfoWrapper configInfoWrapper = configInfoPersistService.findConfigInfo(cf.getDataId(),
cf.getGroup(), cf.getTenant());
ConfigCacheService.dumpChange(configInfoWrapper.getDataId(), configInfoWrapper.getGroup(),
configInfoWrapper.getTenant(), configInfoWrapper.getContent(),
configInfoWrapper.getLastModified(), configInfoWrapper.getEncryptedDataKey());
final String content = configInfoWrapper.getContent();
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE_GBK);
final String md5Utf8 = MD5Utils.md5Hex(content, Constants.ENCODE_UTF8);

LogUtil.DEFAULT_LOG.info("[dump-change-ok] {}, {}, length={}, md5={},md5UTF8={}",
new Object[] {groupKey, configInfoWrapper.getLastModified(), content.length(), md5,
md5Utf8});
}
}
if (changeConfigs.size() < pageSize) {
break;
}
changeCursorId = changeConfigs.get(changeConfigs.size() - 1).getId();
}

ConfigCacheService.reloadConfig();
long endChangeConfigTime = System.currentTimeMillis();
LogUtil.DEFAULT_LOG.info("Check changed configs finished,cost:{},set next start time to {}",
LogUtil.DEFAULT_LOG.info(
"Check changed configs finished,cost:{}, next task running will from start time {}",
endChangeConfigTime - startChangeConfigTime, currentTime);
startTime = currentTime;
} catch (Throwable e) {
LogUtil.DEFAULT_LOG.error("Check changed configs error", e);
} finally {
ConfigExecutor.scheduleConfigChangeTask(this, PropertyUtil.getDumpChangeWorkerInterval(),
TimeUnit.MILLISECONDS);
LogUtil.DEFAULT_LOG.info("Next dump change will scheduled after {} millseconds",
PropertyUtil.getDumpChangeWorkerInterval());

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.alibaba.nacos.config.server.utils.GroupKey;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.config.server.utils.TimeUtils;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.namespace.repository.NamespacePersistService;
Expand Down Expand Up @@ -274,8 +275,8 @@ protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProc
};

ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);

long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
Random random = new Random();
long initialDelay = random.nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);

ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
Expand All @@ -285,8 +286,8 @@ protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProc

ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
TimeUnit.MINUTES);
ConfigExecutor.scheduleConfigTask(new DumpChangeConfigWorker(this, currentTime), 0,
DUMP_CHANGE_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
ConfigExecutor.scheduleConfigChangeTask(new DumpChangeConfigWorker(this, currentTime),
random.nextInt((int) PropertyUtil.getDumpChangeWorkerInterval()), TimeUnit.MILLISECONDS);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ public boolean process(NacosTask task) {
String handleIp = dumpTask.getHandleIp();
boolean isBeta = dumpTask.isBeta();
String tag = dumpTask.getTag();
boolean isBatch = dumpTask.isBatch();
ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
.group(group).isBatch(isBatch).isBeta(isBeta).tag(tag).handleIp(handleIp);
.group(group).isBeta(isBeta).tag(tag).handleIp(handleIp);
String type = "formal";
if (isBeta) {
type = "beta";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1256,16 +1256,18 @@ public List<ConfigInfo> convertDeletedConfig(List<Map<String, Object>> list) {
public List<ConfigInfoWrapper> convertChangeConfig(List<Map<String, Object>> list) {
List<ConfigInfoWrapper> configs = new ArrayList<>();
for (Map<String, Object> map : list) {
Long id = ((java.lang.Number) map.get("id")).longValue();
String dataId = (String) map.get("data_id");
String group = (String) map.get("group_id");
String tenant = (String) map.get("tenant_id");
String content = (String) map.get("content");
String md5 = (String) map.get("md5");
long mTime = ((Timestamp) map.get("gmt_modified")).getTime();
ConfigInfoWrapper config = new ConfigInfoWrapper();
config.setId(id);
config.setDataId(dataId);
config.setGroup(group);
config.setMd5(md5);
config.setTenant(tenant);
config.setContent(content);
config.setLastModified(mTime);
configs.add(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,8 @@ public List<String> selectTagByConfig(String dataId, String group, String tenant
.select(Collections.singletonList("tag_name"), Arrays.asList("data_id", "group_id", "tenant_id"));
try {
return jt.queryForList(sql, new Object[] {dataId, group, tenant}, String.class);
} catch (EmptyResultDataAccessException e) {
return null;
} catch (IncorrectResultSizeDataAccessException e) {
return null;
} catch (CannotGetJdbcConnectionException e) {
Expand Down Expand Up @@ -1385,18 +1387,18 @@ public List<ConfigInfo> convertDeletedConfig(List<Map<String, Object>> list) {
public List<ConfigInfoWrapper> convertChangeConfig(List<Map<String, Object>> list) {
List<ConfigInfoWrapper> configs = new ArrayList<>();
for (Map<String, Object> map : list) {
Long id = (Long) map.get("id");
Long id = ((java.lang.Number) map.get("id")).longValue();
String dataId = (String) map.get("data_id");
String group = (String) map.get("group_id");
String tenant = (String) map.get("tenant_id");
String content = (String) map.get("content");
String md5 = (String) map.get("md5");
long mTime = ((LocalDateTime) map.get("gmt_modified")).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
ConfigInfoWrapper config = new ConfigInfoWrapper();
config.setId(id);
config.setDataId(dataId);
config.setGroup(group);
config.setMd5(md5);
config.setTenant(tenant);
config.setContent(content);
config.setLastModified(mTime);
configs.add(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public static void scheduleConfigTask(Runnable command, long initialDelay, long
TIMER_EXECUTOR.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

public static void scheduleConfigChangeTask(Runnable command, long delay, TimeUnit unit) {
TIMER_EXECUTOR.schedule(command, delay, unit);
}

public static void scheduleCorrectUsageTask(Runnable runnable, long initialDelay, long delay, TimeUnit unit) {
CAPACITY_MANAGEMENT_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,29 @@ public class PropertyUtil implements ApplicationContextInitializer<ConfigurableA
*/
private static int correctUsageDelay = 10 * 60;

private static boolean dumpChangeOn = true;

/**
* dumpChangeWorkerInterval, default 30 seconds.
*/
private static long dumpChangeWorkerInterval = 30 * 1000L;

public static boolean isDumpChangeOn() {
return dumpChangeOn;
}

public static void setDumpChangeOn(boolean dumpChangeOn) {
PropertyUtil.dumpChangeOn = dumpChangeOn;
}

public static long getDumpChangeWorkerInterval() {
return dumpChangeWorkerInterval;
}

public static void setDumpChangeWorkerInterval(long dumpChangeWorkerInterval) {
PropertyUtil.dumpChangeWorkerInterval = dumpChangeWorkerInterval;
}

public static int getNotifyConnectTimeout() {
return notifyConnectTimeout;
}
Expand Down Expand Up @@ -254,6 +277,9 @@ private void loadSetting() {
setDefaultMaxAggrSize(getInt(PropertiesConstant.DEFAULT_MAX_AGGR_SIZE, defaultMaxAggrSize));
setCorrectUsageDelay(getInt(PropertiesConstant.CORRECT_USAGE_DELAY, correctUsageDelay));
setInitialExpansionPercent(getInt(PropertiesConstant.INITIAL_EXPANSION_PERCENT, initialExpansionPercent));
setDumpChangeOn(getBoolean(PropertiesConstant.DUMP_CHANGE_ON, dumpChangeOn));
setDumpChangeWorkerInterval(
getLong(PropertiesConstant.DUMP_CHANGE_WORKER_INTERVAL, dumpChangeWorkerInterval));
} catch (Exception e) {
LOGGER.error("read application.properties failed", e);
throw e;
Expand All @@ -268,6 +294,10 @@ private int getInt(String key, int defaultValue) {
return Integer.parseInt(getString(key, String.valueOf(defaultValue)));
}

private long getLong(String key, long defaultValue) {
return Long.parseLong(getString(key, String.valueOf(defaultValue)));
}

private String getString(String key, String defaultValue) {
String value = getProperty(key);
if (value == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public MapperResult findChangeConfigFetchRows(MapperContext context) {

List<Object> paramList = new ArrayList<>();

final String sqlFetchRows = "SELECT id,data_id,group_id,tenant_id,app_name,content,type,md5,gmt_modified FROM config_info WHERE ";
final String sqlFetchRows = "SELECT id,data_id,group_id,tenant_id,app_name,type,md5,gmt_modified FROM config_info WHERE ";
String where = " 1=1 ";
if (!StringUtils.isBlank(dataId)) {
where += " AND data_id LIKE ? ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ default MapperResult configInfoLikeTenantCount(MapperContext context) {
*/
default MapperResult findChangeConfig(MapperContext context) {
String sql =
"SELECT id, data_id, group_id, tenant_id, app_name, content, gmt_modified, encrypted_data_key FROM config_info WHERE "
"SELECT id, data_id, group_id, tenant_id, app_name,md5, gmt_modified, encrypted_data_key FROM config_info WHERE "
+ "gmt_modified >= ? and id > ? order by id limit ? ";
return new MapperResult(sql, CollectionUtils.list(context.getWhereParameter(FieldConstant.START_TIME),
context.getWhereParameter(FieldConstant.LAST_MAX_ID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testFindAllConfigInfoFragment() {
public void testFindChangeConfig() {
MapperResult mapperResult = configInfoMapperByMySql.findChangeConfig(context);
Assert.assertEquals(mapperResult.getSql(),
"SELECT id, data_id, group_id, tenant_id, app_name, content, gmt_modified, encrypted_data_key FROM config_info"
"SELECT id, data_id, group_id, tenant_id, app_name,md5, gmt_modified, encrypted_data_key FROM config_info"
+ " WHERE gmt_modified >= ? and id > ? order by id limit ? ");
Assert.assertArrayEquals(mapperResult.getParamList().toArray(), new Object[] {startTime, lastMaxId, pageSize});
}
Expand All @@ -180,7 +180,7 @@ public void testFindChangeConfigFetchRows() {
context.putWhereParameter(FieldConstant.LAST_MAX_ID, lastMaxId);
MapperResult mapperResult = configInfoMapperByMySql.findChangeConfigFetchRows(context);
Assert.assertEquals(mapperResult.getSql(),
"SELECT id,data_id,group_id,tenant_id,app_name,content,type,md5,gmt_modified FROM config_info "
"SELECT id,data_id,group_id,tenant_id,app_name,type,md5,gmt_modified FROM config_info "
+ "WHERE 1=1 AND tenant_id = ? AND app_name = ? AND gmt_modified >=? AND gmt_modified <=? AND id > "
+ lastMaxId + " ORDER BY id ASC LIMIT " + startRow + "," + pageSize);
Assert.assertArrayEquals(mapperResult.getParamList().toArray(),
Expand Down

0 comments on commit 790bb19

Please sign in to comment.