Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support Cluster Snapshot Backup: SQL Interface and meta data (part 1) (backport #54447) #54505

Merged
merged 2 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.starrocks.persist.CancelDecommissionDiskInfo;
import com.starrocks.persist.CancelDisableDiskInfo;
import com.starrocks.persist.ChangeMaterializedViewRefreshSchemeLog;
import com.starrocks.persist.ClusterSnapshotLog;
import com.starrocks.persist.ColocatePersistInfo;
import com.starrocks.persist.ColumnRenameInfo;
import com.starrocks.persist.ConsistencyCheckInfo;
Expand Down Expand Up @@ -775,6 +776,10 @@ public void readFields(DataInput in) throws IOException {
data = DropWarehouseLog.read(in);
break;
}
case OperationType.OP_CLUSTER_SNAPSHOT_LOG: {
data = ClusterSnapshotLog.read(in);
break;
}
default: {
if (Config.metadata_ignore_unknown_operation_type) {
LOG.warn("UNKNOWN Operation Type {}", opCode);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.lake.snapshot;

import com.google.gson.annotations.SerializedName;
import com.starrocks.persist.ClusterSnapshotLog;
import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.metablock.SRMetaBlockEOFException;
import com.starrocks.persist.metablock.SRMetaBlockException;
import com.starrocks.persist.metablock.SRMetaBlockID;
import com.starrocks.persist.metablock.SRMetaBlockReader;
import com.starrocks.persist.metablock.SRMetaBlockWriter;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;

// only used for AUTOMATED snapshot for now
public class ClusterSnapshotMgr implements GsonPostProcessable {
public static final Logger LOG = LogManager.getLogger(ClusterSnapshotMgr.class);
public static final String AUTOMATED_NAME_PREFIX = "automated_cluster_snapshot";

@SerializedName(value = "automatedSnapshotSvName")
private String automatedSnapshotSvName = "";

public ClusterSnapshotMgr() {}

// Turn on automated snapshot, use stmt for extension in future
public void setAutomatedSnapshotOn(AdminSetAutomatedSnapshotOnStmt stmt) {
String storageVolumeName = stmt.getStorageVolumeName();
setAutomatedSnapshotOn(storageVolumeName);

ClusterSnapshotLog log = new ClusterSnapshotLog();
log.setCreateSnapshotNamePrefix(AUTOMATED_NAME_PREFIX, storageVolumeName);
GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log);
}

protected void setAutomatedSnapshotOn(String storageVolumeName) {
automatedSnapshotSvName = storageVolumeName;
}

public String getAutomatedSnapshotSvName() {
return automatedSnapshotSvName;
}

public boolean isAutomatedSnapshotOn() {
return automatedSnapshotSvName != null && !automatedSnapshotSvName.isEmpty();
}

// Turn off automated snapshot, use stmt for extension in future
public void setAutomatedSnapshotOff(AdminSetAutomatedSnapshotOffStmt stmt) {
setAutomatedSnapshotOff();

ClusterSnapshotLog log = new ClusterSnapshotLog();
log.setDropSnapshot(AUTOMATED_NAME_PREFIX);
GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log);
}

protected void setAutomatedSnapshotOff() {
// drop AUTOMATED snapshot
automatedSnapshotSvName = "";
}

public void replayLog(ClusterSnapshotLog log) {
ClusterSnapshotLog.ClusterSnapshotLogType logType = log.getType();
switch (logType) {
case CREATE_SNAPSHOT_PREFIX: {
String createSnapshotNamePrefix = log.getCreateSnapshotNamePrefix();
String storageVolumeName = log.getStorageVolumeName();
if (createSnapshotNamePrefix.equals(AUTOMATED_NAME_PREFIX)) {
setAutomatedSnapshotOn(storageVolumeName);
}
break;
}
case DROP_SNAPSHOT: {
String dropSnapshotName = log.getDropSnapshotName();
if (dropSnapshotName.equals(AUTOMATED_NAME_PREFIX)) {
setAutomatedSnapshotOff();
}
break;
}
default: {
LOG.warn("Invalid Cluster Snapshot Log Type {}", logType);
}
}
}

public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockException {
SRMetaBlockWriter writer = imageWriter.getBlockWriter(SRMetaBlockID.CLUSTER_SNAPSHOT_MGR, 1);
writer.writeJson(this);
writer.close();
}

public void load(SRMetaBlockReader reader)
throws SRMetaBlockEOFException, IOException, SRMetaBlockException {
ClusterSnapshotMgr data = reader.readJson(ClusterSnapshotMgr.class);
}

@Override
public void gsonPostProcess() throws IOException {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.persist;

import com.google.gson.annotations.SerializedName;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.persist.gson.GsonUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ClusterSnapshotLog implements Writable {
public enum ClusterSnapshotLogType { NONE, CREATE_SNAPSHOT_PREFIX, DROP_SNAPSHOT }
@SerializedName(value = "type")
private ClusterSnapshotLogType type = ClusterSnapshotLogType.NONE;
// For CREATE_SNAPSHOT_PREFIX
@SerializedName(value = "createSnapshotNamePrefix")
private String createSnapshotNamePrefix = "";
@SerializedName(value = "storageVolumeName")
private String storageVolumeName = "";
// For DROP_SNAPSHOT
@SerializedName(value = "dropSnapshotName")
private String dropSnapshotName = "";

public ClusterSnapshotLog() {}

public void setCreateSnapshotNamePrefix(String createSnapshotNamePrefix, String storageVolumeName) {
this.type = ClusterSnapshotLogType.CREATE_SNAPSHOT_PREFIX;
this.createSnapshotNamePrefix = createSnapshotNamePrefix;
this.storageVolumeName = storageVolumeName;
}

public void setDropSnapshot(String dropSnapshotName) {
this.type = ClusterSnapshotLogType.DROP_SNAPSHOT;
this.dropSnapshotName = dropSnapshotName;
}

public ClusterSnapshotLogType getType() {
return type;
}

public String getCreateSnapshotNamePrefix() {
return this.createSnapshotNamePrefix;
}

public String getStorageVolumeName() {
return this.storageVolumeName;
}

public String getDropSnapshotName() {
return this.dropSnapshotName;
}

public static ClusterSnapshotLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ClusterSnapshotLog.class);
}

@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
}
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,11 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal)
warehouseMgr.replayAlterWarehouse(wh);
break;
}
case OperationType.OP_CLUSTER_SNAPSHOT_LOG: {
ClusterSnapshotLog log = (ClusterSnapshotLog) journal.getData();
globalStateMgr.getClusterSnapshotMgr().replayLog(log);
break;
}
default: {
if (Config.metadata_ignore_unknown_operation_type) {
LOG.warn("UNKNOWN Operation Type {}", opCode);
Expand Down Expand Up @@ -1957,4 +1962,8 @@ public void logCancelDisableDisk(CancelDisableDiskInfo info) {
public void logRecoverPartitionVersion(PartitionVersionRecoveryInfo info) {
logEdit(OperationType.OP_RECOVER_PARTITION_VERSION, info);
}

public void logClusterSnapshotLog(ClusterSnapshotLog info) {
logEdit(OperationType.OP_CLUSTER_SNAPSHOT_LOG, info);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ public class OperationType {
@IgnorableOnReplayFailed
public static final short OP_ADD_KEY = 13512;

@IgnorableOnReplayFailed
public static final short OP_CLUSTER_SNAPSHOT_LOG = 13513;

/**
* NOTICE: OperationType cannot use a value exceeding 20000, please follow the above sequence number
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public int getId() {

public static final SRMetaBlockID PIPE_MGR = new SRMetaBlockID(32);

public static final SRMetaBlockID CLUSTER_SNAPSHOT_MGR = new SRMetaBlockID(33);

/**
* NOTICE: SRMetaBlockID cannot use a value exceeding 20000, please follow the above sequence number
*/
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.starrocks.sql.ast.AdminCancelRepairTableStmt;
import com.starrocks.sql.ast.AdminCheckTabletsStmt;
import com.starrocks.sql.ast.AdminRepairTableStmt;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt;
import com.starrocks.sql.ast.AdminSetConfigStmt;
import com.starrocks.sql.ast.AdminSetPartitionVersionStmt;
import com.starrocks.sql.ast.AdminSetReplicaStatusStmt;
Expand Down Expand Up @@ -1174,6 +1176,24 @@ public ShowResultSet visitDropWarehouseStatement(DropWarehouseStmt stmt, Connect
});
return null;
}

@Override
public ShowResultSet visitAdminSetAutomatedSnapshotOnStatement(AdminSetAutomatedSnapshotOnStmt stmt,
ConnectContext context) {
ErrorReport.wrapWithRuntimeException(() -> {
context.getGlobalStateMgr().getClusterSnapshotMgr().setAutomatedSnapshotOn(stmt);
});
return null;
}

@Override
public ShowResultSet visitAdminSetAutomatedSnapshotOffStatement(AdminSetAutomatedSnapshotOffStmt stmt,
ConnectContext context) {
ErrorReport.wrapWithRuntimeException(() -> {
context.getGlobalStateMgr().getClusterSnapshotMgr().setAutomatedSnapshotOff(stmt);
});
return null;
}
}

}
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import com.starrocks.lake.StarOSAgent;
import com.starrocks.lake.compaction.CompactionControlScheduler;
import com.starrocks.lake.compaction.CompactionMgr;
import com.starrocks.lake.snapshot.ClusterSnapshotMgr;
import com.starrocks.lake.vacuum.AutovacuumDaemon;
import com.starrocks.leader.CheckpointController;
import com.starrocks.leader.TaskRunStateSynchronizer;
Expand Down Expand Up @@ -514,6 +515,8 @@ public class GlobalStateMgr {
private final ExecutorService queryDeployExecutor;
private final WarehouseIdleChecker warehouseIdleChecker;

private final ClusterSnapshotMgr clusterSnapshotMgr;

public NodeMgr getNodeMgr() {
return nodeMgr;
}
Expand Down Expand Up @@ -814,7 +817,10 @@ public void transferToNonLeader(FrontendNodeType newType) {
this.queryDeployExecutor =
ThreadPoolManager.newDaemonFixedThreadPool(Config.query_deploy_threadpool_size, Integer.MAX_VALUE,
"query-deploy", true);

this.warehouseIdleChecker = new WarehouseIdleChecker();

this.clusterSnapshotMgr = new ClusterSnapshotMgr();
}

public static void destroyCheckpoint() {
Expand Down Expand Up @@ -1057,6 +1063,10 @@ public GlobalConstraintManager getGlobalConstraintManager() {
return globalConstraintManager;
}

public ClusterSnapshotMgr getClusterSnapshotMgr() {
return clusterSnapshotMgr;
}

// Use tryLock to avoid potential deadlock
public boolean tryLock(boolean mustLock) {
while (true) {
Expand Down Expand Up @@ -1541,6 +1551,7 @@ public void loadImage(String imageDir) throws IOException {
.put(SRMetaBlockID.KEY_MGR, keyMgr::load)
.put(SRMetaBlockID.PIPE_MGR, pipeManager.getRepo()::load)
.put(SRMetaBlockID.WAREHOUSE_MGR, warehouseMgr::load)
.put(SRMetaBlockID.CLUSTER_SNAPSHOT_MGR, clusterSnapshotMgr::load)
.build();

Set<SRMetaBlockID> metaMgrMustExists = new HashSet<>(loadImages.keySet());
Expand Down
15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.starrocks.sql.ast.AdminCancelRepairTableStmt;
import com.starrocks.sql.ast.AdminCheckTabletsStmt;
import com.starrocks.sql.ast.AdminRepairTableStmt;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt;
import com.starrocks.sql.ast.AdminSetConfigStmt;
import com.starrocks.sql.ast.AdminSetPartitionVersionStmt;
import com.starrocks.sql.ast.AdminSetReplicaStatusStmt;
Expand Down Expand Up @@ -592,6 +594,19 @@ public Void visitPauseRoutineLoadStatement(PauseRoutineLoadStmt statement, Conne
return null;
}

@Override
public Void visitAdminSetAutomatedSnapshotOnStatement(AdminSetAutomatedSnapshotOnStmt statement, ConnectContext context) {
ClusterSnapshotAnalyzer.analyze(statement, context);
return null;
}

@Override
public Void visitAdminSetAutomatedSnapshotOffStatement(AdminSetAutomatedSnapshotOffStmt statement,
ConnectContext context) {
ClusterSnapshotAnalyzer.analyze(statement, context);
return null;
}

// ---------------------------------------- Catalog Statement -------------------------------------------

@Override
Expand Down
Loading
Loading