Skip to content

Commit

Permalink
lake compaction scheduler optimize in fe restart secinarios
Browse files Browse the repository at this point in the history
Signed-off-by: drake_wang <[email protected]>
  • Loading branch information
wxl24life committed Jan 9, 2025
1 parent 9627535 commit 4a717a5
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.common.Config;
import com.starrocks.common.Pair;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.memory.MemoryTrackable;
import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.metablock.SRMetaBlockEOFException;
Expand Down Expand Up @@ -80,16 +85,58 @@ public void setCompactionScheduler(CompactionScheduler compactionScheduler) {

public void start() {
if (compactionScheduler == null) {
compactionScheduler = new CompactionScheduler(this, GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(),
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr(), GlobalStateMgr.getCurrentState(),
Config.lake_compaction_disable_tables);
compactionScheduler =
new CompactionScheduler(this, GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(),
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr(),
GlobalStateMgr.getCurrentState(),
Config.lake_compaction_disable_tables);
GlobalStateMgr.getCurrentState().getConfigRefreshDaemon().registerListener(() -> {
compactionScheduler.disableTables(Config.lake_compaction_disable_tables);
});
compactionScheduler.start();
}
}

/**
* iterate all transactions and find those with LAKE_COMPACTION labels and are not finished yet.
**/
protected Map<PartitionIdentifier, CompactionJob> rebuildRunningCompactionsOnRestart() {
Map<PartitionIdentifier, CompactionJob> runningCompactions = new ConcurrentHashMap<>();
List<Long> activeTxnIds =
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLakeCompactionActiveTxnIds();
for (PartitionStatistics statistics : partitionStatisticsHashMap.values()) {
PartitionIdentifier partitionIdentifier = statistics.getPartition();
long compactionTxnId = statistics.getLastCompactionTxnId();
if (!activeTxnIds.contains(compactionTxnId)) {
continue;
}
Database db = GlobalStateMgr.getCurrentState().getDb(partitionIdentifier.getDbId());
if (db == null) {
continue;
}
Locker locker = new Locker();
locker.lockDatabase(db, LockType.READ);
try {
OlapTable table = (OlapTable) db.getTable(partitionIdentifier.getTableId());
if (table == null) {
continue;
}
PhysicalPartition partition = table.getPhysicalPartition(partitionIdentifier.getPartitionId());
if (partition == null) {
continue;
}
CompactionJob job = new CompactionJob(db, table, partition, compactionTxnId,
Config.lake_compaction_allow_partial_success);
runningCompactions.put(partitionIdentifier, job);
} catch (Exception e) {
LOG.error("Rebuild running compactions on fe restart failed: {}", e.getMessage(), e);
} finally {
locker.unLockDatabase(db, LockType.READ);
}
}
return runningCompactions;
}

public void handleLoadingFinished(PartitionIdentifier partition, long version, long versionTime,
Quantiles compactionScore) {
PartitionVersion currentVersion = new PartitionVersion(version, versionTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ public class CompactionScheduler extends Daemon {
private final GlobalStateMgr stateMgr;
private final ConcurrentHashMap<PartitionIdentifier, CompactionJob> runningCompactions;
private final SynchronizedCircularQueue<CompactionRecord> history;
private boolean finishedWaiting = false;
private long waitTxnId = -1;
private long lastPartitionCleanTime;
private Set<Long> disabledTables; // copy-on-write

Expand All @@ -98,38 +96,26 @@ public class CompactionScheduler extends Daemon {
this.disabledTables = Collections.unmodifiableSet(new HashSet<>());

disableTables(disableTablesStr);

// In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is
// necessary to ensure that the compaction task of the same partition is executed serially, that is, the next
// compaction task can be executed only after the status of the previous compaction task changes to visible or
// canceled.
// As `runningCompactions` is not persisted in journal, we should rebuild running compactions on restart
this.runningCompactions.putAll(compactionManager.rebuildRunningCompactionsOnRestart());
}

@Override
protected void runOneCycle() {
cleanPartition();

// Schedule compaction tasks only when this is a leader FE and all edit logs have finished replay.
// In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is
// necessary to ensure that the compaction task of the same partition is executed serially, that is, the next
// compaction task can be executed only after the status of the previous compaction task changes to visible or
// canceled.
if (stateMgr.isLeader() && stateMgr.isReady() && allCommittedCompactionsBeforeRestartHaveFinished()) {
if (stateMgr.isLeader() && stateMgr.isReady()) {
schedule();
history.changeMaxSize(Config.lake_compaction_history_size);
}
}

// Returns true if all compaction transactions committed before this restart have finished(i.e., of VISIBLE state).
private boolean allCommittedCompactionsBeforeRestartHaveFinished() {
if (finishedWaiting) {
return true;
}
// Note: must call getMinActiveCompactionTxnId() before getNextTransactionId(), otherwise if there are
// no running transactions waitTxnId <= minActiveTxnId will always be false.
long minActiveTxnId = transactionMgr.getMinActiveCompactionTxnId();
if (waitTxnId < 0) {
waitTxnId = transactionMgr.getTransactionIDGenerator().getNextTransactionId();
}
finishedWaiting = waitTxnId <= minActiveTxnId;
return finishedWaiting;
}

private void schedule() {
// Check whether there are completed compaction jobs.
for (Iterator<Map.Entry<PartitionIdentifier, CompactionJob>> iterator = runningCompactions.entrySet().iterator();
Expand All @@ -144,6 +130,9 @@ private void schedule() {
}

CompactionJob job = entry.getValue();
if (statistics != null) {
statistics.setLastCompactionTxnId(job.getTxnId());
}
if (!job.transactionHasCommitted()) {
String errorMsg = null;

Expand Down Expand Up @@ -396,6 +385,7 @@ protected long beginTransaction(PartitionIdentifier partition)
TransactionState.TxnCoordinator coordinator = new TransactionState.TxnCoordinator(txnSourceType, HOST_NAME);
String label = String.format("COMPACTION_%d-%d-%d-%d", dbId, tableId, partitionId, currentTs);


WarehouseManager manager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
Warehouse warehouse = manager.getCompactionWarehouse();
return transactionMgr.beginTransaction(dbId, Lists.newArrayList(tableId), label, coordinator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public class PartitionStatistics {
private volatile CompactionPriority priority = CompactionPriority.DEFAULT;
// not persist on purpose, used to control the interval of continuous partial success compaction
private int punishFactor = 1;
// record the last compaction txn id, used to check whether the last compaction is finished
// should persist on purpose
@SerializedName(value = "lastCompactionTxnId")
private long lastCompactionTxnId;

public enum CompactionPriority {
DEFAULT(0),
Expand Down Expand Up @@ -147,6 +151,14 @@ public PartitionStatisticsSnapshot getSnapshot() {
return new PartitionStatisticsSnapshot(this);
}

public long getLastCompactionTxnId() {
return lastCompactionTxnId;
}

public void setLastCompactionTxnId(long lastCompactionTxnId) {
this.lastCompactionTxnId = lastCompactionTxnId;
}

@Override
public String toString() {
return new Gson().toJson(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,22 @@ public List<TransactionState> getCommittedTxnList() {
}
}

public List<TransactionState> getLakeCompactionCommittedTxnList() {
readLock();
try {
// only send task to committed transaction
return idToRunningTransactionState.values().stream()
.filter(transactionState -> (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED))
.filter(transactionState -> transactionState.getSourceType() ==
TransactionState.LoadJobSourceType.LAKE_COMPACTION)
.sorted(Comparator.comparing(TransactionState::getCommitTime))
.collect(Collectors.toList());
} finally {
readUnlock();
}
}

// Check whether there is committed txns on partitionId.
public boolean hasCommittedTxnOnPartition(long tableId, long partitionId) {
readLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,20 @@ public long getMinActiveCompactionTxnId() {
return minId;
}

/**
* Get the list of active txn ids of compaction transactions.
* @return the list of active txn ids of compaction transactions.
*/
public List<Long> getLakeCompactionActiveTxnIds() {
List<Long> txnList = new ArrayList<>();
for (Map.Entry<Long, DatabaseTransactionMgr> entry : dbIdToDatabaseTransactionMgrs.entrySet()) {
DatabaseTransactionMgr dbTransactionMgr = entry.getValue();
dbTransactionMgr.getLakeCompactionCommittedTxnList()
.forEach(transactionState -> txnList.add(transactionState.getTransactionId()));
}
return txnList;
}

/**
* Get the smallest transaction ID of active transactions in a database.
* If there are no active transactions in the database, return the transaction ID that will be assigned to the
Expand Down

0 comments on commit 4a717a5

Please sign in to comment.