From 05b411e5c7901cbd8739f683e79ff224fdb1dcf3 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 31 Oct 2024 10:56:31 -0700 Subject: [PATCH] [BugFix] Fix loads syncer wrong replication num property (backport #52220) (#52458) Signed-off-by: meegoo Co-authored-by: meegoo --- .../java/com/starrocks/common/Config.java | 6 ++ .../load/loadv2/LoadsHistorySyncer.java | 95 ++++--------------- .../scheduler/history/TableKeeper.java | 2 + 3 files changed, 29 insertions(+), 74 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 36292d73ca6b2..3c554f37fbfd5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -367,6 +367,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static int loads_history_sync_interval_second = 60; + /** + * The default retention days of load history. + */ + @ConfField(mutable = true) + public static int loads_history_retained_days = 30; + /** * Load label cleaner will run every *label_clean_interval_second* to clean the outdated jobs. */ diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadsHistorySyncer.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadsHistorySyncer.java index 90a6e7a89ca61..4cae11aecbdf8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadsHistorySyncer.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadsHistorySyncer.java @@ -15,13 +15,10 @@ package com.starrocks.load.loadv2; import com.starrocks.catalog.CatalogUtils; -import com.starrocks.catalog.OlapTable; import com.starrocks.common.Config; -import com.starrocks.common.UserException; -import com.starrocks.common.util.AutoInferUtil; import com.starrocks.common.util.FrontendDaemon; import com.starrocks.load.pipe.filelist.RepoExecutor; -import com.starrocks.server.GlobalStateMgr; +import com.starrocks.scheduler.history.TableKeeper; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,7 +31,7 @@ public class LoadsHistorySyncer extends FrontendDaemon { public static final String LOADS_HISTORY_TABLE_NAME = "loads_history"; private static final String LOADS_HISTORY_TABLE_CREATE = - "CREATE TABLE IF NOT EXISTS %s (" + + String.format("CREATE TABLE IF NOT EXISTS %s (" + "id bigint, " + "label varchar(2048), " + "profile_id varchar(2048), " + @@ -64,10 +61,11 @@ public class LoadsHistorySyncer extends FrontendDaemon { ") " + "PARTITION BY date_trunc('DAY', load_finish_time) " + "DISTRIBUTED BY HASH(label) BUCKETS 3 " + - "properties('replication_num' = '%d') "; - - private static final String CORRECT_LOADS_HISTORY_REPLICATION_NUM = - "ALTER TABLE %s SET ('default.replication_num'='3')"; + "PROPERTIES( " + + "'replication_num' = '1', " + + "'partition_live_number' = '" + Config.loads_history_retained_days + "'" + + ")", + LOADS_HISTORY_TABLE_NAME); private static final String LOADS_HISTORY_SYNC = "INSERT INTO %s " + @@ -77,69 +75,26 @@ public class LoadsHistorySyncer extends FrontendDaemon { "AND load_finish_time > ( " + "SELECT COALESCE(MAX(load_finish_time), '0001-01-01 00:00:00') " + "FROM %s);"; - - private boolean databaseExists = false; - private boolean tableExists = false; - private boolean tableCorrected = false; - public LoadsHistorySyncer() { - super("Load history syncer", Config.loads_history_sync_interval_second * 1000L); - } + private boolean firstSync = true; - public boolean checkDatabaseExists() { - return GlobalStateMgr.getCurrentState().getDb(LOADS_HISTORY_DB_NAME) != null; - } + private static final TableKeeper KEEPER = + new TableKeeper(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME, LOADS_HISTORY_TABLE_CREATE, 3, + () -> Math.max(1, Config.loads_history_retained_days)); - public static void createTable() throws UserException { - String sql = SQLBuilder.buildCreateTableSql(); - RepoExecutor.getInstance().executeDDL(sql); + public static TableKeeper createKeeper() { + return KEEPER; } - public static boolean correctTable() { - int numBackends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber(); - int replica = GlobalStateMgr.getCurrentState() - .mayGetDb(LOADS_HISTORY_DB_NAME) - .flatMap(db -> db.mayGetTable(LOADS_HISTORY_TABLE_NAME)) - .map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum()) - .orElse((short) 1); - if (numBackends < 3) { - LOG.info("not enough backends in the cluster, expected 3 but got {}", numBackends); - return false; - } - if (replica < 3) { - String sql = SQLBuilder.buildAlterTableSql(); - RepoExecutor.getInstance().executeDDL(sql); - } else { - LOG.info("table {} already has {} replicas, no need to alter replication_num", - LOADS_HISTORY_TABLE_NAME, replica); - } - return true; + public LoadsHistorySyncer() { + super("Load history syncer", Config.loads_history_sync_interval_second * 1000L); } - public void checkMeta() throws UserException { - if (!databaseExists) { - databaseExists = checkDatabaseExists(); - if (!databaseExists) { - LOG.warn("database not exists: " + LOADS_HISTORY_DB_NAME); - return; - } - } - if (!tableExists) { - createTable(); - LOG.info("table created: " + LOADS_HISTORY_TABLE_NAME); - tableExists = true; - } - if (!tableCorrected && correctTable()) { - LOG.info("table corrected: " + LOADS_HISTORY_TABLE_NAME); - tableCorrected = true; - } - + public void syncData() { if (getInterval() != Config.loads_history_sync_interval_second * 1000L) { setInterval(Config.loads_history_sync_interval_second * 1000L); } - } - public void syncData() { try { RepoExecutor.getInstance().executeDML(SQLBuilder.buildSyncSql()); } catch (Exception e) { @@ -150,7 +105,11 @@ public void syncData() { @Override protected void runAfterCatalogReady() { try { - checkMeta(); + // wait table keeper to create table + if (firstSync) { + firstSync = false; + return; + } syncData(); } catch (Throwable e) { LOG.warn("Failed to process one round of LoadJobScheduler with error message {}", e.getMessage(), e); @@ -161,18 +120,6 @@ protected void runAfterCatalogReady() { * Generate SQL for operations */ static class SQLBuilder { - - public static String buildCreateTableSql() throws UserException { - int replica = AutoInferUtil.calDefaultReplicationNum(); - return String.format(LOADS_HISTORY_TABLE_CREATE, - CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME), replica); - } - - public static String buildAlterTableSql() { - return String.format(CORRECT_LOADS_HISTORY_REPLICATION_NUM, - CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME)); - } - public static String buildSyncSql() { return String.format(LOADS_HISTORY_SYNC, CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME), diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TableKeeper.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TableKeeper.java index 2037bc32cb053..69286a4183091 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TableKeeper.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TableKeeper.java @@ -20,6 +20,7 @@ import com.starrocks.common.Config; import com.starrocks.common.UserException; import com.starrocks.common.util.FrontendDaemon; +import com.starrocks.load.loadv2.LoadsHistorySyncer; import com.starrocks.load.pipe.filelist.RepoExecutor; import com.starrocks.server.GlobalStateMgr; import org.apache.commons.lang3.StringUtils; @@ -228,6 +229,7 @@ public static class TableKeeperDaemon extends FrontendDaemon { super("TableKeeper", Config.table_keeper_interval_second * 1000L); keeperList.add(TaskRunHistoryTable.createKeeper()); + keeperList.add(LoadsHistorySyncer.createKeeper()); // TODO: add FileListPipeRepo // TODO: add statistic table }