Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support Cluster Snapshot Backup: checkpoint and image backup (part3) #54695

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ namespace starrocks {
SchemaScanner::ColumnDesc SchemaClusterSnapshotJobsScanner::_s_columns[] = {
{"SNAPSHOT_NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"JOB_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
{"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
{"STATE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"DETAIL_INFO", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"ERROR_MESSAGE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
Expand All @@ -52,8 +52,16 @@ Status SchemaClusterSnapshotJobsScanner::_fill_chunk(ChunkPtr* chunk) {
auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
const TClusterSnapshotJobsItem& info = _result.items[_index];
DatumArray datum_array{
Slice(info.snapshot_name), info.job_id, info.created_time,
info.finished_time, Slice(info.state), Slice(info.detail_info),
Slice(info.snapshot_name),
info.job_id,

TimestampValue::create_from_unixtime(info.created_time, _runtime_state->timezone_obj()),

TimestampValue::create_from_unixtime(info.finished_time, _runtime_state->timezone_obj()),

Slice(info.state),
Slice(info.detail_info),

Slice(info.error_message),
};
for (const auto& [slot_id, index] : slot_id_map) {
Expand Down
16 changes: 12 additions & 4 deletions be/src/exec/schema_scanner/schema_cluster_snapshots_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ namespace starrocks {
SchemaScanner::ColumnDesc SchemaClusterSnapshotsScanner::_s_columns[] = {
{"SNAPSHOT_NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"SNAPSHOT_TYPE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
{"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
{"FE_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"STARMGR_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"PROPERTIES", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
Expand Down Expand Up @@ -54,9 +54,17 @@ Status SchemaClusterSnapshotsScanner::_fill_chunk(ChunkPtr* chunk) {
auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
const TClusterSnapshotsItem& info = _result.items[_index];
DatumArray datum_array{
Slice(info.snapshot_name), Slice(info.snapshot_type), info.created_time, info.finished_time,
Slice(info.snapshot_name),
Slice(info.snapshot_type),

info.fe_jouranl_id, info.starmgr_jouranl_id, Slice(info.properties), Slice(info.storage_volume),
TimestampValue::create_from_unixtime(info.created_time, _runtime_state->timezone_obj()),

TimestampValue::create_from_unixtime(info.finished_time, _runtime_state->timezone_obj()),

info.fe_jouranl_id,
info.starmgr_jouranl_id,
Slice(info.properties),
Slice(info.storage_volume),

Slice(info.storage_path),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public static SystemTable create() {
builder()
.column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("JOB_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("CREATED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("CREATED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
.column("STATE", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("DETAIL_INFO", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("ERROR_MESSAGE", ScalarType.createVarchar(NAME_CHAR_LEN))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public static SystemTable create() {
builder()
.column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("SNAPSHOT_TYPE", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("CREATED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("CREATED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
.column("FE_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("STARMGR_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("PROPERTIES", ScalarType.createVarchar(NAME_CHAR_LEN))
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -3390,4 +3390,10 @@ public class Config extends ConfigBase {

@ConfField(mutable = false)
public static int query_deploy_threadpool_size = max(50, getRuntime().availableProcessors() * 10);

@ConfField(mutable = true)
public static long automated_cluster_snapshot_interval_seconds = 1800;

@ConfField(mutable = false)
public static int max_historical_automated_cluster_snapshot_jobs = 100;
}
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/fs/HdfsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public static void copyToLocal(String srcPath, String destPath, Map<String, Stri
hdfsService.copyToLocal(srcPath, destPath, properties);
}

public static void copyFromLocal(String srcPath, String destPath, Map<String, String> properties)
throws StarRocksException {
hdfsService.copyFromLocal(srcPath, destPath, properties);
}

/**
* Parse file status in path with broker, except directory
*
Expand Down
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
Expand All @@ -47,6 +48,7 @@
import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
import software.amazon.awssdk.regions.Region;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -1212,6 +1214,22 @@ public void copyToLocal(String srcPath, String destPath, Map<String, String> pro
}
}

public void copyFromLocal(String srcPath, String destPath, Map<String, String> properties) throws StarRocksException {
HdfsFs fileSystem = getFileSystem(destPath, properties, null);
try {
WildcardURI destPathUri = new WildcardURI(destPath);
File srcFile = new File(srcPath);
FileUtil.copy(srcFile, fileSystem.getDFSFileSystem(), new Path(destPathUri.getPath()), false, new Configuration());
Comment on lines +1221 to +1222
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that copyFromLocalFile will failed if the src is not a file

} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while copy local {} to {} ", srcPath, destPath, e);
throw new StarRocksException("Failed to copy local " + srcPath + " to " + destPath, e);
} catch (Exception e) {
LOG.error("Exception while copy local {} to {} ", srcPath, destPath, e);
throw new StarRocksException("Failed to copy local " + srcPath + " to " + destPath, e);
}
}

public List<FileStatus> listFileMeta(String path, Map<String, String> properties) throws StarRocksException {
WildcardURI pathUri = new WildcardURI(path);
HdfsFs fileSystem = getFileSystem(path, properties, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public void copyToLocal(String srcPath, String destPath, Map<String, String> pro
LOG.info("Copied {} to local {}", srcPath, destPath);
}

public void copyFromLocal(String srcPath, String destPath, Map<String, String> properties) throws StarRocksException {
fileSystemManager.copyFromLocal(srcPath, destPath, properties);
LOG.info("Copied local {} to {}", srcPath, destPath);
}

public void listPath(TBrokerListPathRequest request, List<TBrokerFileStatus> fileStatuses, boolean skipDir,
boolean fileNameOnly) throws StarRocksException {
LOG.info("receive a list path request, path: {}", request.path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ protected void runAfterCatalogReady() {
if (nextPoint == null) {
return;
}

if (nextPoint.journalId <= getImageJournalId()) {
return;
}

if (nextPoint.epoch != servingGlobalState.getEpoch()) {
return;
}
Expand Down Expand Up @@ -133,6 +135,10 @@ private void finishCheckpoint(long epoch, long journalId, boolean isSuccess, Str
String nodeName = servingGlobalState.getNodeMgr().getNodeName();
if (servingGlobalState.isLeader()) {
CheckpointController controller = getCheckpointController();
if (CheckpointController.clusterSnapshotCheckpointRunning()) {
controller = GlobalStateMgr.getServingState().getClusterSnapshotCheckpointController();
}

if (isSuccess) {
try {
controller.finishCheckpoint(journalId, nodeName);
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public void getServiceId() {
LOG.info("get serviceId {} from starMgr", serviceId);
}

public String getRawServiceId() {
return serviceId;
}

public String addFileStore(FileStoreInfo fsInfo) throws DdlException {
try {
return client.addFileStore(fsInfo, serviceId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.lake.snapshot;

import com.google.gson.annotations.SerializedName;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.StorageVolumeMgr;
import com.starrocks.storagevolume.StorageVolume;
import com.starrocks.thrift.TClusterSnapshotsItem;

public class ClusterSnapshot {
public enum ClusterSnapshotType { AUTOMATED }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public enum ClusterSnapshotType { AUTOMATED }
public enum ClusterSnapshotType { AUTOMATED, MANUAL, INCREMENTAL }


@SerializedName(value = "snapshotId")
private long snapshotId;
@SerializedName(value = "snapshotName")
private String snapshotName;
@SerializedName(value = "type")
private ClusterSnapshotType type;
@SerializedName(value = "storageVolumeName")
private String storageVolumeName;
@SerializedName(value = "createdTime")
private long createdTime;
@SerializedName(value = "finishedTime")
private long finishedTime;
@SerializedName(value = "feJournalId")
private long feJournalId;
@SerializedName(value = "starMgrJournal")
private long starMgrJournalId;

public ClusterSnapshot() {}

public ClusterSnapshot(long snapshotId, String snapshotName, String storageVolumeName, long createdTime,
long finishedTime, long feJournalId, long starMgrJournalId) {
this.snapshotId = snapshotId;
this.snapshotName = snapshotName;
this.type = ClusterSnapshotType.AUTOMATED;
this.storageVolumeName = storageVolumeName;
this.createdTime = createdTime;
this.finishedTime = finishedTime;
this.feJournalId = feJournalId;
this.starMgrJournalId = starMgrJournalId;
}

public String getSnapshotName() {
return snapshotName;
}

public long getFinishedTime() {
return finishedTime;
}

public TClusterSnapshotsItem getInfo() {
TClusterSnapshotsItem item = new TClusterSnapshotsItem();
item.setSnapshot_name(snapshotName);
item.setSnapshot_type(type.name());
item.setCreated_time(createdTime);
item.setFinished_time(finishedTime);
item.setFe_jouranl_id(feJournalId);
item.setStarmgr_jouranl_id(starMgrJournalId);
item.setProperties("");
item.setStorage_volume(storageVolumeName);

StorageVolumeMgr storageVolumeMgr = GlobalStateMgr.getCurrentState().getStorageVolumeMgr();
try {
StorageVolume sv = storageVolumeMgr.getStorageVolumeByName(storageVolumeName);
if (sv == null) {
throw new Exception("Unknown storage volume: " + storageVolumeName);
}
item.setStorage_path(ClusterSnapshotUtils.getSnapshotImagePath(sv, snapshotName));
} catch (Exception e) {
item.setStorage_path("");
}
return item;
}
}
Loading
Loading