
Commit 9d7db2f
fix
Signed-off-by: srlch <[email protected]>
srlch committed Jan 7, 2025
1 parent af2d7c1 commit 9d7db2f
Showing 2 changed files with 70 additions and 81 deletions.
CheckpointController.java

@@ -78,7 +78,7 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * The CheckpointController daemon runs on the master node and handles the checkpoint work for StarRocks.
@@ -88,12 +88,16 @@ public class CheckpointController extends FrontendDaemon {
     private static final int PUT_TIMEOUT_SECOND = 3600;
     private static final int CONNECT_TIMEOUT_SECOND = 1;
     private static final int READ_TIMEOUT_SECOND = 1;
+    private static final AtomicInteger EXCLUSIVE_RUNNING = new AtomicInteger(0);
+    private static final AtomicInteger CONCURRENT_RUNNING = new AtomicInteger(0);
 
     protected String imageDir;
     // subDir comes after the base imageDir, to distinguish different modules' image dirs
     protected String subDir;
     protected boolean belongToGlobalStateMgr;
     protected volatile long journalId;
+    // indicates that this CheckpointController should be executed in an exclusive way among all possible controllers
+    protected boolean exclusiveExecution;
 
     private final Journal journal;
 
@@ -102,7 +106,6 @@ public class CheckpointController extends FrontendDaemon {
     private volatile String workerNodeName;
     private volatile long workerSelectedTime;
     private volatile BlockingQueue<Pair<Boolean, String>> result;
-    private AtomicBoolean checkpointControllerRunning = new AtomicBoolean(false);
 
     public CheckpointController(String name, Journal journal, String subDir) {
         this(name, journal, subDir, FeConstants.checkpoint_interval_second);
@@ -114,32 +117,89 @@ public CheckpointController(String name, Journal journal, String subDir, long in
         this.subDir = subDir;
         this.belongToGlobalStateMgr = Strings.isNullOrEmpty(subDir);
         nodesToPushImage = new HashSet<>();
+        this.exclusiveExecution = false;
     }
 
+    private static boolean concurrentExecutionExistedOrComing() {
+        Preconditions.checkState(CONCURRENT_RUNNING.get() >= 0);
+        return CONCURRENT_RUNNING.get() > 0;
+    }
+
+    private static boolean exclusiveExecutionExistedOrComing() {
+        Preconditions.checkState(EXCLUSIVE_RUNNING.get() >= 0);
+        return EXCLUSIVE_RUNNING.get() > 0;
+    }
+
+    private static boolean feasibleExclusiveRunningState() {
+        return EXCLUSIVE_RUNNING.get() == 1;
+    }
+
+    private static void addExclusiveExecution() {
+        EXCLUSIVE_RUNNING.incrementAndGet();
+    }
+
+    private static void addConcurrentExecution() {
+        CONCURRENT_RUNNING.incrementAndGet();
+    }
+
+    private static boolean checkAndBeginRunning(boolean exclusiveExecution) {
+        String curRole = exclusiveExecution ? "exclusive checkpoint controller" : "concurrent checkpoint controller";
+        String peerRole = exclusiveExecution ? "concurrent checkpoint controller" : "exclusive checkpoint controller";
+        String errMsg = "Exit " + curRole + " because the " + peerRole + " is running";
+
+        if (exclusiveExecution && !exclusiveExecutionExistedOrComing() && !concurrentExecutionExistedOrComing()) {
+            addExclusiveExecution();
+        } else if (!exclusiveExecution && !exclusiveExecutionExistedOrComing()) {
+            addConcurrentExecution();
+        } else {
+            LOG.info(errMsg);
+            return false;
+        }
+
+        if ((exclusiveExecution && (!feasibleExclusiveRunningState() || concurrentExecutionExistedOrComing())) ||
+                (!exclusiveExecution && exclusiveExecutionExistedOrComing())) {
+            finishRunning(exclusiveExecution);
+            LOG.info(errMsg);
+            return false;
+        }
+
+        return true;
+    }
+
+    private static void finishRunning(boolean exclusiveExecution) {
+        if (exclusiveExecution) {
+            EXCLUSIVE_RUNNING.decrementAndGet();
+        } else {
+            CONCURRENT_RUNNING.decrementAndGet();
+        }
+        Preconditions.checkState(EXCLUSIVE_RUNNING.get() >= 0);
+        Preconditions.checkState(CONCURRENT_RUNNING.get() >= 0);
+    }
+
     public static boolean normalCheckpointRunning() {
-        return GlobalStateMgr.getServingState().getCheckpointController().isCheckpointControllerRunning() ||
-                StarMgrServer.getServingState().getCheckpointController().isCheckpointControllerRunning();
+        return concurrentExecutionExistedOrComing();
     }
 
     public static boolean clusterSnapshotCheckpointRunning() {
-        return GlobalStateMgr.getServingState().getClusterSnapshotCheckpointController().isCheckpointControllerRunning();
+        return feasibleExclusiveRunningState();
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        if (!checkAndBeginRunning()) {
+        if (!checkAndBeginRunning(exclusiveExecution)) {
             return;
         }
 
         try {
+            Preconditions.checkState(exclusiveExecution && feasibleExclusiveRunningState() ||
+                    !exclusiveExecution && concurrentExecutionExistedOrComing());
             runCheckpointController();
         } finally {
-            finishRunning();
+            finishRunning(exclusiveExecution);
         }
     }
 
     protected void runCheckpointController() {
-        Preconditions.checkState(!clusterSnapshotCheckpointRunning());
         init();
 
         long imageJournalId = 0;
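
What this hunk replaces: the old per-instance AtomicBoolean handshake gives way to two process-wide counters, so any number of concurrent-capable controllers may overlap while an exclusive controller must run alone. A minimal, self-contained sketch of that admission protocol (hypothetical class name AdmissionSketch; simplified from the hunk above, not the commit's literal code):

import java.util.concurrent.atomic.AtomicInteger;

class AdmissionSketch {
    private static final AtomicInteger EXCLUSIVE = new AtomicInteger(0);
    private static final AtomicInteger CONCURRENT = new AtomicInteger(0);

    // Try to admit a run; returns false if a conflicting run is active.
    static boolean tryBegin(boolean exclusive) {
        if (exclusive) {
            if (EXCLUSIVE.get() > 0 || CONCURRENT.get() > 0) {
                return false;                    // something is already running
            }
            EXCLUSIVE.incrementAndGet();         // optimistic claim
            // Re-check: another thread may have claimed a slot between the
            // read above and our increment; if so, roll back and give up.
            if (EXCLUSIVE.get() != 1 || CONCURRENT.get() > 0) {
                EXCLUSIVE.decrementAndGet();
                return false;
            }
        } else {
            if (EXCLUSIVE.get() > 0) {
                return false;                    // an exclusive run is active
            }
            CONCURRENT.incrementAndGet();        // optimistic claim
            if (EXCLUSIVE.get() > 0) {           // same re-check, mirrored
                CONCURRENT.decrementAndGet();
                return false;
            }
        }
        return true;
    }

    // Release the slot taken by tryBegin; call from a finally block.
    static void finish(boolean exclusive) {
        (exclusive ? EXCLUSIVE : CONCURRENT).decrementAndGet();
    }
}

The optimistic increment followed by a re-check closes the race in which two controllers both pass their first check at the same moment: at least one of them then observes the other's increment and rolls back. Two racing exclusive claims both see EXCLUSIVE != 1 and both back off, which is why the commit tests for exactly one exclusive slot rather than a nonzero count.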
@@ -191,68 +251,6 @@ protected void runCheckpointController() {
         }
     }
 
-    protected boolean checkAndBeginRunning() {
-        Preconditions.checkState(!selfIsRunning());
-        String curRole = (this instanceof ClusterSnapshotCheckpointController) ? "cluster snapshot checkpoint" :
-                "normal checkpoint";
-        String peerRole = (this instanceof ClusterSnapshotCheckpointController) ? "normal checkpoint" :
-                "cluster snapshot checkpoint";
-        String errMsg = "Exit " + curRole + " because of the " + peerRole + " is running";
-
-        if (peerIsRunning()) {
-            LOG.info(errMsg);
-            return false;
-        }
-
-        changeRunningState(true);
-
-        if (peerIsRunning()) {
-            changeRunningState(false);
-            LOG.info(errMsg);
-            return false;
-        }
-
-        return true;
-    }
-
-    protected void finishRunning() {
-        changeRunningState(false);
-    }
-
-    private boolean selfIsRunning() {
-        if (this instanceof ClusterSnapshotCheckpointController) {
-            return clusterSnapshotCheckpointRunning();
-        } else {
-            return normalCheckpointRunning();
-        }
-    }
-
-    private boolean peerIsRunning() {
-        if (this instanceof ClusterSnapshotCheckpointController) {
-            return normalCheckpointRunning();
-        } else {
-            return clusterSnapshotCheckpointRunning();
-        }
-    }
-
-    private void setCheckpointControllerRunning(boolean state) {
-        checkpointControllerRunning.set(state);
-    }
-
-    private boolean isCheckpointControllerRunning() {
-        return checkpointControllerRunning.get();
-    }
-
-    private void changeRunningState(boolean state) {
-        if (this instanceof ClusterSnapshotCheckpointController) {
-            GlobalStateMgr.getServingState().getClusterSnapshotCheckpointController().setCheckpointControllerRunning(state);
-        } else if (belongToGlobalStateMgr) {
-            GlobalStateMgr.getServingState().getCheckpointController().setCheckpointControllerRunning(state);
-        } else {
-            StarMgrServer.getServingState().getCheckpointController().setCheckpointControllerRunning(state);
-        }
-    }
-
     private void init() {
         this.imageDir = GlobalStateMgr.getServingState().getImageDir() + subDir;
     }
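
For a quick sanity check of the counter rules, here is a hypothetical single-threaded trace against the AdmissionSketch helper above (assuming both sketches sit in the same package; the commented values follow from the counter logic):

public class AdmissionDemo {
    public static void main(String[] args) {
        // Two concurrent-capable controllers may overlap each other.
        System.out.println(AdmissionSketch.tryBegin(false)); // true
        System.out.println(AdmissionSketch.tryBegin(false)); // true
        // An exclusive controller is rejected while either is running.
        System.out.println(AdmissionSketch.tryBegin(true));  // false
        AdmissionSketch.finish(false);
        AdmissionSketch.finish(false);
        // Once everything is idle, the exclusive controller is admitted...
        System.out.println(AdmissionSketch.tryBegin(true));  // true
        // ...and any concurrent run is now rejected until it finishes.
        System.out.println(AdmissionSketch.tryBegin(false)); // false
        AdmissionSketch.finish(true);
    }
}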
ClusterSnapshotCheckpointController.java
@@ -14,7 +14,6 @@
 
 package com.starrocks.leader;
 
-import com.google.common.base.Preconditions;
 import com.starrocks.common.Config;
 import com.starrocks.common.Pair;
 import com.starrocks.common.StarRocksException;
@@ -37,6 +36,7 @@ public ClusterSnapshotCheckpointController(Journal feJournal, Journal starMgrJou
         super("cluster_snapshot_checkpoint_controller", null, "", Config.automated_cluster_snapshot_interval_seconds);
         this.feJournal = feJournal;
         this.starMgrJournal = starMgrJournal;
+        this.exclusiveExecution = true;
     }
 
     @Override
@@ -45,20 +45,11 @@ protected void runAfterCatalogReady() {
             return;
         }
 
-        if (!checkAndBeginRunning()) {
-            return;
-        }
-
-        try {
-            runCheckpointController();
-        } finally {
-            finishRunning();
-        }
+        super.runAfterCatalogReady();
     }
 
     @Override
     protected void runCheckpointController() {
-        Preconditions.checkState(!normalCheckpointRunning());
         String errMsg = "";
         ClusterSnapshotJob job = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr()
                 .createAutomatedSnapshotJob(); /* INITIALIZING state */
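
The subclass side of the change: ClusterSnapshotCheckpointController stops duplicating the begin/finish bracket, flips exclusiveExecution in its constructor, and falls through to the base class's guarded entry point. A compressed sketch of the resulting shape (hypothetical names, bodies simplified, reusing AdmissionSketch from above):

abstract class BaseCheckpointDaemon {
    protected boolean exclusiveExecution = false;

    protected void runAfterCatalogReady() {
        if (!AdmissionSketch.tryBegin(exclusiveExecution)) {
            return;                                      // a conflicting run is active
        }
        try {
            runCheckpointController();                   // the actual checkpoint work
        } finally {
            AdmissionSketch.finish(exclusiveExecution);  // never leaks a slot
        }
    }

    protected abstract void runCheckpointController();
}

class ClusterSnapshotDaemonSketch extends BaseCheckpointDaemon {
    ClusterSnapshotDaemonSketch() {
        this.exclusiveExecution = true;                  // must run alone
    }

    @Override
    protected void runCheckpointController() {
        // snapshot-specific work would go here
    }
}

Keeping the admission logic in one place means the two controller kinds cannot drift apart: any future controller opts into the right protocol simply by setting the flag.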
