diff --git a/config/src/main/java/com/alibaba/nacos/config/server/constant/PropertiesConstant.java b/config/src/main/java/com/alibaba/nacos/config/server/constant/PropertiesConstant.java index 41a80e4c5aa..f448567daa9 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/constant/PropertiesConstant.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/constant/PropertiesConstant.java @@ -59,4 +59,9 @@ public class PropertiesConstant { public static final String SEARCH_WAIT_TIMEOUT = "nacos.config.search.wait_timeout"; + public static final String DUMP_CHANGE_ON = "dumpChangeOn"; + + public static final String DUMP_CHANGE_WORKER_INTERVAL = "dumpChangeWorkerInterval"; + + } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpChangeConfigWorker.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpChangeConfigWorker.java index 8ca8e952821..694bcf163d7 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpChangeConfigWorker.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpChangeConfigWorker.java @@ -23,11 +23,14 @@ import com.alibaba.nacos.config.server.service.ConfigCacheService; import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService; import com.alibaba.nacos.config.server.service.repository.HistoryConfigInfoPersistService; +import com.alibaba.nacos.config.server.utils.ConfigExecutor; import com.alibaba.nacos.config.server.utils.GroupKey2; import com.alibaba.nacos.config.server.utils.LogUtil; +import com.alibaba.nacos.config.server.utils.PropertyUtil; import java.sql.Timestamp; import java.util.List; +import java.util.concurrent.TimeUnit; /** * Dump change processor. @@ -55,6 +58,11 @@ public DumpChangeConfigWorker(DumpService dumpService, Timestamp startTime) { public void run() { try { + + if (!PropertyUtil.isDumpChangeOn()) { + LogUtil.DEFAULT_LOG.info("DumpChange task is not open"); + return; + } Timestamp currentTime = new Timestamp(System.currentTimeMillis()); LogUtil.DEFAULT_LOG.info("DumpChange start ,from time {},current time {}", startTime, currentTime); @@ -96,14 +104,27 @@ public void run() { List changeConfigs = configInfoPersistService.findChangeConfig(startTime, changeCursorId, pageSize); for (ConfigInfoWrapper cf : changeConfigs) { - ConfigCacheService.dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), - cf.getLastModified(), cf.getEncryptedDataKey()); - final String content = cf.getContent(); - final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE_UTF8); - - LogUtil.DEFAULT_LOG.info("[dump-change-check-ok] {}, {}, length={}, md5={}", - new Object[] {GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), - content.length(), md5}); + final String groupKey = GroupKey2.getKey(cf.getDataId(), cf.getGroup(), cf.getTenant()); + //check md5 & localtimestamp update local disk cache. + boolean newLastModified = cf.getLastModified() > ConfigCacheService.getLastModifiedTs(groupKey); + String localContentMd5 = ConfigCacheService.getContentMd5(groupKey); + boolean md5Update = !localContentMd5.equals(cf.getMd5()); + if (newLastModified || md5Update) { + LogUtil.DEFAULT_LOG.info("[dump-change] find change config {}, {}, md5={}", + new Object[] {groupKey, cf.getLastModified(), cf.getMd5()}); + ConfigInfoWrapper configInfoWrapper = configInfoPersistService.findConfigInfo(cf.getDataId(), + cf.getGroup(), cf.getTenant()); + ConfigCacheService.dumpChange(configInfoWrapper.getDataId(), configInfoWrapper.getGroup(), + configInfoWrapper.getTenant(), configInfoWrapper.getContent(), + configInfoWrapper.getLastModified(), configInfoWrapper.getEncryptedDataKey()); + final String content = configInfoWrapper.getContent(); + final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE_GBK); + final String md5Utf8 = MD5Utils.md5Hex(content, Constants.ENCODE_UTF8); + + LogUtil.DEFAULT_LOG.info("[dump-change-ok] {}, {}, length={}, md5={},md5UTF8={}", + new Object[] {groupKey, configInfoWrapper.getLastModified(), content.length(), md5, + md5Utf8}); + } } if (changeConfigs.size() < pageSize) { break; @@ -111,13 +132,19 @@ public void run() { changeCursorId = changeConfigs.get(changeConfigs.size() - 1).getId(); } - ConfigCacheService.reloadConfig(); long endChangeConfigTime = System.currentTimeMillis(); - LogUtil.DEFAULT_LOG.info("Check changed configs finished,cost:{},set next start time to {}", + LogUtil.DEFAULT_LOG.info( + "Check changed configs finished,cost:{}, next task running will from start time {}", endChangeConfigTime - startChangeConfigTime, currentTime); startTime = currentTime; } catch (Throwable e) { LogUtil.DEFAULT_LOG.error("Check changed configs error", e); + } finally { + ConfigExecutor.scheduleConfigChangeTask(this, PropertyUtil.getDumpChangeWorkerInterval(), + TimeUnit.MILLISECONDS); + LogUtil.DEFAULT_LOG.info("Next dump change will scheduled after {} millseconds", + PropertyUtil.getDumpChangeWorkerInterval()); + } } } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java index 75ad824dc52..b48a0d6cfe7 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java @@ -45,6 +45,7 @@ import com.alibaba.nacos.config.server.utils.GroupKey; import com.alibaba.nacos.config.server.utils.GroupKey2; import com.alibaba.nacos.config.server.utils.LogUtil; +import com.alibaba.nacos.config.server.utils.PropertyUtil; import com.alibaba.nacos.config.server.utils.TimeUtils; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.namespace.repository.NamespacePersistService; @@ -274,8 +275,8 @@ protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProc }; ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS); - - long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10; + Random random = new Random(); + long initialDelay = random.nextInt(INITIAL_DELAY_IN_MINUTE) + 10; LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay); ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES); @@ -285,8 +286,8 @@ protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProc ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES); - ConfigExecutor.scheduleConfigTask(new DumpChangeConfigWorker(this, currentTime), 0, - DUMP_CHANGE_INTERVAL_IN_SECONDS, TimeUnit.SECONDS); + ConfigExecutor.scheduleConfigChangeTask(new DumpChangeConfigWorker(this, currentTime), + random.nextInt((int) PropertyUtil.getDumpChangeWorkerInterval()), TimeUnit.MILLISECONDS); } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java index 9215bcdb2d3..663ea1ca913 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java @@ -68,9 +68,8 @@ public boolean process(NacosTask task) { String handleIp = dumpTask.getHandleIp(); boolean isBeta = dumpTask.isBeta(); String tag = dumpTask.getTag(); - boolean isBatch = dumpTask.isBatch(); ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId) - .group(group).isBatch(isBatch).isBeta(isBeta).tag(tag).handleIp(handleIp); + .group(group).isBeta(isBeta).tag(tag).handleIp(handleIp); String type = "formal"; if (isBeta) { type = "beta"; diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/EmbeddedConfigInfoPersistServiceImpl.java b/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/EmbeddedConfigInfoPersistServiceImpl.java index f0ac40b8823..270b5f7497f 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/EmbeddedConfigInfoPersistServiceImpl.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/repository/embedded/EmbeddedConfigInfoPersistServiceImpl.java @@ -1256,16 +1256,18 @@ public List convertDeletedConfig(List> list) { public List convertChangeConfig(List> list) { List configs = new ArrayList<>(); for (Map map : list) { + Long id = ((java.lang.Number) map.get("id")).longValue(); String dataId = (String) map.get("data_id"); String group = (String) map.get("group_id"); String tenant = (String) map.get("tenant_id"); - String content = (String) map.get("content"); + String md5 = (String) map.get("md5"); long mTime = ((Timestamp) map.get("gmt_modified")).getTime(); ConfigInfoWrapper config = new ConfigInfoWrapper(); + config.setId(id); config.setDataId(dataId); config.setGroup(group); + config.setMd5(md5); config.setTenant(tenant); - config.setContent(content); config.setLastModified(mTime); configs.add(config); } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/repository/extrnal/ExternalConfigInfoPersistServiceImpl.java b/config/src/main/java/com/alibaba/nacos/config/server/service/repository/extrnal/ExternalConfigInfoPersistServiceImpl.java index 7af9c271656..4793f786377 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/repository/extrnal/ExternalConfigInfoPersistServiceImpl.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/repository/extrnal/ExternalConfigInfoPersistServiceImpl.java @@ -1268,6 +1268,8 @@ public List selectTagByConfig(String dataId, String group, String tenant .select(Collections.singletonList("tag_name"), Arrays.asList("data_id", "group_id", "tenant_id")); try { return jt.queryForList(sql, new Object[] {dataId, group, tenant}, String.class); + } catch (EmptyResultDataAccessException e) { + return null; } catch (IncorrectResultSizeDataAccessException e) { return null; } catch (CannotGetJdbcConnectionException e) { @@ -1385,18 +1387,18 @@ public List convertDeletedConfig(List> list) { public List convertChangeConfig(List> list) { List configs = new ArrayList<>(); for (Map map : list) { - Long id = (Long) map.get("id"); + Long id = ((java.lang.Number) map.get("id")).longValue(); String dataId = (String) map.get("data_id"); String group = (String) map.get("group_id"); String tenant = (String) map.get("tenant_id"); - String content = (String) map.get("content"); + String md5 = (String) map.get("md5"); long mTime = ((LocalDateTime) map.get("gmt_modified")).toInstant(ZoneOffset.ofHours(8)).toEpochMilli(); ConfigInfoWrapper config = new ConfigInfoWrapper(); config.setId(id); config.setDataId(dataId); config.setGroup(group); + config.setMd5(md5); config.setTenant(tenant); - config.setContent(content); config.setLastModified(mTime); configs.add(config); } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/utils/ConfigExecutor.java b/config/src/main/java/com/alibaba/nacos/config/server/utils/ConfigExecutor.java index f07b0a235be..c06420c6874 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/utils/ConfigExecutor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/utils/ConfigExecutor.java @@ -69,6 +69,10 @@ public static void scheduleConfigTask(Runnable command, long initialDelay, long TIMER_EXECUTOR.scheduleWithFixedDelay(command, initialDelay, delay, unit); } + public static void scheduleConfigChangeTask(Runnable command, long delay, TimeUnit unit) { + TIMER_EXECUTOR.schedule(command, delay, unit); + } + public static void scheduleCorrectUsageTask(Runnable runnable, long initialDelay, long delay, TimeUnit unit) { CAPACITY_MANAGEMENT_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit); } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java b/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java index 58a236bcfed..fc2671bbeab 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/utils/PropertyUtil.java @@ -93,6 +93,29 @@ public class PropertyUtil implements ApplicationContextInitializer paramList = new ArrayList<>(); - final String sqlFetchRows = "SELECT id,data_id,group_id,tenant_id,app_name,content,type,md5,gmt_modified FROM config_info WHERE "; + final String sqlFetchRows = "SELECT id,data_id,group_id,tenant_id,app_name,type,md5,gmt_modified FROM config_info WHERE "; String where = " 1=1 "; if (!StringUtils.isBlank(dataId)) { where += " AND data_id LIKE ? "; diff --git a/plugin/datasource/src/main/java/com/alibaba/nacos/plugin/datasource/mapper/ConfigInfoMapper.java b/plugin/datasource/src/main/java/com/alibaba/nacos/plugin/datasource/mapper/ConfigInfoMapper.java index 014aab877e3..deb4582c086 100644 --- a/plugin/datasource/src/main/java/com/alibaba/nacos/plugin/datasource/mapper/ConfigInfoMapper.java +++ b/plugin/datasource/src/main/java/com/alibaba/nacos/plugin/datasource/mapper/ConfigInfoMapper.java @@ -149,7 +149,7 @@ default MapperResult configInfoLikeTenantCount(MapperContext context) { */ default MapperResult findChangeConfig(MapperContext context) { String sql = - "SELECT id, data_id, group_id, tenant_id, app_name, content, gmt_modified, encrypted_data_key FROM config_info WHERE " + "SELECT id, data_id, group_id, tenant_id, app_name,md5, gmt_modified, encrypted_data_key FROM config_info WHERE " + "gmt_modified >= ? and id > ? order by id limit ? "; return new MapperResult(sql, CollectionUtils.list(context.getWhereParameter(FieldConstant.START_TIME), context.getWhereParameter(FieldConstant.LAST_MAX_ID), diff --git a/plugin/datasource/src/test/java/com/alibaba/nacos/plugin/datasource/impl/mysql/ConfigInfoMapperByMySqlTest.java b/plugin/datasource/src/test/java/com/alibaba/nacos/plugin/datasource/impl/mysql/ConfigInfoMapperByMySqlTest.java index d97a3507dff..a138a9c8395 100644 --- a/plugin/datasource/src/test/java/com/alibaba/nacos/plugin/datasource/impl/mysql/ConfigInfoMapperByMySqlTest.java +++ b/plugin/datasource/src/test/java/com/alibaba/nacos/plugin/datasource/impl/mysql/ConfigInfoMapperByMySqlTest.java @@ -160,7 +160,7 @@ public void testFindAllConfigInfoFragment() { public void testFindChangeConfig() { MapperResult mapperResult = configInfoMapperByMySql.findChangeConfig(context); Assert.assertEquals(mapperResult.getSql(), - "SELECT id, data_id, group_id, tenant_id, app_name, content, gmt_modified, encrypted_data_key FROM config_info" + "SELECT id, data_id, group_id, tenant_id, app_name,md5, gmt_modified, encrypted_data_key FROM config_info" + " WHERE gmt_modified >= ? and id > ? order by id limit ? "); Assert.assertArrayEquals(mapperResult.getParamList().toArray(), new Object[] {startTime, lastMaxId, pageSize}); } @@ -180,7 +180,7 @@ public void testFindChangeConfigFetchRows() { context.putWhereParameter(FieldConstant.LAST_MAX_ID, lastMaxId); MapperResult mapperResult = configInfoMapperByMySql.findChangeConfigFetchRows(context); Assert.assertEquals(mapperResult.getSql(), - "SELECT id,data_id,group_id,tenant_id,app_name,content,type,md5,gmt_modified FROM config_info " + "SELECT id,data_id,group_id,tenant_id,app_name,type,md5,gmt_modified FROM config_info " + "WHERE 1=1 AND tenant_id = ? AND app_name = ? AND gmt_modified >=? AND gmt_modified <=? AND id > " + lastMaxId + " ORDER BY id ASC LIMIT " + startRow + "," + pageSize); Assert.assertArrayEquals(mapperResult.getParamList().toArray(),