Skip to content

Commit

Permalink
[Feature] Support restoring from a cluster snapshot for shared-data mode (part 2, update metadata during restoring) (#54128)
Browse files Browse the repository at this point in the history

## Why I'm doing this:
To support disaster recovery for shared-data mode.

## What I'm doing:
Update metadata during restoring from a cluster snapshot according to the `cluster_snapshot.yaml`.

Signed-off-by: xiangguangyxg <[email protected]>
  • Loading branch information
xiangguangyxg authored Dec 23, 2024
1 parent fe5e98f commit 2ad831b
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,22 +133,6 @@ public String toString() {
}

public static class StorageVolume {
public static enum StorageVolumeType {
S3,
HDFS,
AZBLOB;

@JsonCreator
public static StorageVolumeType forValue(String value) {
return StorageVolumeType.valueOf(value.toUpperCase());
}

@JsonValue
public String toValue() {
return name().toLowerCase();
}
}

private static class PropertiesDeserializer extends JsonDeserializer<Map<String, String>> {

@Override
Expand Down Expand Up @@ -178,7 +162,7 @@ public Map<String, String> deserialize(JsonParser parser, DeserializationContext
private String name;

@JsonProperty("type")
private StorageVolumeType type;
private String type;

@JsonProperty("location")
private String location;
Expand All @@ -198,11 +182,11 @@ public void setName(String name) {
this.name = name;
}

public StorageVolumeType getType() {
public String getType() {
return type;
}

public void setType(StorageVolumeType type) {
public void setType(String type) {
this.type = type;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.starrocks.ha.FrontendNodeType;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.NodeMgr;
import com.starrocks.server.StorageVolumeMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
Expand All @@ -27,6 +28,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;

public class RestoreClusterSnapshotMgr {
Expand Down Expand Up @@ -63,56 +65,26 @@ public static boolean isRestoring() {
return instance != null;
}

/**
 * Returns the cluster snapshot configuration of the active restore session,
 * or {@code null} when no restore is in progress.
 */
public static ClusterSnapshotConfig getConfig() {
    // Read the singleton reference once so a concurrent reset to null between
    // the check and the field access cannot cause an NPE.
    RestoreClusterSnapshotMgr current = instance;
    return current == null ? null : current.config;
}

public static void finishRestoring() throws DdlException {
RestoreClusterSnapshotMgr self = instance;
if (self == null) {
return;
}

try {
List<ClusterSnapshotConfig.Frontend> frontends = self.config.getFrontends();
if (frontends != null) {
NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr();
// Drop old frontends
for (Frontend frontend : nodeMgr.getOtherFrontends()) {
LOG.info("Drop old frontend {}", frontend);
nodeMgr.dropFrontend(frontend.getRole(), frontend.getHost(), frontend.getEditLogPort());
}

// Add new frontends
for (ClusterSnapshotConfig.Frontend frontend : frontends) {
LOG.info("Add new frontend {}", frontend);
nodeMgr.addFrontend(frontend.isFollower() ? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER,
frontend.getHost(), frontend.getEditLogPort());
}
}

List<ClusterSnapshotConfig.ComputeNode> computeNodes = self.config.getComputeNodes();
if (computeNodes != null) {
SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
for (Backend be : systemInfoService.getIdToBackend().values()) {
LOG.info("Drop old backend {}", be);
systemInfoService.dropBackend(be.getHost(), be.getHeartbeatPort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME, false);
}

// Drop old compute nodes
for (ComputeNode cn : systemInfoService.getIdComputeNode().values()) {
LOG.info("Drop old compute node {}", cn);
systemInfoService.dropComputeNode(cn.getHost(), cn.getHeartbeatPort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME);
}

// Add new compute nodes
for (ClusterSnapshotConfig.ComputeNode cn : computeNodes) {
LOG.info("Add new compute node {}", cn);
systemInfoService.addComputeNode(cn.getHost(), cn.getHeartbeatServicePort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME);
}
}
self.updateFrontends();

// TODO: Update storage volume
self.updateComputeNodes();

self.updateStorageVolumes();
} finally {
// Rollback config
Config.start_with_incomplete_meta = self.oldStartWithIncompleteMeta;
Expand All @@ -121,4 +93,67 @@ public static void finishRestoring() throws DdlException {
instance = null;
}
}

/**
 * Replaces the frontend membership recorded in the restored metadata with the
 * frontends listed in the snapshot config. Does nothing when the config
 * declares no frontends.
 *
 * @throws DdlException if dropping or adding a frontend fails
 */
private void updateFrontends() throws DdlException {
    List<ClusterSnapshotConfig.Frontend> newFrontends = config.getFrontends();
    if (newFrontends == null) {
        return;
    }

    NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr();

    // Drop old frontends (every frontend other than the current one).
    for (Frontend oldFe : nodeMgr.getOtherFrontends()) {
        LOG.info("Drop old frontend {}", oldFe);
        nodeMgr.dropFrontend(oldFe.getRole(), oldFe.getHost(), oldFe.getEditLogPort());
    }

    // Add the frontends declared in the snapshot config.
    for (ClusterSnapshotConfig.Frontend newFe : newFrontends) {
        LOG.info("Add new frontend {}", newFe);
        FrontendNodeType role = newFe.isFollower() ? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER;
        nodeMgr.addFrontend(role, newFe.getHost(), newFe.getEditLogPort());
    }
}

/**
 * Replaces the backend and compute node membership recorded in the restored
 * metadata with the compute nodes listed in the snapshot config. Does nothing
 * when the config declares no compute nodes.
 *
 * @throws DdlException if dropping or adding a node fails
 */
private void updateComputeNodes() throws DdlException {
    List<ClusterSnapshotConfig.ComputeNode> newComputeNodes = config.getComputeNodes();
    if (newComputeNodes == null) {
        return;
    }

    SystemInfoService clusterInfo = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();

    // Drop old backends.
    for (Backend oldBe : clusterInfo.getIdToBackend().values()) {
        LOG.info("Drop old backend {}", oldBe);
        clusterInfo.dropBackend(oldBe.getHost(), oldBe.getHeartbeatPort(),
                WarehouseManager.DEFAULT_WAREHOUSE_NAME, false);
    }

    // Drop old compute nodes
    for (ComputeNode oldCn : clusterInfo.getIdComputeNode().values()) {
        LOG.info("Drop old compute node {}", oldCn);
        clusterInfo.dropComputeNode(oldCn.getHost(), oldCn.getHeartbeatPort(),
                WarehouseManager.DEFAULT_WAREHOUSE_NAME);
    }

    // Add new compute nodes
    for (ClusterSnapshotConfig.ComputeNode newCn : newComputeNodes) {
        LOG.info("Add new compute node {}", newCn);
        clusterInfo.addComputeNode(newCn.getHost(), newCn.getHeartbeatServicePort(),
                WarehouseManager.DEFAULT_WAREHOUSE_NAME);
    }
}

/**
 * Applies the storage volume definitions from the snapshot config to the
 * restored metadata, updating each existing volume's type, location,
 * properties and comment. Does nothing when the config declares no volumes.
 *
 * @throws DdlException if updating a storage volume fails
 */
private void updateStorageVolumes() throws DdlException {
    List<ClusterSnapshotConfig.StorageVolume> volumes = config.getStorageVolumes();
    if (volumes == null) {
        return;
    }

    StorageVolumeMgr svMgr = GlobalStateMgr.getCurrentState().getStorageVolumeMgr();
    for (ClusterSnapshotConfig.StorageVolume volume : volumes) {
        // Each config entry carries a single location; the manager API takes a list.
        svMgr.updateStorageVolume(volume.getName(), volume.getType(),
                Collections.singletonList(volume.getLocation()), volume.getProperties(),
                volume.getComment());
    }
}
}
39 changes: 26 additions & 13 deletions fe/fe-core/src/main/java/com/starrocks/server/StorageVolumeMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.starrocks.server;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import com.staros.util.LockCloseable;
Expand Down Expand Up @@ -152,27 +153,31 @@ public void removeStorageVolume(String name) throws DdlException, MetaNotFoundEx
}

/**
 * Updates a storage volume from an ALTER STORAGE VOLUME statement.
 * Delegates to the string-based overload with no type/location change
 * (null type and null locations mean "leave unchanged").
 *
 * @throws DdlException if the update fails
 */
public void updateStorageVolume(AlterStorageVolumeStmt stmt) throws DdlException {
updateStorageVolume(stmt.getName(), null, null, stmt.getProperties(), stmt.getComment());
}

/**
 * Updates an existing storage volume by name. Splits the raw properties into
 * plain parameters plus the optional "enabled" flag, then delegates to the
 * full overload. A null/empty {@code svType} or a null {@code locations}
 * leaves the corresponding attribute unchanged.
 *
 * NOTE(review): this span was diff residue — the stale lines still referenced
 * {@code stmt}, which is not in scope in this overload; reconstructed as the
 * post-commit version.
 *
 * @param name       name of an existing storage volume
 * @param svType     new volume type, or null to keep the current one
 * @param locations  new locations, or null to keep the current ones
 * @param properties raw properties, possibly containing the "enabled" flag
 * @param comment    new comment; empty/null keeps the current one
 * @throws DdlException if the volume does not exist or the update fails
 */
public void updateStorageVolume(String name, String svType, List<String> locations,
        Map<String, String> properties, String comment) throws DdlException {
    Map<String, String> params = new HashMap<>();
    Optional<Boolean> enabled = parseProperties(properties, params);
    updateStorageVolume(name, svType, locations, params, enabled, comment);
}

public void updateStorageVolume(String name, Map<String, String> params, Optional<Boolean> enabled, String comment)
throws DdlException {
public void updateStorageVolume(String name, String svType, List<String> locations,
Map<String, String> params, Optional<Boolean> enabled, String comment) throws DdlException {
List<String> immutableProperties = Lists.newArrayList(CloudConfigurationConstants.AWS_S3_NUM_PARTITIONED_PREFIX,
CloudConfigurationConstants.AWS_S3_ENABLE_PARTITIONED_PREFIX);
for (String param : immutableProperties) {
if (params.containsKey(param)) {
throw new DdlException(String.format("Storage volume property '%s' is immutable!", param));
}
}
try (LockCloseable lock = new LockCloseable(rwLock.writeLock())) {
StorageVolume sv = getStorageVolumeByName(name);
Preconditions.checkState(sv != null, "Storage volume '%s' does not exist", name);
StorageVolume copied = new StorageVolume(sv);
validateParams(copied.getType(), params);

List<String> immutableProperties =
Lists.newArrayList(CloudConfigurationConstants.AWS_S3_NUM_PARTITIONED_PREFIX,
CloudConfigurationConstants.AWS_S3_ENABLE_PARTITIONED_PREFIX);
for (String param : immutableProperties) {
if (params.containsKey(param)) {
throw new DdlException(String.format("Storage volume property '%s' is immutable!", param));
}
}
if (enabled.isPresent()) {
boolean enabledValue = enabled.get();
if (!enabledValue) {
Expand All @@ -182,7 +187,15 @@ public void updateStorageVolume(String name, Map<String, String> params, Optiona
copied.setEnabled(enabledValue);
}

if (!comment.isEmpty()) {
if (!Strings.isNullOrEmpty(svType)) {
copied.setType(svType);
}

if (locations != null) {
copied.setLocations(locations);
}

if (!Strings.isNullOrEmpty(comment)) {
copied.setComment(comment);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,19 @@ public void setComment(String comment) {
public String getComment() {
return comment;
}

// Replaces the full list of storage locations for this volume.
// NOTE(review): stores the caller's list directly (no defensive copy) — callers
// should not mutate it afterwards; confirm whether a copy is intended.
public void setLocations(List<String> locations) {
this.locations = locations;
}

// Returns the internal locations list (not a copy).
public List<String> getLocations() {
return locations;
}

// Sets the volume type from its string form. Parsing/validation is delegated
// to toStorageVolumeType (defined elsewhere in this class) — presumably it
// throws on unknown types; verify against that helper.
public void setType(String type) {
this.svt = toStorageVolumeType(type);
}

// Returns the volume type rendered back to its string form via the internal
// field's toString().
public String getType() {
return svt.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testLoadFromFile() {

ClusterSnapshotConfig.StorageVolume storageVolume1 = config.getStorageVolumes().get(0);
Assert.assertEquals("my_s3_volume", storageVolume1.getName());
Assert.assertEquals(ClusterSnapshotConfig.StorageVolume.StorageVolumeType.S3, storageVolume1.getType());
Assert.assertEquals("S3", storageVolume1.getType());
Assert.assertEquals("s3://defaultbucket/test/", storageVolume1.getLocation());
Assert.assertEquals("my s3 volume", storageVolume1.getComment());
Assert.assertEquals(4, storageVolume1.getProperties().size());
Expand All @@ -74,7 +74,7 @@ public void testLoadFromFile() {

ClusterSnapshotConfig.StorageVolume storageVolume2 = config.getStorageVolumes().get(1);
Assert.assertEquals("my_hdfs_volume", storageVolume2.getName());
Assert.assertEquals(ClusterSnapshotConfig.StorageVolume.StorageVolumeType.HDFS, storageVolume2.getType());
Assert.assertEquals("HDFS", storageVolume2.getType());
Assert.assertEquals("hdfs://127.0.0.1:9000/sr/test/", storageVolume2.getLocation());
Assert.assertEquals("my hdfs volume", storageVolume2.getComment());
Assert.assertEquals(2, storageVolume2.getProperties().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@

package com.starrocks.lake.snapshot;

import com.starrocks.server.GlobalStateMgr;
import com.starrocks.utframe.UtFrameUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Collections;
import java.util.Optional;

public class RestoreClusterSnapshotMgrTest {
@BeforeClass
public static void beforeClass() throws Exception {
Expand All @@ -31,6 +35,12 @@ public void testRestoreClusterSnapshotMgr() throws Exception {
new String[] { "-cluster_snapshot" });
Assert.assertTrue(RestoreClusterSnapshotMgr.isRestoring());

for (ClusterSnapshotConfig.StorageVolume sv : RestoreClusterSnapshotMgr.getConfig().getStorageVolumes()) {
GlobalStateMgr.getCurrentState().getStorageVolumeMgr().createStorageVolume(sv.getName(), sv.getType(),
Collections.singletonList(sv.getLocation()), sv.getProperties(), Optional.of(true),
sv.getComment());
}

RestoreClusterSnapshotMgr.finishRestoring();
Assert.assertFalse(RestoreClusterSnapshotMgr.isRestoring());
}
Expand Down
Loading

0 comments on commit 2ad831b

Please sign in to comment.