Skip to content

Commit

Permalink
[BugFix] Fix loads syncer wrong replication num property (backport #5…
Browse files Browse the repository at this point in the history
…2220) (#52458)

Signed-off-by: meegoo <[email protected]>
Co-authored-by: meegoo <[email protected]>
  • Loading branch information
mergify[bot] and meegoo authored Oct 31, 2024
1 parent bb3813e commit 05b411e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 74 deletions.
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int loads_history_sync_interval_second = 60;

/**
* The default retention days of load history.
*/
@ConfField(mutable = true)
public static int loads_history_retained_days = 30;

/**
* Load label cleaner will run every *label_clean_interval_second* to clean the outdated jobs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@
package com.starrocks.load.loadv2;

import com.starrocks.catalog.CatalogUtils;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.Config;
import com.starrocks.common.UserException;
import com.starrocks.common.util.AutoInferUtil;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.load.pipe.filelist.RepoExecutor;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.scheduler.history.TableKeeper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -34,7 +31,7 @@ public class LoadsHistorySyncer extends FrontendDaemon {
public static final String LOADS_HISTORY_TABLE_NAME = "loads_history";

private static final String LOADS_HISTORY_TABLE_CREATE =
"CREATE TABLE IF NOT EXISTS %s (" +
String.format("CREATE TABLE IF NOT EXISTS %s (" +
"id bigint, " +
"label varchar(2048), " +
"profile_id varchar(2048), " +
Expand Down Expand Up @@ -64,10 +61,11 @@ public class LoadsHistorySyncer extends FrontendDaemon {
") " +
"PARTITION BY date_trunc('DAY', load_finish_time) " +
"DISTRIBUTED BY HASH(label) BUCKETS 3 " +
"properties('replication_num' = '%d') ";

private static final String CORRECT_LOADS_HISTORY_REPLICATION_NUM =
"ALTER TABLE %s SET ('default.replication_num'='3')";
"PROPERTIES( " +
"'replication_num' = '1', " +
"'partition_live_number' = '" + Config.loads_history_retained_days + "'" +
")",
LOADS_HISTORY_TABLE_NAME);

private static final String LOADS_HISTORY_SYNC =
"INSERT INTO %s " +
Expand All @@ -77,69 +75,26 @@ public class LoadsHistorySyncer extends FrontendDaemon {
"AND load_finish_time > ( " +
"SELECT COALESCE(MAX(load_finish_time), '0001-01-01 00:00:00') " +
"FROM %s);";

private boolean databaseExists = false;
private boolean tableExists = false;
private boolean tableCorrected = false;

public LoadsHistorySyncer() {
super("Load history syncer", Config.loads_history_sync_interval_second * 1000L);
}
private boolean firstSync = true;

public boolean checkDatabaseExists() {
return GlobalStateMgr.getCurrentState().getDb(LOADS_HISTORY_DB_NAME) != null;
}
private static final TableKeeper KEEPER =
new TableKeeper(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME, LOADS_HISTORY_TABLE_CREATE, 3,
() -> Math.max(1, Config.loads_history_retained_days));

public static void createTable() throws UserException {
String sql = SQLBuilder.buildCreateTableSql();
RepoExecutor.getInstance().executeDDL(sql);
public static TableKeeper createKeeper() {
return KEEPER;
}

public static boolean correctTable() {
int numBackends = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getTotalBackendNumber();
int replica = GlobalStateMgr.getCurrentState()
.mayGetDb(LOADS_HISTORY_DB_NAME)
.flatMap(db -> db.mayGetTable(LOADS_HISTORY_TABLE_NAME))
.map(tbl -> ((OlapTable) tbl).getPartitionInfo().getMinReplicationNum())
.orElse((short) 1);
if (numBackends < 3) {
LOG.info("not enough backends in the cluster, expected 3 but got {}", numBackends);
return false;
}
if (replica < 3) {
String sql = SQLBuilder.buildAlterTableSql();
RepoExecutor.getInstance().executeDDL(sql);
} else {
LOG.info("table {} already has {} replicas, no need to alter replication_num",
LOADS_HISTORY_TABLE_NAME, replica);
}
return true;
public LoadsHistorySyncer() {
super("Load history syncer", Config.loads_history_sync_interval_second * 1000L);
}

public void checkMeta() throws UserException {
if (!databaseExists) {
databaseExists = checkDatabaseExists();
if (!databaseExists) {
LOG.warn("database not exists: " + LOADS_HISTORY_DB_NAME);
return;
}
}
if (!tableExists) {
createTable();
LOG.info("table created: " + LOADS_HISTORY_TABLE_NAME);
tableExists = true;
}
if (!tableCorrected && correctTable()) {
LOG.info("table corrected: " + LOADS_HISTORY_TABLE_NAME);
tableCorrected = true;
}

public void syncData() {
if (getInterval() != Config.loads_history_sync_interval_second * 1000L) {
setInterval(Config.loads_history_sync_interval_second * 1000L);
}
}

public void syncData() {
try {
RepoExecutor.getInstance().executeDML(SQLBuilder.buildSyncSql());
} catch (Exception e) {
Expand All @@ -150,7 +105,11 @@ public void syncData() {
@Override
protected void runAfterCatalogReady() {
try {
checkMeta();
// wait table keeper to create table
if (firstSync) {
firstSync = false;
return;
}
syncData();
} catch (Throwable e) {
LOG.warn("Failed to process one round of LoadJobScheduler with error message {}", e.getMessage(), e);
Expand All @@ -161,18 +120,6 @@ protected void runAfterCatalogReady() {
* Generate SQL for operations
*/
static class SQLBuilder {

public static String buildCreateTableSql() throws UserException {
int replica = AutoInferUtil.calDefaultReplicationNum();
return String.format(LOADS_HISTORY_TABLE_CREATE,
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME), replica);
}

public static String buildAlterTableSql() {
return String.format(CORRECT_LOADS_HISTORY_REPLICATION_NUM,
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME));
}

public static String buildSyncSql() {
return String.format(LOADS_HISTORY_SYNC,
CatalogUtils.normalizeTableName(LOADS_HISTORY_DB_NAME, LOADS_HISTORY_TABLE_NAME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.starrocks.common.Config;
import com.starrocks.common.UserException;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.load.loadv2.LoadsHistorySyncer;
import com.starrocks.load.pipe.filelist.RepoExecutor;
import com.starrocks.server.GlobalStateMgr;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -228,6 +229,7 @@ public static class TableKeeperDaemon extends FrontendDaemon {
super("TableKeeper", Config.table_keeper_interval_second * 1000L);

keeperList.add(TaskRunHistoryTable.createKeeper());
keeperList.add(LoadsHistorySyncer.createKeeper());
// TODO: add FileListPipeRepo
// TODO: add statistic table
}
Expand Down

0 comments on commit 05b411e

Please sign in to comment.