From c2f5cd42b9f54e74333fa428485d2f2c19cdea5f Mon Sep 17 00:00:00 2001 From: srlch Date: Tue, 7 Jan 2025 16:18:20 +0800 Subject: [PATCH 1/6] [Feature] Support Cluster Snapshot Backup: checkpoint and image backup (part3) Signed-off-by: srlch --- .../schema_cluster_snapshot_jobs_scanner.cpp | 16 +- .../schema_cluster_snapshots_scanner.cpp | 16 +- .../information/ClusterSnapshotJobsTable.java | 4 +- .../information/ClusterSnapshotsTable.java | 4 +- .../java/com/starrocks/common/Config.java | 6 + .../main/java/com/starrocks/fs/HdfsUtil.java | 5 + .../com/starrocks/fs/hdfs/HdfsFsManager.java | 18 ++ .../com/starrocks/fs/hdfs/HdfsService.java | 5 + .../starrocks/journal/CheckpointWorker.java | 6 + .../java/com/starrocks/lake/StarOSAgent.java | 4 + .../lake/snapshot/ClusterSnapshot.java | 88 +++++++++ .../lake/snapshot/ClusterSnapshotJob.java | 172 +++++++++++++++++ .../lake/snapshot/ClusterSnapshotMgr.java | 152 ++++++++++++++- .../lake/snapshot/ClusterSnapshotUtils.java | 53 +++++ .../leader/CheckpointController.java | 113 +++++++++-- .../ClusterSnapshotCheckpointController.java | 182 ++++++++++++++++++ .../starrocks/persist/ClusterSnapshotLog.java | 28 ++- .../com/starrocks/server/GlobalStateMgr.java | 21 +- .../java/com/starrocks/server/NodeMgr.java | 2 + .../service/FrontendServiceImpl.java | 8 +- .../sql/analyzer/ClusterSnapshotAnalyzer.java | 9 + .../storagevolume/StorageVolume.java | 4 + .../java/com/starrocks/system/Frontend.java | 3 + .../starrocks/lake/ClusterSnapshotTest.java | 28 ++- 24 files changed, 914 insertions(+), 33 deletions(-) create mode 100644 fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotUtils.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java diff --git a/be/src/exec/schema_scanner/schema_cluster_snapshot_jobs_scanner.cpp b/be/src/exec/schema_scanner/schema_cluster_snapshot_jobs_scanner.cpp index fa71c0fbcb2c2..613460e1d3692 100644 --- a/be/src/exec/schema_scanner/schema_cluster_snapshot_jobs_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_cluster_snapshot_jobs_scanner.cpp @@ -25,8 +25,8 @@ namespace starrocks { SchemaScanner::ColumnDesc SchemaClusterSnapshotJobsScanner::_s_columns[] = { {"SNAPSHOT_NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true}, {"JOB_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true}, - {"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true}, - {"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true}, + {"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true}, + {"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true}, {"STATE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true}, {"DETAIL_INFO", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true}, {"ERROR_MESSAGE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true}, @@ -52,8 +52,16 @@ Status SchemaClusterSnapshotJobsScanner::_fill_chunk(ChunkPtr* chunk) { auto& slot_id_map = (*chunk)->get_slot_id_to_index_map(); const TClusterSnapshotJobsItem& info = _result.items[_index]; DatumArray datum_array{ - Slice(info.snapshot_name), info.job_id, info.created_time, - info.finished_time, Slice(info.state), Slice(info.detail_info), + Slice(info.snapshot_name), + info.job_id, + + TimestampValue::create_from_unixtime(info.created_time, _runtime_state->timezone_obj()), + + TimestampValue::create_from_unixtime(info.finished_time, _runtime_state->timezone_obj()), + + Slice(info.state), + Slice(info.detail_info), + Slice(info.error_message), }; for (const auto& [slot_id, index] : slot_id_map) { diff --git a/be/src/exec/schema_scanner/schema_cluster_snapshots_scanner.cpp b/be/src/exec/schema_scanner/schema_cluster_snapshots_scanner.cpp index 4e77498ca7c3d..68d81c175697c 100644 --- a/be/src/exec/schema_scanner/schema_cluster_snapshots_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_cluster_snapshots_scanner.cpp @@ -25,8 +25,8 @@ namespace starrocks { SchemaScanner::ColumnDesc SchemaClusterSnapshotsScanner::_s_columns[] = { {"SNAPSHOT_NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true}, {"SNAPSHOT_TYPE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true}, - {"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true}, - {"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true}, + {"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true}, + {"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true}, {"FE_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true}, {"STARMGR_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true}, {"PROPERTIES", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true}, @@ -54,9 +54,17 @@ Status SchemaClusterSnapshotsScanner::_fill_chunk(ChunkPtr* chunk) { auto& slot_id_map = (*chunk)->get_slot_id_to_index_map(); const TClusterSnapshotsItem& info = _result.items[_index]; DatumArray datum_array{ - Slice(info.snapshot_name), Slice(info.snapshot_type), info.created_time, info.finished_time, + Slice(info.snapshot_name), + Slice(info.snapshot_type), - info.fe_jouranl_id, info.starmgr_jouranl_id, Slice(info.properties), Slice(info.storage_volume), + TimestampValue::create_from_unixtime(info.created_time, _runtime_state->timezone_obj()), + + TimestampValue::create_from_unixtime(info.finished_time, _runtime_state->timezone_obj()), + + info.fe_jouranl_id, + info.starmgr_jouranl_id, + Slice(info.properties), + Slice(info.storage_volume), Slice(info.storage_path), }; diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/system/information/ClusterSnapshotJobsTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/system/information/ClusterSnapshotJobsTable.java index 3c9f91d2c5f7e..e509cf81d6729 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/system/information/ClusterSnapshotJobsTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/system/information/ClusterSnapshotJobsTable.java @@ -33,8 +33,8 @@ public static SystemTable create() { builder() .column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN)) .column("JOB_ID", ScalarType.createType(PrimitiveType.BIGINT)) - .column("CREATED_TIME", ScalarType.createType(PrimitiveType.BIGINT)) - .column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT)) + .column("CREATED_TIME", ScalarType.createType(PrimitiveType.DATETIME)) + .column("FINISHED_TIME", ScalarType.createType(PrimitiveType.DATETIME)) .column("STATE", ScalarType.createVarchar(NAME_CHAR_LEN)) .column("DETAIL_INFO", ScalarType.createVarchar(NAME_CHAR_LEN)) .column("ERROR_MESSAGE", ScalarType.createVarchar(NAME_CHAR_LEN)) diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/system/information/ClusterSnapshotsTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/system/information/ClusterSnapshotsTable.java index 866035d873f35..bbb6e22b44fc3 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/system/information/ClusterSnapshotsTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/system/information/ClusterSnapshotsTable.java @@ -33,8 +33,8 @@ public static SystemTable create() { builder() .column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN)) .column("SNAPSHOT_TYPE", ScalarType.createVarchar(NAME_CHAR_LEN)) - .column("CREATED_TIME", ScalarType.createType(PrimitiveType.BIGINT)) - .column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT)) + .column("CREATED_TIME", ScalarType.createType(PrimitiveType.DATETIME)) + .column("FINISHED_TIME", ScalarType.createType(PrimitiveType.DATETIME)) .column("FE_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT)) .column("STARMGR_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT)) .column("PROPERTIES", ScalarType.createVarchar(NAME_CHAR_LEN)) diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 0bd273d6cfd14..a14d4eb9366cd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -3390,4 +3390,10 @@ public class Config extends ConfigBase { @ConfField(mutable = false) public static int query_deploy_threadpool_size = max(50, getRuntime().availableProcessors() * 10); + + @ConfField(mutable = true) + public static long automated_cluster_snapshot_interval_seconds = 1800; + + @ConfField(mutable = false) + public static int max_historical_automated_cluster_snapshot_jobs = 100; } diff --git a/fe/fe-core/src/main/java/com/starrocks/fs/HdfsUtil.java b/fe/fe-core/src/main/java/com/starrocks/fs/HdfsUtil.java index 1255900bbae32..502a9564ca08a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/fs/HdfsUtil.java +++ b/fe/fe-core/src/main/java/com/starrocks/fs/HdfsUtil.java @@ -71,6 +71,11 @@ public static void copyToLocal(String srcPath, String destPath, Map properties) + throws StarRocksException { + hdfsService.copyFromLocal(srcPath, destPath, properties); + } + /** * Parse file status in path with broker, except directory * diff --git a/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java b/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java index c315d04a4844f..b12841e797d6e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.security.UserGroupInformation; @@ -47,6 +48,7 @@ import software.amazon.awssdk.awscore.util.AwsHostNameUtils; import software.amazon.awssdk.regions.Region; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -1212,6 +1214,22 @@ public void copyToLocal(String srcPath, String destPath, Map pro } } + public void copyFromLocal(String srcPath, String destPath, Map properties) throws StarRocksException { + HdfsFs fileSystem = getFileSystem(destPath, properties, null); + try { + WildcardURI destPathUri = new WildcardURI(destPath); + File srcFile = new File(srcPath); + FileUtil.copy(srcFile, fileSystem.getDFSFileSystem(), new Path(destPathUri.getPath()), false, new Configuration()); + } catch (InterruptedIOException e) { + Thread.interrupted(); // clear interrupted flag + LOG.error("Interrupted while copy local {} to {} ", srcPath, destPath, e); + throw new StarRocksException("Failed to copy local " + srcPath + " to " + destPath, e); + } catch (Exception e) { + LOG.error("Exception while copy local {} to {} ", srcPath, destPath, e); + throw new StarRocksException("Failed to copy local " + srcPath + " to " + destPath, e); + } + } + public List listFileMeta(String path, Map properties) throws StarRocksException { WildcardURI pathUri = new WildcardURI(path); HdfsFs fileSystem = getFileSystem(path, properties, null); diff --git a/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsService.java b/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsService.java index 35ae49507fcfe..4dcc0ab826c6c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsService.java +++ b/fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsService.java @@ -59,6 +59,11 @@ public void copyToLocal(String srcPath, String destPath, Map pro LOG.info("Copied {} to local {}", srcPath, destPath); } + public void copyFromLocal(String srcPath, String destPath, Map properties) throws StarRocksException { + fileSystemManager.copyFromLocal(srcPath, destPath, properties); + LOG.info("Copied local {} to {}", srcPath, destPath); + } + public void listPath(TBrokerListPathRequest request, List fileStatuses, boolean skipDir, boolean fileNameOnly) throws StarRocksException { LOG.info("receive a list path request, path: {}", request.path); diff --git a/fe/fe-core/src/main/java/com/starrocks/journal/CheckpointWorker.java b/fe/fe-core/src/main/java/com/starrocks/journal/CheckpointWorker.java index d91e7b9e239ed..43b8d25225ac6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/journal/CheckpointWorker.java +++ b/fe/fe-core/src/main/java/com/starrocks/journal/CheckpointWorker.java @@ -80,9 +80,11 @@ protected void runAfterCatalogReady() { if (nextPoint == null) { return; } + if (nextPoint.journalId <= getImageJournalId()) { return; } + if (nextPoint.epoch != servingGlobalState.getEpoch()) { return; } @@ -133,6 +135,10 @@ private void finishCheckpoint(long epoch, long journalId, boolean isSuccess, Str String nodeName = servingGlobalState.getNodeMgr().getNodeName(); if (servingGlobalState.isLeader()) { CheckpointController controller = getCheckpointController(); + if (CheckpointController.clusterSnapshotCheckpointRunning()) { + controller = GlobalStateMgr.getServingState().getClusterSnapshotCheckpointController(); + } + if (isSuccess) { try { controller.finishCheckpoint(journalId, nodeName); diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java b/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java index 37c8170c12470..a2defbebcb08f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java @@ -140,6 +140,10 @@ public void getServiceId() { LOG.info("get serviceId {} from starMgr", serviceId); } + public String getRawServiceId() { + return serviceId; + } + public String addFileStore(FileStoreInfo fsInfo) throws DdlException { try { return client.addFileStore(fsInfo, serviceId); diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java new file mode 100644 index 0000000000000..8ab59b21ecf04 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java @@ -0,0 +1,88 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.lake.snapshot; + +import com.google.gson.annotations.SerializedName; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.StorageVolumeMgr; +import com.starrocks.storagevolume.StorageVolume; +import com.starrocks.thrift.TClusterSnapshotsItem; + +public class ClusterSnapshot { + public enum ClusterSnapshotType { AUTOMATED } + + @SerializedName(value = "snapshotId") + private long snapshotId; + @SerializedName(value = "snapshotName") + private String snapshotName; + @SerializedName(value = "type") + private ClusterSnapshotType type; + @SerializedName(value = "storageVolumeName") + private String storageVolumeName; + @SerializedName(value = "createdTime") + private long createdTime; + @SerializedName(value = "finishedTime") + private long finishedTime; + @SerializedName(value = "feJournalId") + private long feJournalId; + @SerializedName(value = "starMgrJournal") + private long starMgrJournalId; + + public ClusterSnapshot() {} + + public ClusterSnapshot(long snapshotId, String snapshotName, String storageVolumeName, long createdTime, + long finishedTime, long feJournalId, long starMgrJournalId) { + this.snapshotId = snapshotId; + this.snapshotName = snapshotName; + this.type = ClusterSnapshotType.AUTOMATED; + this.storageVolumeName = storageVolumeName; + this.createdTime = createdTime; + this.finishedTime = finishedTime; + this.feJournalId = feJournalId; + this.starMgrJournalId = starMgrJournalId; + } + + public String getSnapshotName() { + return snapshotName; + } + + public long getFinishedTime() { + return finishedTime; + } + + public TClusterSnapshotsItem getInfo() { + TClusterSnapshotsItem item = new TClusterSnapshotsItem(); + item.setSnapshot_name(snapshotName); + item.setSnapshot_type(type.name()); + item.setCreated_time(createdTime); + item.setFinished_time(finishedTime); + item.setFe_jouranl_id(feJournalId); + item.setStarmgr_jouranl_id(starMgrJournalId); + item.setProperties(""); + item.setStorage_volume(storageVolumeName); + + StorageVolumeMgr storageVolumeMgr = GlobalStateMgr.getCurrentState().getStorageVolumeMgr(); + try { + StorageVolume sv = storageVolumeMgr.getStorageVolumeByName(storageVolumeName); + if (sv == null) { + throw new Exception("Unknown storage volume: " + storageVolumeName); + } + item.setStorage_path(ClusterSnapshotUtils.getSnapshotImagePath(sv, snapshotName)); + } catch (Exception e) { + item.setStorage_path(""); + } + return item; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java new file mode 100644 index 0000000000000..ab5a8918baf24 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java @@ -0,0 +1,172 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.lake.snapshot; + +import com.google.gson.annotations.SerializedName; +import com.starrocks.common.io.Text; +import com.starrocks.common.io.Writable; +import com.starrocks.persist.ClusterSnapshotLog; +import com.starrocks.persist.gson.GsonUtils; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.thrift.TClusterSnapshotJobsItem; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.time.Instant; + +public class ClusterSnapshotJob implements Writable { + /* + * INITIALIZING: INIT state for the snapshot. + * SNAPSHOTING: Doing checkpoint/image generation by replaying log for image both for FE and StarMgr and + * then upload the image into remote storage + * UPLOADING: Uploading image file into remote storage + * FINISHED: Finish backup snapshot + */ + public enum ClusterSnapshotJobState { INITIALIZING, SNAPSHOTING, UPLOADING, FINISHED, ERROR } + + @SerializedName(value = "jobId") + private long jobId; + @SerializedName(value = "snapshotNamePrefix") + private String snapshotNamePrefix; + @SerializedName(value = "snapshotName") + private String snapshotName; + @SerializedName(value = "storageVolumeName") + private String storageVolumeName; + @SerializedName(value = "createdTime") + private long createdTime; + @SerializedName(value = "finishedTime") + private long finishedTime; + @SerializedName(value = "feJournalId") + private long feJournalId; + @SerializedName(value = "starMgrJournalId") + private long starMgrJournalId; + @SerializedName(value = "state") + private ClusterSnapshotJobState state; + @SerializedName(value = "errMsg") + private String errMsg; + + public ClusterSnapshotJob(long jobId, String snapshotNamePrefix, String snapshotName, String storageVolumeName, + long createdTime) { + this.jobId = jobId; + this.snapshotNamePrefix = snapshotNamePrefix; + this.snapshotName = snapshotName; + this.storageVolumeName = storageVolumeName; + this.createdTime = createdTime; + this.finishedTime = -1; + this.feJournalId = 0; + this.starMgrJournalId = 0; + this.state = ClusterSnapshotJobState.INITIALIZING; + this.errMsg = ""; + } + + public void setState(ClusterSnapshotJobState state, boolean isReplay) { + this.state = state; + if (state == ClusterSnapshotJobState.FINISHED) { + this.finishedTime = Instant.now().getEpochSecond(); + } + + if (!isReplay) { + logJob(); + } + + if (state == ClusterSnapshotJobState.FINISHED) { + createSnapshotIfJobIsFinished(); + } + } + + public void setJournalIds(long feJournalId, long starMgrJournalId) { + this.feJournalId = feJournalId; + this.starMgrJournalId = starMgrJournalId; + } + + public void setErrMsg(String errMsg) { + this.errMsg = errMsg; + } + + public String getSnapshotNamePrefix() { + return snapshotNamePrefix; + } + + public String getSnapshotName() { + return snapshotName; + } + + public String getStorageVolumeName() { + return storageVolumeName; + } + + public long getCreatedTime() { + return createdTime; + } + + public long getFinishedTime() { + return finishedTime; + } + + public long getFeJournalId() { + return feJournalId; + } + + public long getStarMgrJournalId() { + return starMgrJournalId; + } + + public long getJobId() { + return jobId; + } + + public ClusterSnapshotJobState getState() { + return state; + } + + public boolean isUnFinishedState() { + return state == ClusterSnapshotJobState.SNAPSHOTING || + state == ClusterSnapshotJobState.UPLOADING || + state == ClusterSnapshotJobState.FINISHED; + } + + public void logJob() { + ClusterSnapshotLog log = new ClusterSnapshotLog(); + log.setSnapshotJob(this); + GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log); + } + + private void createSnapshotIfJobIsFinished() { + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().createAutomatedSnaphot(this); + } + + public TClusterSnapshotJobsItem getInfo() { + TClusterSnapshotJobsItem item = new TClusterSnapshotJobsItem(); + item.setSnapshot_name(snapshotName); + item.setJob_id(jobId); + item.setCreated_time(createdTime); + item.setFinished_time(finishedTime); + item.setState(state.name()); + item.setDetail_info(""); + item.setError_message(errMsg); + return item; + } + + public static ClusterSnapshotJob read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, ClusterSnapshotJob.class); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java index 058ac4ff43a3e..aa61ccce1ae2e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java @@ -15,6 +15,9 @@ package com.starrocks.lake.snapshot; import com.google.gson.annotations.SerializedName; +import com.starrocks.common.Config; +import com.starrocks.common.StarRocksException; +import com.starrocks.lake.snapshot.ClusterSnapshotJob.ClusterSnapshotJobState; import com.starrocks.persist.ClusterSnapshotLog; import com.starrocks.persist.ImageWriter; import com.starrocks.persist.gson.GsonPostProcessable; @@ -24,12 +27,19 @@ import com.starrocks.persist.metablock.SRMetaBlockReader; import com.starrocks.persist.metablock.SRMetaBlockWriter; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.RunMode; import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt; import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt; +import com.starrocks.storagevolume.StorageVolume; +import com.starrocks.thrift.TClusterSnapshotJobsResponse; +import com.starrocks.thrift.TClusterSnapshotsResponse; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.TreeMap; // only used for AUTOMATED snapshot for now public class ClusterSnapshotMgr implements GsonPostProcessable { @@ -38,6 +48,10 @@ public class ClusterSnapshotMgr implements GsonPostProcessable { @SerializedName(value = "automatedSnapshotSvName") private String automatedSnapshotSvName = ""; + @SerializedName(value = "automatedSnapshot") + private ClusterSnapshot automatedSnapshot = null; + @SerializedName(value = "historyAutomatedSnapshotJobs") + private TreeMap historyAutomatedSnapshotJobs = new TreeMap<>(); public ClusterSnapshotMgr() {} @@ -60,7 +74,7 @@ public String getAutomatedSnapshotSvName() { } public boolean isAutomatedSnapshotOn() { - return automatedSnapshotSvName != null && !automatedSnapshotSvName.isEmpty(); + return RunMode.isSharedDataMode() && automatedSnapshotSvName != null && !automatedSnapshotSvName.isEmpty(); } // Turn off automated snapshot, use stmt for extension in future @@ -70,11 +84,100 @@ public void setAutomatedSnapshotOff(AdminSetAutomatedSnapshotOffStmt stmt) { ClusterSnapshotLog log = new ClusterSnapshotLog(); log.setDropSnapshot(AUTOMATED_NAME_PREFIX); GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log); + + // avoid network communication when replay log + if (automatedSnapshot != null) { + try { + ClusterSnapshotUtils.clearAutomatedSnapshotFromRemote(automatedSnapshot.getSnapshotName()); + } catch (StarRocksException e) { + LOG.warn("Cluster Snapshot: {} delete failed, err msg: {}", automatedSnapshot.getSnapshotName(), e.getMessage()); + } + } } protected void setAutomatedSnapshotOff() { // drop AUTOMATED snapshot automatedSnapshotSvName = ""; + automatedSnapshot = null; + } + + public void createAutomatedSnaphot(ClusterSnapshotJob job) { + ClusterSnapshot newAutomatedSnapshot = new ClusterSnapshot( + GlobalStateMgr.getCurrentState().getNextId(), job.getSnapshotName(), job.getStorageVolumeName(), + job.getCreatedTime(), job.getFinishedTime(), job.getFeJournalId(), job.getStarMgrJournalId()); + + ClusterSnapshotLog log = new ClusterSnapshotLog(); + log.setCreateSnapshot(newAutomatedSnapshot); + GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log); + + if (automatedSnapshot != null && automatedSnapshot.getSnapshotName().startsWith(AUTOMATED_NAME_PREFIX)) { + try { + ClusterSnapshotUtils.clearAutomatedSnapshotFromRemote(automatedSnapshot.getSnapshotName()); + } catch (StarRocksException e) { + LOG.warn("Cluster Snapshot: {} delete failed, err msg: {}", automatedSnapshot.getSnapshotName(), e.getMessage()); + } + } + + automatedSnapshot = newAutomatedSnapshot; + + LOG.info("Finish automated cluster snapshot job successfully, job id: {}, snapshot name: {}", job.getJobId(), + job.getSnapshotName()); + } + + public ClusterSnapshotJob createAutomatedSnapshotJob() { + long createTime = Instant.now().getEpochSecond(); + long jobId = GlobalStateMgr.getCurrentState().getNextId(); + String snapshotNamePrefix = ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX; + String snapshotName = snapshotNamePrefix + '_' + String.valueOf(createTime); + String storageVolumeName = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshotSvName(); + ClusterSnapshotJob job = new ClusterSnapshotJob(jobId, snapshotNamePrefix, snapshotName, storageVolumeName, createTime); + job.setState(ClusterSnapshotJobState.INITIALIZING, false); + + addJob(job); + + LOG.info("Create automated cluster snapshot job successfully, job id: {}, snapshot name: {}", jobId, snapshotName); + + return job; + } + + public StorageVolume getAutomatedSnapshotSv() { + if (automatedSnapshotSvName.isEmpty()) { + return null; + } + + return GlobalStateMgr.getCurrentState().getStorageVolumeMgr().getStorageVolumeByName(automatedSnapshotSvName); + } + + public ClusterSnapshot getAutomatedSnapshot() { + return automatedSnapshot; + } + + public boolean containsAutomatedSnapshot() { + return getAutomatedSnapshot() != null; + } + + public synchronized void addJob(ClusterSnapshotJob job) { + if (Config.max_historical_automated_cluster_snapshot_jobs >= 1 && + historyAutomatedSnapshotJobs.size() == Config.max_historical_automated_cluster_snapshot_jobs) { + historyAutomatedSnapshotJobs.pollFirstEntry(); + } + historyAutomatedSnapshotJobs.put(job.getJobId(), job); + } + + public TClusterSnapshotJobsResponse getAllJobsInfo() { + TClusterSnapshotJobsResponse response = new TClusterSnapshotJobsResponse(); + for (Map.Entry entry : historyAutomatedSnapshotJobs.entrySet()) { + response.addToItems(entry.getValue().getInfo()); + } + return response; + } + + public TClusterSnapshotsResponse getAllInfo() { + TClusterSnapshotsResponse response = new TClusterSnapshotsResponse(); + if (automatedSnapshot != null) { + response.addToItems(automatedSnapshot.getInfo()); + } + return response; } public void replayLog(ClusterSnapshotLog log) { @@ -95,12 +198,59 @@ public void replayLog(ClusterSnapshotLog log) { } break; } + case CREATE_SNAPSHOT: { + ClusterSnapshot snapshot = log.getSnapshot(); + automatedSnapshot = snapshot; + break; + } + case UPDATE_SNAPSHOT_JOB: { + ClusterSnapshotJob job = log.getSnapshotJob(); + ClusterSnapshotJobState state = job.getState(); + + switch (state) { + case INITIALIZING: { + addJob(job); + break; + } + case SNAPSHOTING: + case UPLOADING: + case FINISHED: + case ERROR: { + if (historyAutomatedSnapshotJobs.containsKey(job.getJobId())) { + historyAutomatedSnapshotJobs.remove(job.getJobId()); + historyAutomatedSnapshotJobs.put(job.getJobId(), job); + } + break; + } + default: { + LOG.warn("Invalid Cluster Snapshot Job state {}", state); + } + } + + // if a job do not finished/error but fe restart, we should reset the state as error + // when replaying the log during FE restart. Because the job is unretryable after restart + if (!GlobalStateMgr.getServingState().isReady() && job.isUnFinishedState()) { + job.setState(ClusterSnapshotJobState.ERROR, true); + job.setErrMsg("Snapshot job has been failed"); + } + break; + } default: { LOG.warn("Invalid Cluster Snapshot Log Type {}", logType); } } } + public void resetLastUnFinishedAutomatedSnapshotJob() { + if (!historyAutomatedSnapshotJobs.isEmpty()) { + ClusterSnapshotJob job = historyAutomatedSnapshotJobs.lastEntry().getValue(); + if (job.isUnFinishedState()) { + job.setErrMsg("Snapshot job has been failed"); + job.setState(ClusterSnapshotJobState.ERROR, false); + } + } + } + public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockException { SRMetaBlockWriter writer = imageWriter.getBlockWriter(SRMetaBlockID.CLUSTER_SNAPSHOT_MGR, 1); writer.writeJson(this); diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotUtils.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotUtils.java new file mode 100644 index 0000000000000..5a299883eb48b --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotUtils.java @@ -0,0 +1,53 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.lake.snapshot; + +import com.starrocks.analysis.BrokerDesc; +import com.starrocks.common.StarRocksException; +import com.starrocks.fs.HdfsUtil; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.storagevolume.StorageVolume; + +public class ClusterSnapshotUtils { + public static String getSnapshotImagePath(StorageVolume sv, String snapshotName) { + return String.join("/", sv.getLocations().get(0), + GlobalStateMgr.getCurrentState().getStarOSAgent().getRawServiceId(), "meta/image", snapshotName); + } + + public static void uploadAutomatedSnapshotToRemote(String snapshotName) throws StarRocksException { + if (snapshotName == null || snapshotName.isEmpty()) { + return; + } + + StorageVolume sv = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshotSv(); + String snapshotImagePath = getSnapshotImagePath(sv, snapshotName); + String localImagePath = GlobalStateMgr.getServingState().getImageDir(); + + HdfsUtil.copyFromLocal(localImagePath, snapshotImagePath, sv.getProperties()); + } + + public static void clearAutomatedSnapshotFromRemote(String snapshotName) throws StarRocksException { + if (snapshotName == null || snapshotName.isEmpty()) { + return; + } + + StorageVolume sv = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshotSv(); + BrokerDesc brokerDesc = new BrokerDesc(sv.getProperties()); + String snapshotImagePath = String.join("/", sv.getLocations().get(0), + GlobalStateMgr.getCurrentState().getStarOSAgent().getRawServiceId(), "meta/image", snapshotName); + + HdfsUtil.deletePath(snapshotImagePath, brokerDesc); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java index 60f0d26f3d91b..bda4843cf4fd8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java @@ -34,6 +34,7 @@ package com.starrocks.leader; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.starrocks.common.Config; import com.starrocks.common.FeConstants; @@ -77,6 +78,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * CheckpointController daemon is running on master node. handle the checkpoint work for starrocks. @@ -86,30 +88,118 @@ public class CheckpointController extends FrontendDaemon { private static final int PUT_TIMEOUT_SECOND = 3600; private static final int CONNECT_TIMEOUT_SECOND = 1; private static final int READ_TIMEOUT_SECOND = 1; + private static final AtomicInteger EXCLUSIVE_RUNNING = new AtomicInteger(0); + private static final AtomicInteger CONCURRENT_RUNNING = new AtomicInteger(0); - private String imageDir; - private final Journal journal; + protected String imageDir; // subDir comes after base imageDir, to distinguish different module's image dir - private final String subDir; - private final boolean belongToGlobalStateMgr; + protected String subDir; + protected boolean belongToGlobalStateMgr; + protected volatile long journalId; + // indicate that this CheckpointController should be executed in a exclusive way among all possible controllers + protected boolean exclusiveExecution; + + private final Journal journal; private final Set nodesToPushImage; private volatile String workerNodeName; private volatile long workerSelectedTime; - private volatile long journalId; private volatile BlockingQueue> result; public CheckpointController(String name, Journal journal, String subDir) { - super(name, FeConstants.checkpoint_interval_second * 1000L); + this(name, journal, subDir, FeConstants.checkpoint_interval_second); + } + + public CheckpointController(String name, Journal journal, String subDir, long intervalSeconds) { + super(name, intervalSeconds * 1000L); this.journal = journal; this.subDir = subDir; this.belongToGlobalStateMgr = Strings.isNullOrEmpty(subDir); nodesToPushImage = new HashSet<>(); + this.exclusiveExecution = false; + } + + private static boolean concurrentExecutionExistedOrComing() { + Preconditions.checkState(CONCURRENT_RUNNING.get() >= 0); + return CONCURRENT_RUNNING.get() > 0; + } + + private static boolean exclusiveExecutionExistedOrComing() { + Preconditions.checkState(EXCLUSIVE_RUNNING.get() >= 0); + return EXCLUSIVE_RUNNING.get() > 0; + } + + private static boolean feasibleExclusiveRunningState() { + return EXCLUSIVE_RUNNING.get() == 1; + } + + private static void addExclusiveExecution() { + EXCLUSIVE_RUNNING.incrementAndGet(); + } + + private static void addConcurrentExecution() { + CONCURRENT_RUNNING.incrementAndGet(); + } + + private static boolean checkAndBeginRunning(boolean exclusiveExecution) { + String curRole = exclusiveExecution ? "exclusive checkpoint controller" : "concurrentable checkpoint controller"; + String peerRole = exclusiveExecution ? "concurrentable checkpoint controller" : "exclusive checkpoint controller"; + String errMsg = "Exit " + curRole + " because of the " + peerRole + " is running"; + + if (exclusiveExecution && !exclusiveExecutionExistedOrComing() && !concurrentExecutionExistedOrComing()) { + addExclusiveExecution(); + } else if (!exclusiveExecution && !exclusiveExecutionExistedOrComing()) { + addConcurrentExecution(); + } else { + LOG.info(errMsg); + return false; + } + + if ((exclusiveExecution && (!feasibleExclusiveRunningState() || concurrentExecutionExistedOrComing())) || + (!exclusiveExecution && exclusiveExecutionExistedOrComing())) { + finishRunning(exclusiveExecution); + LOG.info(errMsg); + return false; + } + + return true; + } + + private static void finishRunning(boolean exclusiveExecution) { + if (exclusiveExecution) { + EXCLUSIVE_RUNNING.decrementAndGet(); + } else { + CONCURRENT_RUNNING.decrementAndGet(); + } + Preconditions.checkState(EXCLUSIVE_RUNNING.get() >= 0); + Preconditions.checkState(CONCURRENT_RUNNING.get() >= 0); + } + + public static boolean normalCheckpointRunning() { + return concurrentExecutionExistedOrComing(); + } + + public static boolean clusterSnapshotCheckpointRunning() { + return feasibleExclusiveRunningState(); } @Override protected void runAfterCatalogReady() { + if (!checkAndBeginRunning(exclusiveExecution)) { + return; + } + + try { + Preconditions.checkState(exclusiveExecution && feasibleExclusiveRunningState() || + !exclusiveExecution && concurrentExecutionExistedOrComing()); + runCheckpointController(); + } finally { + finishRunning(exclusiveExecution); + } + } + + protected void runCheckpointController() { init(); long imageJournalId = 0; @@ -130,7 +220,7 @@ protected void runAfterCatalogReady() { Pair createImageRet = Pair.create(false, ""); if (imageJournalId < maxJournalId) { this.journalId = maxJournalId; - createImageRet = createImage(); + createImageRet = createImage(Config.checkpoint_only_on_leader); } if (createImageRet.first) { // Push the image file to all other nodes @@ -165,9 +255,9 @@ private void init() { this.imageDir = GlobalStateMgr.getServingState().getImageDir() + subDir; } - private Pair createImage() { + protected Pair createImage(boolean checkpointOnlyOnLeader) { result = new ArrayBlockingQueue<>(1); - workerNodeName = selectWorker(); + workerNodeName = selectWorker(checkpointOnlyOnLeader); if (workerNodeName == null) { LOG.warn("Failed to select worker to do checkpoint, journalId: {}", journalId); return Pair.create(false, workerNodeName); @@ -247,9 +337,9 @@ private void downloadImage(ImageFormatVersion imageFormatVersion, String imageDi cleaner.clean(); } - private String selectWorker() { + private String selectWorker(boolean checkpointOnlyOnLeader) { List workers; - if (Config.checkpoint_only_on_leader) { + if (checkpointOnlyOnLeader) { workers = new ArrayList<>(); } else { workers = GlobalStateMgr.getServingState().getNodeMgr().getOtherFrontends(); @@ -445,6 +535,7 @@ public void finishCheckpoint(long journalId, String nodeName) throws CheckpointE throw new CheckpointException(String.format("worker node name node match, current worker is: %s, param worker is: %s", workerNodeName, nodeName)); } + if (journalId != this.journalId) { throw new CheckpointException(String.format("journalId not match, current journalId is: %d, param journalId is: %d", this.journalId, journalId)); diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java b/fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java new file mode 100644 index 0000000000000..0349787d6045c --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java @@ -0,0 +1,182 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.leader; + +import com.starrocks.common.Config; +import com.starrocks.common.Pair; +import com.starrocks.common.StarRocksException; +import com.starrocks.journal.Journal; +import com.starrocks.lake.snapshot.ClusterSnapshotJob; +import com.starrocks.lake.snapshot.ClusterSnapshotJob.ClusterSnapshotJobState; +import com.starrocks.lake.snapshot.ClusterSnapshotUtils; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.staros.StarMgrServer; + +// ClusterSnapshotCheckpointController daemon is running on master node. handle the cluster snapshot checkpoint +// work for starrocks. Mutually exclusive execution with ordinary checkpoint process +public class ClusterSnapshotCheckpointController extends CheckpointController { + private static int CAPTURE_ID_RETRY_TIME = 10; + + private final Journal feJournal; + private final Journal starMgrJournal; + + public ClusterSnapshotCheckpointController(Journal feJournal, Journal starMgrJournal) { + super("cluster_snapshot_checkpoint_controller", null, "", Config.automated_cluster_snapshot_interval_seconds); + this.feJournal = feJournal; + this.starMgrJournal = starMgrJournal; + this.exclusiveExecution = true; + } + + @Override + protected void runAfterCatalogReady() { + if (!GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().isAutomatedSnapshotOn()) { + return; + } + + super.runAfterCatalogReady(); + } + + @Override + protected void runCheckpointController() { + String errMsg = ""; + ClusterSnapshotJob job = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr() + .createAutomatedSnapshotJob(); /* INITIALIZING state */ + do { + // step 1: capture consistent journal id for checkpoint + Pair consistentIds = captureConsistentCheckpointIdBetweenFEAndStarMgr(); + if (consistentIds == null) { + errMsg = "failed to capture consistent journal id for checkpoint"; + break; + } + job.setJournalIds(consistentIds.first, consistentIds.second); + LOG.info("Successful capture consistent journal id, FE checkpoint journal Id: {}, StarMgr checkpoint journal Id: {}", + consistentIds.first, consistentIds.second); + + // step 2: create image for FE and StarMgr + + // for cluster snapshot checkpoint, image will be always created by leader node. + // because it can ensure that the request from the next normal checkpoint will has the largest journal id than + // version for any follower node. This is the same assumption as normal checkpoint and we don't need to change + // any code for normal controller and worker to handle the situation where the image version in follwer is larger. + job.setState(ClusterSnapshotJobState.SNAPSHOTING, false); + Pair createImageRet = Pair.create(false, ""); + + createImageRet = createImageByLeaderForFE(consistentIds.first); + if (!createImageRet.first) { + errMsg = "checkpoint failed for FE image, err msg: " + createImageRet.second; + break; + } + + createImageRet = createImageByLeaderForStarMgr(consistentIds.second); + if (!createImageRet.first) { + errMsg = "checkpoint failed for StarMgr image, err msg: " + createImageRet.second; + break; + } + LOG.info("Finished create image for cluster snapshot checkpoint"); + + // step 3: upload all finished image file + job.setState(ClusterSnapshotJobState.UPLOADING, false); + try { + ClusterSnapshotUtils.uploadAutomatedSnapshotToRemote(job.getSnapshotName()); + } catch (StarRocksException e) { + errMsg = "upload image failed, err msg: " + e.getMessage(); + break; + } + } while (false); + + if (!errMsg.isEmpty()) { + job.setErrMsg(errMsg); + job.setState(ClusterSnapshotJobState.ERROR, false); + LOG.warn("Cluster Snapshot checkpoint failed: " + errMsg); + } else { + job.setState(ClusterSnapshotJobState.FINISHED, false); + LOG.warn("Finish Cluster Snapshot checkpoint, FE checkpoint journal Id: {}, StarMgr checkpoint journal Id: {}", + job.getFeJournalId(), job.getStarMgrJournalId()); + } + + // ClusterSnapshotCheckpointController does not responsible for pushImgae and deleteOldJournals + } + + private Pair createImageByLeaderForFE(long journalId) { + setContextForFE(journalId); + return createImage(true); + } + + private Pair createImageByLeaderForStarMgr(long journalId) { + setContextForStarMgr(journalId); + return createImage(true); + } + + private void setContextForFE(long journalId) { + setContext(true, journalId); + } + + private void setContextForStarMgr(long journalId) { + setContext(false, journalId); + } + + private void setContext(boolean belongToGlobalStateMgr, long journalId) { + this.journalId = journalId; + this.subDir = belongToGlobalStateMgr ? "" : StarMgrServer.IMAGE_SUBDIR; + this.belongToGlobalStateMgr = belongToGlobalStateMgr; + this.imageDir = GlobalStateMgr.getServingState().getImageDir() + this.subDir; + } + + /* + * Definition of consistent: Suppose there are two images generated by FE and StarMgr, call FEImageNew + * and StarMgrImageNew and satisfy: + * FEImageNew = FEImageOld + editlog(i) + ... + editlog(j) + * StarMgrImageNew = StarMgrImageOld + editlog(k) + ... + editlog(m) + * + * Define Tj = generated time of editlog(j), Tmax = max(Tj, Tm) + * Consistency means all editlogs generated before Tmax (no matter the editlog is belong to FE or starMgr) + * should be included in the image generated by checkpoint. + * In other words, there must be no holes before the `maximum` editlog contained in the two images + * generated by checkpoint. + * + * How to get the consistent id: because editlog is generated and flush in a synchronous way, so we can simply + * get the `snapshot` of maxJouranlId for both FE side and StarMgr side. + * We get the `snapshot` in a lock-free way. As shown in the code below: + * (1) if feCheckpointIdT1 == feCheckpointIdT3 means in [T1, T3], no editlog added for FE side + * (2) if starMgrCheckpointIdT2 == starMgrCheckpointIdT4 means in [T2, T4], no editlog added for StarMgr side + * + * Because T1 < T2 < T3 < T4, from (1),(2) -> [T2, T3] no editlog added for FE side and StarMgr side + * So we get the snapshots are feCheckpointIdT3 and starMgrCheckpointIdT2 + */ + private Pair captureConsistentCheckpointIdBetweenFEAndStarMgr() { + if (feJournal == null || starMgrJournal == null) { + return null; + } + + int retryTime = CAPTURE_ID_RETRY_TIME; + while (retryTime > 0) { + long feCheckpointIdT1 = feJournal.getMaxJournalId(); + long starMgrCheckpointIdT2 = starMgrJournal.getMaxJournalId(); + long feCheckpointIdT3 = feJournal.getMaxJournalId(); + long starMgrCheckpointIdT4 = starMgrJournal.getMaxJournalId(); + + if (feCheckpointIdT1 == feCheckpointIdT3 && starMgrCheckpointIdT2 == starMgrCheckpointIdT4) { + return Pair.create(feCheckpointIdT3, starMgrCheckpointIdT2); + } + + try { + Thread.sleep(100); + } catch (Exception ignore) { + } + --retryTime; + } + return null; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java b/fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java index 993053a85f041..a0fa46c9203b2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java @@ -17,13 +17,15 @@ import com.google.gson.annotations.SerializedName; import com.starrocks.common.io.Text; import com.starrocks.common.io.Writable; +import com.starrocks.lake.snapshot.ClusterSnapshot; +import com.starrocks.lake.snapshot.ClusterSnapshotJob; import com.starrocks.persist.gson.GsonUtils; import java.io.DataInput; import java.io.IOException; public class ClusterSnapshotLog implements Writable { - public enum ClusterSnapshotLogType { NONE, CREATE_SNAPSHOT_PREFIX, DROP_SNAPSHOT } + public enum ClusterSnapshotLogType { NONE, CREATE_SNAPSHOT_PREFIX, DROP_SNAPSHOT, CREATE_SNAPSHOT, UPDATE_SNAPSHOT_JOB } @SerializedName(value = "type") private ClusterSnapshotLogType type = ClusterSnapshotLogType.NONE; // For CREATE_SNAPSHOT_PREFIX @@ -34,6 +36,12 @@ public enum ClusterSnapshotLogType { NONE, CREATE_SNAPSHOT_PREFIX, DROP_SNAPSHOT // For DROP_SNAPSHOT @SerializedName(value = "dropSnapshotName") private String dropSnapshotName = ""; + // For CREATE_SNAPSHOT + @SerializedName(value = "snapshot") + private ClusterSnapshot snapshot = null; + // For UPDATE_SNAPSHOT_JOB + @SerializedName(value = "snapshotJob") + private ClusterSnapshotJob snapshotJob = null; public ClusterSnapshotLog() {} @@ -48,6 +56,16 @@ public void setDropSnapshot(String dropSnapshotName) { this.dropSnapshotName = dropSnapshotName; } + public void setCreateSnapshot(ClusterSnapshot snapshot) { + this.type = ClusterSnapshotLogType.CREATE_SNAPSHOT; + this.snapshot = snapshot; + } + + public void setSnapshotJob(ClusterSnapshotJob job) { + this.type = ClusterSnapshotLogType.UPDATE_SNAPSHOT_JOB; + this.snapshotJob = job; + } + public ClusterSnapshotLogType getType() { return type; } @@ -64,6 +82,14 @@ public String getDropSnapshotName() { return this.dropSnapshotName; } + public ClusterSnapshot getSnapshot() { + return this.snapshot; + } + + public ClusterSnapshotJob getSnapshotJob() { + return this.snapshotJob; + } + public static ClusterSnapshotLog read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, ClusterSnapshotLog.class); diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java index dbbb680839ad2..b7814f0393ccd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java @@ -142,6 +142,7 @@ import com.starrocks.lake.snapshot.ClusterSnapshotMgr; import com.starrocks.lake.vacuum.AutovacuumDaemon; import com.starrocks.leader.CheckpointController; +import com.starrocks.leader.ClusterSnapshotCheckpointController; import com.starrocks.leader.TaskRunStateSynchronizer; import com.starrocks.listener.GlobalLoadJobListenerBus; import com.starrocks.load.DeleteMgr; @@ -362,6 +363,8 @@ public class GlobalStateMgr { private CheckpointWorker checkpointWorker; private boolean checkpointWorkerStarted = false; + private CheckpointController clusterSnapshotCheckpointController = null; + private HAProtocol haProtocol = null; private final JournalObservable journalObservable; @@ -760,6 +763,8 @@ private GlobalStateMgr(boolean isCkptGlobalState, NodeMgr nodeMgr) { this.gtidGenerator = new GtidGenerator(); this.globalConstraintManager = new GlobalConstraintManager(); + this.clusterSnapshotMgr = new ClusterSnapshotMgr(); + GlobalStateMgr gsm = this; this.execution = new StateChangeExecution() { @Override @@ -820,8 +825,6 @@ public void transferToNonLeader(FrontendNodeType newType) { "query-deploy", true); this.warehouseIdleChecker = new WarehouseIdleChecker(); - - this.clusterSnapshotMgr = new ClusterSnapshotMgr(); } public static void destroyCheckpoint() { @@ -1320,6 +1323,12 @@ private void transferToLeader() { createBuiltinStorageVolume(); resourceGroupMgr.createBuiltinResourceGroupsIfNotExist(); keyMgr.initDefaultMasterKey(); + + // if leader change and the last cluster snapshot job has not been finished/error + // make the state as error, becase the job can not be continued in new leader. + if (clusterSnapshotMgr != null) { + clusterSnapshotMgr.resetLastUnFinishedAutomatedSnapshotJob(); + } } public void setFrontendNodeType(FrontendNodeType newType) { @@ -1336,6 +1345,10 @@ private void startLeaderOnlyDaemonThreads() { } StarMgrServer.getCurrentState().startCheckpointController(); + + clusterSnapshotCheckpointController = new ClusterSnapshotCheckpointController( + journal, StarMgrServer.getCurrentState().getJournalSystem().getJournal()); + clusterSnapshotCheckpointController.start(); } // start checkpoint thread @@ -2213,6 +2226,10 @@ public CheckpointController getCheckpointController() { return checkpointController; } + public CheckpointController getClusterSnapshotCheckpointController() { + return clusterSnapshotCheckpointController; + } + public void setLeader(LeaderInfo info) { nodeMgr.setLeader(info); } diff --git a/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java index 8cc4e204cae46..18bfcb2486721 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java @@ -876,6 +876,8 @@ private void dropFrontendHook(Frontend fe) { GlobalStateMgr.getCurrentState().getCheckpointController().cancelCheckpoint(fe.getNodeName(), "FE is dropped"); if (RunMode.isSharedDataMode()) { StarMgrServer.getCurrentState().getCheckpointController().cancelCheckpoint(fe.getNodeName(), "FE is dropped"); + GlobalStateMgr.getCurrentState().getClusterSnapshotCheckpointController(). + cancelCheckpoint(fe.getNodeName(), "FE is dropped"); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index 1f1281a1824c4..8d48ed0379dc9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -3053,7 +3053,9 @@ public TFinishCheckpointResponse finishCheckpoint(TFinishCheckpointRequest reque } CheckpointController controller; - if (request.is_global_state_mgr) { + if (CheckpointController.clusterSnapshotCheckpointRunning()) { + controller = GlobalStateMgr.getCurrentState().getClusterSnapshotCheckpointController(); + } else if (request.is_global_state_mgr) { controller = GlobalStateMgr.getCurrentState().getCheckpointController(); } else { controller = StarMgrServer.getCurrentState().getCheckpointController(); @@ -3178,11 +3180,11 @@ static List getPartitionMetaImpl(Collection tabletMe @Override public TClusterSnapshotsResponse getClusterSnapshotsInfo(TClusterSnapshotsRequest params) { - return new TClusterSnapshotsResponse(); + return GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAllInfo(); } @Override public TClusterSnapshotJobsResponse getClusterSnapshotJobsInfo(TClusterSnapshotJobsRequest params) { - return new TClusterSnapshotJobsResponse(); + return GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAllJobsInfo(); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ClusterSnapshotAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ClusterSnapshotAnalyzer.java index 0d45fbe858302..001f2ecfc5d40 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ClusterSnapshotAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ClusterSnapshotAnalyzer.java @@ -17,6 +17,7 @@ import com.starrocks.common.DdlException; import com.starrocks.qe.ConnectContext; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.RunMode; import com.starrocks.server.StorageVolumeMgr; import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt; import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt; @@ -31,6 +32,10 @@ public static void analyze(StatementBase stmt, ConnectContext session) { static class ClusterSnapshotAnalyzerVisitor implements AstVisitor { @Override public Void visitAdminSetAutomatedSnapshotOnStatement(AdminSetAutomatedSnapshotOnStmt statement, ConnectContext context) { + if (!RunMode.isSharedDataMode()) { + throw new SemanticException("Automated snapshot only support share data mode"); + } + if (GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().isAutomatedSnapshotOn()) { throw new SemanticException("Automated snapshot has been turn on"); } @@ -51,6 +56,10 @@ public Void visitAdminSetAutomatedSnapshotOnStatement(AdminSetAutomatedSnapshotO @Override public Void visitAdminSetAutomatedSnapshotOffStatement(AdminSetAutomatedSnapshotOffStmt statement, ConnectContext context) { + if (!RunMode.isSharedDataMode()) { + throw new SemanticException("Automated snapshot only support share data mode"); + } + if (!GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().isAutomatedSnapshotOn()) { throw new SemanticException("Automated snapshot has not been turn on"); } diff --git a/fe/fe-core/src/main/java/com/starrocks/storagevolume/StorageVolume.java b/fe/fe-core/src/main/java/com/starrocks/storagevolume/StorageVolume.java index e81e5f4775c4a..3c620c19fadcf 100644 --- a/fe/fe-core/src/main/java/com/starrocks/storagevolume/StorageVolume.java +++ b/fe/fe-core/src/main/java/com/starrocks/storagevolume/StorageVolume.java @@ -193,6 +193,10 @@ public String getType() { return svt.toString(); } + public Map getProperties() { + return params; + } + private StorageVolumeType toStorageVolumeType(String svt) { switch (svt.toLowerCase()) { case "s3": diff --git a/fe/fe-core/src/main/java/com/starrocks/system/Frontend.java b/fe/fe-core/src/main/java/com/starrocks/system/Frontend.java index a118b71740303..ebe83deabd19d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/Frontend.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/Frontend.java @@ -222,6 +222,8 @@ private void changeToDead(boolean isReplay) { GlobalStateMgr.getCurrentState().getCheckpointController().cancelCheckpoint(nodeName, "FE is dead"); if (RunMode.isSharedDataMode()) { StarMgrServer.getCurrentState().getCheckpointController().cancelCheckpoint(nodeName, "FE is dead"); + GlobalStateMgr.getCurrentState().getClusterSnapshotCheckpointController(). + cancelCheckpoint(nodeName, "FE is dead"); } } } @@ -244,6 +246,7 @@ private void restartHappened(boolean isReplay) { GlobalStateMgr.getCurrentState().getCheckpointController().workerRestarted(nodeName, startTime); if (RunMode.isSharedDataMode()) { StarMgrServer.getCurrentState().getCheckpointController().workerRestarted(nodeName, startTime); + GlobalStateMgr.getCurrentState().getClusterSnapshotCheckpointController().workerRestarted(nodeName, startTime); } } } diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java index a106febb6e772..ae58f10f3dc4d 100644 --- a/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java @@ -14,8 +14,6 @@ package com.starrocks.lake.snapshot; -//import com.google.common.collect.Lists; - import com.starrocks.alter.AlterTest; import com.starrocks.backup.BlobStorage; import com.starrocks.backup.Status; @@ -23,10 +21,12 @@ import com.starrocks.common.DdlException; import com.starrocks.common.MetaNotFoundException; import com.starrocks.lake.StarOSAgent; +import com.starrocks.lake.snapshot.ClusterSnapshotJob.ClusterSnapshotJobState; import com.starrocks.persist.ClusterSnapshotLog; import com.starrocks.persist.EditLog; import com.starrocks.persist.Storage; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.RunMode; import com.starrocks.server.StorageVolumeMgr; import com.starrocks.sql.analyzer.AnalyzeTestUtil; import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt; @@ -150,6 +150,13 @@ public String getRawServiceId() { } }; + new MockUp() { + @Mock + public RunMode getCurrentRunMode() { + return RunMode.SHARED_DATA; + } + }; + setAutomatedSnapshotOff(false); } @@ -205,6 +212,15 @@ public void testOperationOfAutomatedSnapshot() throws DdlException { analyzeFail(turnOFFSql); setAutomatedSnapshotOn(false); analyzeSuccess(turnOFFSql); + + // 2. test getInfo + setAutomatedSnapshotOn(false); + ClusterSnapshotJob job = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().createAutomatedSnapshotJob(); + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().createAutomatedSnaphot(job); + ClusterSnapshot snapshot = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot(); + Assert.assertTrue(job.getInfo() != null); + Assert.assertTrue(snapshot.getInfo() != null); + setAutomatedSnapshotOff(false); } @Test @@ -214,9 +230,15 @@ public void testReplayClusterSnapshotLog() { logCreate.setCreateSnapshotNamePrefix(ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX, storageVolumeName); GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().replayLog(logCreate); + // create snapshot log + ClusterSnapshotLog logSnapshot = new ClusterSnapshotLog(); + clusterSnapshotMgr.createAutomatedSnapshotJob().setState(ClusterSnapshotJobState.FINISHED, false); + logSnapshot.setCreateSnapshot(clusterSnapshotMgr.getAutomatedSnapshot()); + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().replayLog(logCreate); + // drop automated snapshot request log ClusterSnapshotLog logDrop = new ClusterSnapshotLog(); logDrop.setDropSnapshot(ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX); GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().replayLog(logDrop); } -} +} \ No newline at end of file From aeabbd241f6ba78550f298b67bdc05b93e07fb73 Mon Sep 17 00:00:00 2001 From: srlch Date: Wed, 8 Jan 2025 10:24:17 +0800 Subject: [PATCH 2/6] fix Signed-off-by: srlch --- .../leader/CheckpointController.java | 67 ++++++------------- 1 file changed, 20 insertions(+), 47 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java index bda4843cf4fd8..b044c889e7baa 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java @@ -34,7 +34,6 @@ package com.starrocks.leader; -import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.starrocks.common.Config; import com.starrocks.common.FeConstants; @@ -78,7 +77,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * CheckpointController daemon is running on master node. handle the checkpoint work for starrocks. @@ -88,8 +86,8 @@ public class CheckpointController extends FrontendDaemon { private static final int PUT_TIMEOUT_SECOND = 3600; private static final int CONNECT_TIMEOUT_SECOND = 1; private static final int READ_TIMEOUT_SECOND = 1; - private static final AtomicInteger EXCLUSIVE_RUNNING = new AtomicInteger(0); - private static final AtomicInteger CONCURRENT_RUNNING = new AtomicInteger(0); + private static int EXCLUSIVE_RUNNING = 0; + private static int CONCURRENT_RUNNING = 0; protected String imageDir; // subDir comes after base imageDir, to distinguish different module's image dir @@ -120,68 +118,45 @@ public CheckpointController(String name, Journal journal, String subDir, long in this.exclusiveExecution = false; } - private static boolean concurrentExecutionExistedOrComing() { - Preconditions.checkState(CONCURRENT_RUNNING.get() >= 0); - return CONCURRENT_RUNNING.get() > 0; - } - - private static boolean exclusiveExecutionExistedOrComing() { - Preconditions.checkState(EXCLUSIVE_RUNNING.get() >= 0); - return EXCLUSIVE_RUNNING.get() > 0; - } - - private static boolean feasibleExclusiveRunningState() { - return EXCLUSIVE_RUNNING.get() == 1; - } - - private static void addExclusiveExecution() { - EXCLUSIVE_RUNNING.incrementAndGet(); - } - - private static void addConcurrentExecution() { - CONCURRENT_RUNNING.incrementAndGet(); - } - - private static boolean checkAndBeginRunning(boolean exclusiveExecution) { + private static synchronized boolean checkAndBeginRunning(boolean exclusiveExecution) { String curRole = exclusiveExecution ? "exclusive checkpoint controller" : "concurrentable checkpoint controller"; String peerRole = exclusiveExecution ? "concurrentable checkpoint controller" : "exclusive checkpoint controller"; String errMsg = "Exit " + curRole + " because of the " + peerRole + " is running"; - if (exclusiveExecution && !exclusiveExecutionExistedOrComing() && !concurrentExecutionExistedOrComing()) { - addExclusiveExecution(); - } else if (!exclusiveExecution && !exclusiveExecutionExistedOrComing()) { - addConcurrentExecution(); + if (exclusiveExecution && EXCLUSIVE_RUNNING == 0 && CONCURRENT_RUNNING == 0) { + ++EXCLUSIVE_RUNNING; + } else if (!exclusiveExecution && EXCLUSIVE_RUNNING == 0) { + ++CONCURRENT_RUNNING; } else { LOG.info(errMsg); return false; } - if ((exclusiveExecution && (!feasibleExclusiveRunningState() || concurrentExecutionExistedOrComing())) || - (!exclusiveExecution && exclusiveExecutionExistedOrComing())) { - finishRunning(exclusiveExecution); - LOG.info(errMsg); - return false; - } - return true; } - private static void finishRunning(boolean exclusiveExecution) { + private static synchronized void finishRunning(boolean exclusiveExecution) { if (exclusiveExecution) { - EXCLUSIVE_RUNNING.decrementAndGet(); + --EXCLUSIVE_RUNNING; } else { - CONCURRENT_RUNNING.decrementAndGet(); + --CONCURRENT_RUNNING; } - Preconditions.checkState(EXCLUSIVE_RUNNING.get() >= 0); - Preconditions.checkState(CONCURRENT_RUNNING.get() >= 0); + } + + private static synchronized int exclusiveRunningCount() { + return EXCLUSIVE_RUNNING; + } + + private static synchronized int concurrentRunningCount() { + return CONCURRENT_RUNNING; } public static boolean normalCheckpointRunning() { - return concurrentExecutionExistedOrComing(); + return concurrentRunningCount() == 2; } public static boolean clusterSnapshotCheckpointRunning() { - return feasibleExclusiveRunningState(); + return exclusiveRunningCount() == 1; } @Override @@ -191,8 +166,6 @@ protected void runAfterCatalogReady() { } try { - Preconditions.checkState(exclusiveExecution && feasibleExclusiveRunningState() || - !exclusiveExecution && concurrentExecutionExistedOrComing()); runCheckpointController(); } finally { finishRunning(exclusiveExecution); From 512807b6e4d6ca6957f73c9ac27fc2b30d9bd3bd Mon Sep 17 00:00:00 2001 From: srlch Date: Wed, 8 Jan 2025 16:26:27 +0800 Subject: [PATCH 3/6] fix Signed-off-by: srlch --- .../leader/CheckpointController.java | 50 ++++++------------- 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java index b044c889e7baa..3154586d36577 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java @@ -77,6 +77,8 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * CheckpointController daemon is running on master node. handle the checkpoint work for starrocks. @@ -86,8 +88,7 @@ public class CheckpointController extends FrontendDaemon { private static final int PUT_TIMEOUT_SECOND = 3600; private static final int CONNECT_TIMEOUT_SECOND = 1; private static final int READ_TIMEOUT_SECOND = 1; - private static int EXCLUSIVE_RUNNING = 0; - private static int CONCURRENT_RUNNING = 0; + private static final ReadWriteLock rwLock = new ReentrantReadWriteLock(); protected String imageDir; // subDir comes after base imageDir, to distinguish different module's image dir @@ -118,57 +119,36 @@ public CheckpointController(String name, Journal journal, String subDir, long in this.exclusiveExecution = false; } - private static synchronized boolean checkAndBeginRunning(boolean exclusiveExecution) { - String curRole = exclusiveExecution ? "exclusive checkpoint controller" : "concurrentable checkpoint controller"; - String peerRole = exclusiveExecution ? "concurrentable checkpoint controller" : "exclusive checkpoint controller"; - String errMsg = "Exit " + curRole + " because of the " + peerRole + " is running"; + public static boolean clusterSnapshotCheckpointRunning() { + return rwLock.isWriteLocked(); + } - if (exclusiveExecution && EXCLUSIVE_RUNNING == 0 && CONCURRENT_RUNNING == 0) { - ++EXCLUSIVE_RUNNING; - } else if (!exclusiveExecution && EXCLUSIVE_RUNNING == 0) { - ++CONCURRENT_RUNNING; + private boolean tryLock() { + if (exclusiveExecution) { + return rwLock.writeLock().tryLock(); } else { - LOG.info(errMsg); - return false; + return rwLock.readLock().tryLock(); } - - return true; } - private static synchronized void finishRunning(boolean exclusiveExecution) { + private void unlock() { if (exclusiveExecution) { - --EXCLUSIVE_RUNNING; + rwLock.writeLock().unlock(); } else { - --CONCURRENT_RUNNING; + rwLock.readLock().unlock(); } } - private static synchronized int exclusiveRunningCount() { - return EXCLUSIVE_RUNNING; - } - - private static synchronized int concurrentRunningCount() { - return CONCURRENT_RUNNING; - } - - public static boolean normalCheckpointRunning() { - return concurrentRunningCount() == 2; - } - - public static boolean clusterSnapshotCheckpointRunning() { - return exclusiveRunningCount() == 1; - } - @Override protected void runAfterCatalogReady() { - if (!checkAndBeginRunning(exclusiveExecution)) { + if (!tryLock()) { return; } try { runCheckpointController(); } finally { - finishRunning(exclusiveExecution); + unlock(); } } From 8fae4e13deb6ee68f803a0f8598d3c393113260b Mon Sep 17 00:00:00 2001 From: srlch Date: Wed, 8 Jan 2025 16:33:52 +0800 Subject: [PATCH 4/6] fix Signed-off-by: srlch --- .../java/com/starrocks/leader/CheckpointController.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java index 3154586d36577..1f6b0e1350aad 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java @@ -133,9 +133,13 @@ private boolean tryLock() { private void unlock() { if (exclusiveExecution) { - rwLock.writeLock().unlock(); + if (rwLock.writeLock().isHeldByCurrentThread()) { + rwLock.writeLock().unlock(); + } } else { - rwLock.readLock().unlock(); + if (rwLock.getReadHoldCount() > 0) { + rwLock.readLock().unlock(); + } } } From 7907961f405ce01d305d49a45dc6663b3bb4608f Mon Sep 17 00:00:00 2001 From: srlch Date: Wed, 8 Jan 2025 16:39:48 +0800 Subject: [PATCH 5/6] fix Signed-off-by: srlch --- .../leader/CheckpointController.java | 10 ++++---- .../ClusterSnapshotCheckpointController.java | 24 ++++++++++--------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java index 1f6b0e1350aad..fe259323af52a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java @@ -177,7 +177,7 @@ protected void runCheckpointController() { Pair createImageRet = Pair.create(false, ""); if (imageJournalId < maxJournalId) { this.journalId = maxJournalId; - createImageRet = createImage(Config.checkpoint_only_on_leader); + createImageRet = createImage(); } if (createImageRet.first) { // Push the image file to all other nodes @@ -212,9 +212,9 @@ private void init() { this.imageDir = GlobalStateMgr.getServingState().getImageDir() + subDir; } - protected Pair createImage(boolean checkpointOnlyOnLeader) { + protected Pair createImage() { result = new ArrayBlockingQueue<>(1); - workerNodeName = selectWorker(checkpointOnlyOnLeader); + workerNodeName = selectWorker(); if (workerNodeName == null) { LOG.warn("Failed to select worker to do checkpoint, journalId: {}", journalId); return Pair.create(false, workerNodeName); @@ -294,9 +294,9 @@ private void downloadImage(ImageFormatVersion imageFormatVersion, String imageDi cleaner.clean(); } - private String selectWorker(boolean checkpointOnlyOnLeader) { + private String selectWorker() { List workers; - if (checkpointOnlyOnLeader) { + if (Config.checkpoint_only_on_leader) { workers = new ArrayList<>(); } else { workers = GlobalStateMgr.getServingState().getNodeMgr().getOtherFrontends(); diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java b/fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java index 0349787d6045c..6bf4a2dc843a6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java @@ -65,25 +65,27 @@ protected void runCheckpointController() { consistentIds.first, consistentIds.second); // step 2: create image for FE and StarMgr - - // for cluster snapshot checkpoint, image will be always created by leader node. - // because it can ensure that the request from the next normal checkpoint will has the largest journal id than - // version for any follower node. This is the same assumption as normal checkpoint and we don't need to change - // any code for normal controller and worker to handle the situation where the image version in follwer is larger. job.setState(ClusterSnapshotJobState.SNAPSHOTING, false); Pair createImageRet = Pair.create(false, ""); - createImageRet = createImageByLeaderForFE(consistentIds.first); + createImageRet = createImageByForFE(consistentIds.first); if (!createImageRet.first) { errMsg = "checkpoint failed for FE image, err msg: " + createImageRet.second; break; } - createImageRet = createImageByLeaderForStarMgr(consistentIds.second); + createImageRet = createImageByForStarMgr(consistentIds.second); if (!createImageRet.first) { errMsg = "checkpoint failed for StarMgr image, err msg: " + createImageRet.second; break; } + + // force roll journal for FE and starMgr to make sure that: + // 1. Next checkpoint journal id from normal checkpoint controller must larger than any + // image version of all follower nodes. + // 2. Avoid the shortcut exit of the normal checkpoint controller. + GlobalStateMgr.getCurrentState().triggerNewImage(); + StarMgrServer.getCurrentState().triggerNewImage(); LOG.info("Finished create image for cluster snapshot checkpoint"); // step 3: upload all finished image file @@ -109,14 +111,14 @@ protected void runCheckpointController() { // ClusterSnapshotCheckpointController does not responsible for pushImgae and deleteOldJournals } - private Pair createImageByLeaderForFE(long journalId) { + private Pair createImageByForFE(long journalId) { setContextForFE(journalId); - return createImage(true); + return createImage(); } - private Pair createImageByLeaderForStarMgr(long journalId) { + private Pair createImageByForStarMgr(long journalId) { setContextForStarMgr(journalId); - return createImage(true); + return createImage(); } private void setContextForFE(long journalId) { From bd8ede80d51ced16b53eb7e1919cffc530183a98 Mon Sep 17 00:00:00 2001 From: srlch Date: Thu, 9 Jan 2025 10:33:15 +0800 Subject: [PATCH 6/6] fix Signed-off-by: srlch --- .../lake/snapshot/ClusterSnapshot.java | 2 +- .../lake/snapshot/ClusterSnapshotJob.java | 34 ++++++++----------- .../lake/snapshot/ClusterSnapshotMgr.java | 25 +++++--------- .../leader/CheckpointController.java | 17 +++++----- .../ClusterSnapshotCheckpointController.java | 13 ++++--- .../starrocks/lake/ClusterSnapshotTest.java | 4 +-- 6 files changed, 43 insertions(+), 52 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java index 8ab59b21ecf04..3e8d48e3eea49 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshot.java @@ -21,7 +21,7 @@ import com.starrocks.thrift.TClusterSnapshotsItem; public class ClusterSnapshot { - public enum ClusterSnapshotType { AUTOMATED } + public enum ClusterSnapshotType { AUTOMATED, MANUAL, INCREMENTAL } @SerializedName(value = "snapshotId") private long snapshotId; diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java index ab5a8918baf24..f1131af05cead 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotJob.java @@ -21,6 +21,8 @@ import com.starrocks.persist.gson.GsonUtils; import com.starrocks.server.GlobalStateMgr; import com.starrocks.thrift.TClusterSnapshotJobsItem; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; @@ -28,6 +30,7 @@ import java.time.Instant; public class ClusterSnapshotJob implements Writable { + public static final Logger LOG = LogManager.getLogger(ClusterSnapshotJob.class); /* * INITIALIZING: INIT state for the snapshot. * SNAPSHOTING: Doing checkpoint/image generation by replaying log for image both for FE and StarMgr and @@ -39,8 +42,6 @@ public enum ClusterSnapshotJobState { INITIALIZING, SNAPSHOTING, UPLOADING, FINI @SerializedName(value = "jobId") private long jobId; - @SerializedName(value = "snapshotNamePrefix") - private String snapshotNamePrefix; @SerializedName(value = "snapshotName") private String snapshotName; @SerializedName(value = "storageVolumeName") @@ -58,10 +59,8 @@ public enum ClusterSnapshotJobState { INITIALIZING, SNAPSHOTING, UPLOADING, FINI @SerializedName(value = "errMsg") private String errMsg; - public ClusterSnapshotJob(long jobId, String snapshotNamePrefix, String snapshotName, String storageVolumeName, - long createdTime) { + public ClusterSnapshotJob(long jobId, String snapshotName, String storageVolumeName, long createdTime) { this.jobId = jobId; - this.snapshotNamePrefix = snapshotNamePrefix; this.snapshotName = snapshotName; this.storageVolumeName = storageVolumeName; this.createdTime = createdTime; @@ -72,19 +71,11 @@ public ClusterSnapshotJob(long jobId, String snapshotNamePrefix, String snapshot this.errMsg = ""; } - public void setState(ClusterSnapshotJobState state, boolean isReplay) { + public void setState(ClusterSnapshotJobState state) { this.state = state; if (state == ClusterSnapshotJobState.FINISHED) { this.finishedTime = Instant.now().getEpochSecond(); } - - if (!isReplay) { - logJob(); - } - - if (state == ClusterSnapshotJobState.FINISHED) { - createSnapshotIfJobIsFinished(); - } } public void setJournalIds(long feJournalId, long starMgrJournalId) { @@ -96,10 +87,6 @@ public void setErrMsg(String errMsg) { this.errMsg = errMsg; } - public String getSnapshotNamePrefix() { - return snapshotNamePrefix; - } - public String getSnapshotName() { return snapshotName; } @@ -144,8 +131,15 @@ public void logJob() { GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log); } - private void createSnapshotIfJobIsFinished() { - GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().createAutomatedSnaphot(this); + public void addAutomatedClusterSnapshot() { + if (state == ClusterSnapshotJobState.FINISHED) { + ClusterSnapshot newAutomatedClusterSnapshot = new ClusterSnapshot( + GlobalStateMgr.getCurrentState().getNextId(), snapshotName, storageVolumeName, + createdTime, finishedTime, feJournalId, starMgrJournalId); + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().addAutomatedClusterSnapshot(newAutomatedClusterSnapshot); + + LOG.info("Finish automated cluster snapshot job successfully, job id: {}, snapshot name: {}", jobId, snapshotName); + } } public TClusterSnapshotJobsItem getInfo() { diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java index aa61ccce1ae2e..7041cbce70727 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java @@ -101,13 +101,9 @@ protected void setAutomatedSnapshotOff() { automatedSnapshot = null; } - public void createAutomatedSnaphot(ClusterSnapshotJob job) { - ClusterSnapshot newAutomatedSnapshot = new ClusterSnapshot( - GlobalStateMgr.getCurrentState().getNextId(), job.getSnapshotName(), job.getStorageVolumeName(), - job.getCreatedTime(), job.getFinishedTime(), job.getFeJournalId(), job.getStarMgrJournalId()); - + protected void addAutomatedClusterSnapshot(ClusterSnapshot newAutomatedClusterSnapshot) { ClusterSnapshotLog log = new ClusterSnapshotLog(); - log.setCreateSnapshot(newAutomatedSnapshot); + log.setCreateSnapshot(newAutomatedClusterSnapshot); GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log); if (automatedSnapshot != null && automatedSnapshot.getSnapshotName().startsWith(AUTOMATED_NAME_PREFIX)) { @@ -118,20 +114,16 @@ public void createAutomatedSnaphot(ClusterSnapshotJob job) { } } - automatedSnapshot = newAutomatedSnapshot; - - LOG.info("Finish automated cluster snapshot job successfully, job id: {}, snapshot name: {}", job.getJobId(), - job.getSnapshotName()); + automatedSnapshot = newAutomatedClusterSnapshot; } public ClusterSnapshotJob createAutomatedSnapshotJob() { long createTime = Instant.now().getEpochSecond(); long jobId = GlobalStateMgr.getCurrentState().getNextId(); - String snapshotNamePrefix = ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX; - String snapshotName = snapshotNamePrefix + '_' + String.valueOf(createTime); + String snapshotName = AUTOMATED_NAME_PREFIX + '_' + String.valueOf(createTime); String storageVolumeName = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshotSvName(); - ClusterSnapshotJob job = new ClusterSnapshotJob(jobId, snapshotNamePrefix, snapshotName, storageVolumeName, createTime); - job.setState(ClusterSnapshotJobState.INITIALIZING, false); + ClusterSnapshotJob job = new ClusterSnapshotJob(jobId, snapshotName, storageVolumeName, createTime); + job.logJob(); addJob(job); @@ -230,7 +222,7 @@ public void replayLog(ClusterSnapshotLog log) { // if a job do not finished/error but fe restart, we should reset the state as error // when replaying the log during FE restart. Because the job is unretryable after restart if (!GlobalStateMgr.getServingState().isReady() && job.isUnFinishedState()) { - job.setState(ClusterSnapshotJobState.ERROR, true); + job.setState(ClusterSnapshotJobState.ERROR); job.setErrMsg("Snapshot job has been failed"); } break; @@ -246,7 +238,8 @@ public void resetLastUnFinishedAutomatedSnapshotJob() { ClusterSnapshotJob job = historyAutomatedSnapshotJobs.lastEntry().getValue(); if (job.isUnFinishedState()) { job.setErrMsg("Snapshot job has been failed"); - job.setState(ClusterSnapshotJobState.ERROR, false); + job.setState(ClusterSnapshotJobState.ERROR); + job.logJob(); } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java index fe259323af52a..f649d9ebdc61d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/CheckpointController.java @@ -77,7 +77,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -88,7 +87,7 @@ public class CheckpointController extends FrontendDaemon { private static final int PUT_TIMEOUT_SECOND = 3600; private static final int CONNECT_TIMEOUT_SECOND = 1; private static final int READ_TIMEOUT_SECOND = 1; - private static final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private static final ReentrantReadWriteLock RW_LOCK = new ReentrantReadWriteLock(); protected String imageDir; // subDir comes after base imageDir, to distinguish different module's image dir @@ -120,25 +119,25 @@ public CheckpointController(String name, Journal journal, String subDir, long in } public static boolean clusterSnapshotCheckpointRunning() { - return rwLock.isWriteLocked(); + return RW_LOCK.isWriteLocked(); } private boolean tryLock() { if (exclusiveExecution) { - return rwLock.writeLock().tryLock(); + return RW_LOCK.writeLock().tryLock(); } else { - return rwLock.readLock().tryLock(); + return RW_LOCK.readLock().tryLock(); } } private void unlock() { if (exclusiveExecution) { - if (rwLock.writeLock().isHeldByCurrentThread()) { - rwLock.writeLock().unlock(); + if (RW_LOCK.writeLock().isHeldByCurrentThread()) { + RW_LOCK.writeLock().unlock(); } } else { - if (rwLock.getReadHoldCount() > 0) { - rwLock.readLock().unlock(); + if (RW_LOCK.getReadHoldCount() > 0) { + RW_LOCK.readLock().unlock(); } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java b/fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java index 6bf4a2dc843a6..c1a88796b9d3f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/ClusterSnapshotCheckpointController.java @@ -65,7 +65,8 @@ protected void runCheckpointController() { consistentIds.first, consistentIds.second); // step 2: create image for FE and StarMgr - job.setState(ClusterSnapshotJobState.SNAPSHOTING, false); + job.setState(ClusterSnapshotJobState.SNAPSHOTING); + job.logJob(); Pair createImageRet = Pair.create(false, ""); createImageRet = createImageByForFE(consistentIds.first); @@ -89,7 +90,8 @@ protected void runCheckpointController() { LOG.info("Finished create image for cluster snapshot checkpoint"); // step 3: upload all finished image file - job.setState(ClusterSnapshotJobState.UPLOADING, false); + job.setState(ClusterSnapshotJobState.UPLOADING); + job.logJob(); try { ClusterSnapshotUtils.uploadAutomatedSnapshotToRemote(job.getSnapshotName()); } catch (StarRocksException e) { @@ -100,10 +102,13 @@ protected void runCheckpointController() { if (!errMsg.isEmpty()) { job.setErrMsg(errMsg); - job.setState(ClusterSnapshotJobState.ERROR, false); + job.setState(ClusterSnapshotJobState.ERROR); + job.logJob(); LOG.warn("Cluster Snapshot checkpoint failed: " + errMsg); } else { - job.setState(ClusterSnapshotJobState.FINISHED, false); + job.setState(ClusterSnapshotJobState.FINISHED); + job.logJob(); + job.addAutomatedClusterSnapshot(); LOG.warn("Finish Cluster Snapshot checkpoint, FE checkpoint journal Id: {}, StarMgr checkpoint journal Id: {}", job.getFeJournalId(), job.getStarMgrJournalId()); } diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java index ae58f10f3dc4d..f64a1c2fba8e2 100644 --- a/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java @@ -216,7 +216,7 @@ public void testOperationOfAutomatedSnapshot() throws DdlException { // 2. test getInfo setAutomatedSnapshotOn(false); ClusterSnapshotJob job = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().createAutomatedSnapshotJob(); - GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().createAutomatedSnaphot(job); + job.addAutomatedClusterSnapshot(); ClusterSnapshot snapshot = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot(); Assert.assertTrue(job.getInfo() != null); Assert.assertTrue(snapshot.getInfo() != null); @@ -232,7 +232,7 @@ public void testReplayClusterSnapshotLog() { // create snapshot log ClusterSnapshotLog logSnapshot = new ClusterSnapshotLog(); - clusterSnapshotMgr.createAutomatedSnapshotJob().setState(ClusterSnapshotJobState.FINISHED, false); + clusterSnapshotMgr.createAutomatedSnapshotJob().setState(ClusterSnapshotJobState.FINISHED); logSnapshot.setCreateSnapshot(clusterSnapshotMgr.getAutomatedSnapshot()); GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().replayLog(logCreate);