Skip to content

Commit

Permalink
[Feature] Support Cluster Snapshot Backup: checkpoint and image backu…
Browse files Browse the repository at this point in the history
…p (part3)

Signed-off-by: srlch <[email protected]>
  • Loading branch information
srlch committed Jan 5, 2025
1 parent defc9c3 commit d723b6f
Show file tree
Hide file tree
Showing 25 changed files with 877 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ namespace starrocks {
SchemaScanner::ColumnDesc SchemaClusterSnapshotJobsScanner::_s_columns[] = {
{"SNAPSHOT_NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"JOB_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
{"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
{"STATE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"DETAIL_INFO", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"ERROR_MESSAGE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
Expand All @@ -52,8 +52,14 @@ Status SchemaClusterSnapshotJobsScanner::_fill_chunk(ChunkPtr* chunk) {
auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
const TClusterSnapshotJobsItem& info = _result.items[_index];
DatumArray datum_array{
Slice(info.snapshot_name), info.job_id, info.created_time,
info.finished_time, Slice(info.state), Slice(info.detail_info),
Slice(info.snapshot_name), info.job_id,

TimestampValue::create_from_unixtime(info.created_time, _runtime_state->timezone_obj()),

TimestampValue::create_from_unixtime(info.finished_time, _runtime_state->timezone_obj()),

Slice(info.state), Slice(info.detail_info),

Slice(info.error_message),
};
for (const auto& [slot_id, index] : slot_id_map) {
Expand Down
10 changes: 7 additions & 3 deletions be/src/exec/schema_scanner/schema_cluster_snapshots_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ namespace starrocks {
SchemaScanner::ColumnDesc SchemaClusterSnapshotsScanner::_s_columns[] = {
{"SNAPSHOT_NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"SNAPSHOT_TYPE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
{"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_DATETIME), sizeof(DateTimeValue), true},
{"FE_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"STARMGR_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"PROPERTIES", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
Expand Down Expand Up @@ -54,7 +54,11 @@ Status SchemaClusterSnapshotsScanner::_fill_chunk(ChunkPtr* chunk) {
auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
const TClusterSnapshotsItem& info = _result.items[_index];
DatumArray datum_array{
Slice(info.snapshot_name), Slice(info.snapshot_type), info.created_time, info.finished_time,
Slice(info.snapshot_name), Slice(info.snapshot_type),

TimestampValue::create_from_unixtime(info.created_time, _runtime_state->timezone_obj()),

TimestampValue::create_from_unixtime(info.finished_time, _runtime_state->timezone_obj()),

info.fe_jouranl_id, info.starmgr_jouranl_id, Slice(info.properties), Slice(info.storage_volume),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public static SystemTable create() {
builder()
.column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("JOB_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("CREATED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("CREATED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
.column("STATE", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("DETAIL_INFO", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("ERROR_MESSAGE", ScalarType.createVarchar(NAME_CHAR_LEN))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public static SystemTable create() {
builder()
.column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("SNAPSHOT_TYPE", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("CREATED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("CREATED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.DATETIME))
.column("FE_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("STARMGR_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("PROPERTIES", ScalarType.createVarchar(NAME_CHAR_LEN))
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -3390,4 +3390,10 @@ public class Config extends ConfigBase {

@ConfField(mutable = false)
public static int query_deploy_threadpool_size = max(50, getRuntime().availableProcessors() * 10);

@ConfField(mutable = true)
public static long automated_cluster_snapshot_interval_seconds = 1800;

@ConfField(mutable = false)
public static int max_historical_automated_cluster_snapshot_jobs = 100;
}
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/fs/HdfsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class HdfsUtil {
private static final Logger LOG = LogManager.getLogger(HdfsUtil.class);
Expand All @@ -65,6 +66,11 @@ public static void getTProperties(String path, BrokerDesc brokerDesc, THdfsProp
hdfsService.getTProperties(path, brokerDesc.getProperties(), tProperties);
}

public static void copyFromLocal(String srcPath, String destPath, Map<String, String> properties)
throws StarRocksException {
hdfsService.copyFromLocal(srcPath, destPath, properties);
}

/**
* Parse file status in path with broker, except directory
*
Expand Down
19 changes: 19 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
Expand All @@ -47,6 +48,7 @@
import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
import software.amazon.awssdk.regions.Region;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -1197,6 +1199,23 @@ public void getTProperties(String path, Map<String, String> loadProperties, THdf
getFileSystem(path, loadProperties, tProperties);
}

public void copyFromLocal(String srcPath, String destPath, Map<String, String> properties) throws StarRocksException {
HdfsFs fileSystem = getFileSystem(destPath, properties, null);
try {
WildcardURI destPathUri = new WildcardURI(destPath);

File srcFile = new File(srcPath);
FileUtil.copy(srcFile, fileSystem.getDFSFileSystem(), new Path(destPathUri.getPath()), false, new Configuration());
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while copy from local {} to {} ", srcPath, destPath + " " + e.getMessage());
throw new StarRocksException("Failed to copy " + destPath + " from local " + srcPath + " " + e.getMessage());
} catch (Exception e) {
LOG.error("Exception while copy from local {} to {} ", srcPath, destPath + " " + e.getMessage());
throw new StarRocksException("Failed to copy " + destPath + " from local " + srcPath + " " + e.getMessage());
}
}

public List<FileStatus> listFileMeta(String path, Map<String, String> properties) throws StarRocksException {
WildcardURI pathUri = new WildcardURI(path);
HdfsFs fileSystem = getFileSystem(path, properties, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public void getTProperties(String path, Map<String, String> loadProperties, THdf
fileSystemManager.getTProperties(path, loadProperties, tProperties);
}

public void copyFromLocal(String srcPath, String destPath, Map<String, String> properties) throws StarRocksException {
fileSystemManager.copyFromLocal(srcPath, destPath, properties);
LOG.info("Copied {} from local {}", destPath, srcPath);
}

public void listPath(TBrokerListPathRequest request, List<TBrokerFileStatus> fileStatuses, boolean skipDir,
boolean fileNameOnly) throws StarRocksException {
LOG.info("receive a list path request, path: {}", request.path);
Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/http/meta/MetaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,22 @@ public void executeGet(BaseRequest request, BaseResponse response) {
String realDir = getRealImageDir(GlobalStateMgr.getCurrentState().getImageDir(),
subDirStr, imageFormatVersion);
File dir = new File(realDir);

long imageVersionInRealDir = 0;
try {
Storage storage = new Storage(realDir);
imageVersionInRealDir = storage.getImageJournalId();
} catch (IOException e) {
LOG.warn("failed to check local image file version. dir: {}", realDir, e);
writeResponse(request, response, HttpResponseStatus.INTERNAL_SERVER_ERROR);
return;
}

try {
if (Files.exists(Path.of(realDir + "/" + filename))) {
LOG.info("image file : {} version: {} already exists, ignore", filename, imageFormatVersion);
} else if (imageVersionInRealDir > version) {
LOG.info("this node has a image with larger version: {}, ignore", imageVersionInRealDir);
} else {
MetaHelper.downloadImageFile(url, DOWNLOAD_TIMEOUT_SECOND * 1000, versionStr, dir);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,14 @@ protected void runAfterCatalogReady() {
if (nextPoint == null) {
return;
}
if (nextPoint.journalId <= getImageJournalId()) {

if (nextPoint.epoch != servingGlobalState.getEpoch()) {
return;
}
if (nextPoint.epoch != servingGlobalState.getEpoch()) {

long curJournalId = getImageJournalId();
if (nextPoint.journalId <= curJournalId) {
finishCheckpoint(nextPoint.epoch, curJournalId, true, "success");
return;
}

Expand Down Expand Up @@ -133,6 +137,12 @@ private void finishCheckpoint(long epoch, long journalId, boolean isSuccess, Str
String nodeName = servingGlobalState.getNodeMgr().getNodeName();
if (servingGlobalState.isLeader()) {
CheckpointController controller = getCheckpointController();
if (CheckpointController.clusterSnapshotCheckpointRunning()) {
controller = GlobalStateMgr.getServingState().getClusterSnapshotCheckpointController();
} else {
controller = getCheckpointController();
}

if (isSuccess) {
try {
controller.finishCheckpoint(journalId, nodeName);
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public void getServiceId() {
LOG.info("get serviceId {} from starMgr", serviceId);
}

public String getRawServiceId() {
return serviceId;
}

public String addFileStore(FileStoreInfo fsInfo) throws DdlException {
try {
return client.addFileStore(fsInfo, serviceId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.lake.snapshot;

import com.google.gson.annotations.SerializedName;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.StorageVolumeMgr;
import com.starrocks.storagevolume.StorageVolume;
import com.starrocks.thrift.TClusterSnapshotsItem;

public class ClusterSnapshot {
public enum ClusterSnapshotType { AUTOMATED }

@SerializedName(value = "snapshotId")
private long snapshotId;
@SerializedName(value = "snapshotName")
private String snapshotName;
@SerializedName(value = "type")
private ClusterSnapshotType type;
@SerializedName(value = "storageVolumeName")
private String storageVolumeName;
@SerializedName(value = "createTime")
private long createTime;
@SerializedName(value = "successTime")
private long successTime;
@SerializedName(value = "feJournalId")
private long feJournalId;
@SerializedName(value = "starMgrJournal")
private long starMgrJournalId;

public ClusterSnapshot() {}

public ClusterSnapshot(long snapshotId, String snapshotName, String storageVolumeName, long createTime,
long successTime, long feJournalId, long starMgrJournalId) {
this.snapshotId = snapshotId;
this.snapshotName = snapshotName;
this.type = ClusterSnapshotType.AUTOMATED;
this.storageVolumeName = storageVolumeName;
this.createTime = createTime;
this.successTime = successTime;
this.feJournalId = feJournalId;
this.starMgrJournalId = starMgrJournalId;
}

public String getSnapshotName() {
return snapshotName;
}

public long getSuccessTime() {
return successTime;
}

public TClusterSnapshotsItem getInfo() {
TClusterSnapshotsItem item = new TClusterSnapshotsItem();
item.setSnapshot_name(snapshotName);
item.setSnapshot_type(type.name());
item.setCreated_time(createTime);
item.setFinished_time(successTime);
item.setFe_jouranl_id(feJournalId);
item.setStarmgr_jouranl_id(starMgrJournalId);
item.setProperties("");
item.setStorage_volume(storageVolumeName);

StorageVolumeMgr storageVolumeMgr = GlobalStateMgr.getCurrentState().getStorageVolumeMgr();
try {
StorageVolume sv = storageVolumeMgr.getStorageVolumeByName(storageVolumeName);
if (sv == null) {
throw new Exception("Unknown storage volume: " + storageVolumeName);
}
item.setStorage_path(ClusterSnapshotUtils.getSnapshotImagePath(sv, snapshotName));
} catch (Exception e) {
item.setStorage_path("");
}
return item;
}
}
Loading

0 comments on commit d723b6f

Please sign in to comment.