[Feature] Support restoring from a cluster snapshot for shared-data mode (part 2, update metadata during restoring) #54128

Merged
ClusterSnapshotConfig.java
@@ -133,22 +133,6 @@ public String toString() {
}

public static class StorageVolume {
public static enum StorageVolumeType {
S3,
HDFS,
AZBLOB;

@JsonCreator
public static StorageVolumeType forValue(String value) {
return StorageVolumeType.valueOf(value.toUpperCase());
}

@JsonValue
public String toValue() {
return name().toLowerCase();
}
}

private static class PropertiesDeserializer extends JsonDeserializer<Map<String, String>> {

@Override
@@ -178,7 +162,7 @@ public Map<String, String> deserialize(JsonParser parser, DeserializationContext
private String name;

@JsonProperty("type")
private StorageVolumeType type;
private String type;

@JsonProperty("location")
private String location;
@@ -198,11 +182,11 @@ public void setName(String name) {
this.name = name;
}

public StorageVolumeType getType() {
public String getType() {
return type;
}

public void setType(StorageVolumeType type) {
public void setType(String type) {
this.type = type;
}

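Net effect of this hunk: the snapshot config no longer declares its own StorageVolumeType enum, and `type` is deserialized as a plain string that gets handed to StorageVolumeMgr unchanged. A minimal sketch of the relaxed binding, using a stand-in POJO rather than the real ClusterSnapshotConfig.StorageVolume (the class and field layout below are illustrative; the sample values come from ClusterSnapshotConfigTest):

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StorageVolumeConfigSketch {
    // Stand-in for ClusterSnapshotConfig.StorageVolume after this change:
    // "type" stays a raw string instead of being coerced into an enum.
    public static class StorageVolume {
        @JsonProperty("name")
        public String name;
        @JsonProperty("type")
        public String type;       // e.g. "S3", "HDFS", "AZBLOB" -- no enum validation at parse time
        @JsonProperty("location")
        public String location;
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"name\":\"my_s3_volume\",\"type\":\"S3\",\"location\":\"s3://defaultbucket/test/\"}";
        StorageVolume sv = new ObjectMapper().readValue(json, StorageVolume.class);
        // The string is forwarded as-is; type checking now happens when the volume is applied.
        System.out.println(sv.name + " " + sv.type + " " + sv.location);
    }
}
```

With the enum gone, an unrecognized type string no longer fails while the config is parsed; it is presumably rejected later by StorageVolume.setType / toStorageVolumeType when the volume is actually updated.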
RestoreClusterSnapshotMgr.java
@@ -19,6 +19,7 @@
import com.starrocks.ha.FrontendNodeType;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.NodeMgr;
import com.starrocks.server.StorageVolumeMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
@@ -27,6 +28,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;

public class RestoreClusterSnapshotMgr {
@@ -63,56 +65,26 @@ public static boolean isRestoring() {
return instance != null;
}

public static ClusterSnapshotConfig getConfig() {
RestoreClusterSnapshotMgr self = instance;
if (self == null) {
return null;
}
return self.config;
}

public static void finishRestoring() throws DdlException {
RestoreClusterSnapshotMgr self = instance;
if (self == null) {
return;
}

try {
List<ClusterSnapshotConfig.Frontend> frontends = self.config.getFrontends();
if (frontends != null) {
NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr();
// Drop old frontends
for (Frontend frontend : nodeMgr.getOtherFrontends()) {
LOG.info("Drop old frontend {}", frontend);
nodeMgr.dropFrontend(frontend.getRole(), frontend.getHost(), frontend.getEditLogPort());
}

// Add new frontends
for (ClusterSnapshotConfig.Frontend frontend : frontends) {
LOG.info("Add new frontend {}", frontend);
nodeMgr.addFrontend(frontend.isFollower() ? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER,
frontend.getHost(), frontend.getEditLogPort());
}
}

List<ClusterSnapshotConfig.ComputeNode> computeNodes = self.config.getComputeNodes();
if (computeNodes != null) {
SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
for (Backend be : systemInfoService.getIdToBackend().values()) {
LOG.info("Drop old backend {}", be);
systemInfoService.dropBackend(be.getHost(), be.getHeartbeatPort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME, false);
}

// Drop old compute nodes
for (ComputeNode cn : systemInfoService.getIdComputeNode().values()) {
LOG.info("Drop old compute node {}", cn);
systemInfoService.dropComputeNode(cn.getHost(), cn.getHeartbeatPort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME);
}

// Add new compute nodes
for (ClusterSnapshotConfig.ComputeNode cn : computeNodes) {
LOG.info("Add new compute node {}", cn);
systemInfoService.addComputeNode(cn.getHost(), cn.getHeartbeatServicePort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME);
}
}
self.updateFrontends();

// TODO: Update storage volume
self.updateComputeNodes();

self.updateStorageVolumes();
} finally {
// Rollback config
Config.start_with_incomplete_meta = self.oldStartWithIncompleteMeta;
@@ -121,4 +93,67 @@ public static void finishRestoring() throws DdlException {
instance = null;
}
}

private void updateFrontends() throws DdlException {
List<ClusterSnapshotConfig.Frontend> frontends = config.getFrontends();
if (frontends == null) {
return;
}

NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr();
// Drop old frontends
for (Frontend frontend : nodeMgr.getOtherFrontends()) {
LOG.info("Drop old frontend {}", frontend);
nodeMgr.dropFrontend(frontend.getRole(), frontend.getHost(), frontend.getEditLogPort());
}

// Add new frontends
for (ClusterSnapshotConfig.Frontend frontend : frontends) {
LOG.info("Add new frontend {}", frontend);
nodeMgr.addFrontend(frontend.isFollower() ? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER,
frontend.getHost(), frontend.getEditLogPort());
}
}

private void updateComputeNodes() throws DdlException {
List<ClusterSnapshotConfig.ComputeNode> computeNodes = config.getComputeNodes();
if (computeNodes == null) {
return;
}

SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
for (Backend be : systemInfoService.getIdToBackend().values()) {
LOG.info("Drop old backend {}", be);
systemInfoService.dropBackend(be.getHost(), be.getHeartbeatPort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME, false);
}

// Drop old compute nodes
for (ComputeNode cn : systemInfoService.getIdComputeNode().values()) {
LOG.info("Drop old compute node {}", cn);
systemInfoService.dropComputeNode(cn.getHost(), cn.getHeartbeatPort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME);
}

// Add new compute nodes
for (ClusterSnapshotConfig.ComputeNode cn : computeNodes) {
LOG.info("Add new compute node {}", cn);
systemInfoService.addComputeNode(cn.getHost(), cn.getHeartbeatServicePort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME);
}
}

private void updateStorageVolumes() throws DdlException {
List<ClusterSnapshotConfig.StorageVolume> storageVolumes = config.getStorageVolumes();
if (storageVolumes == null) {
return;
}

StorageVolumeMgr storageVolumeMgr = GlobalStateMgr.getCurrentState().getStorageVolumeMgr();
for (ClusterSnapshotConfig.StorageVolume storageVolume : storageVolumes) {
storageVolumeMgr.updateStorageVolume(storageVolume.getName(), storageVolume.getType(),
Collections.singletonList(storageVolume.getLocation()), storageVolume.getProperties(),
storageVolume.getComment());
}
}
}
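Most of the hunk above is a mechanical extraction: finishRestoring() now delegates to updateFrontends(), updateComputeNodes(), and the new updateStorageVolumes(), while the Config rollback stays in the finally block. All three helpers follow the same shape: a missing section in the restore config means "leave that kind of metadata untouched"; otherwise the stale entries carried over from the snapshot image are dropped and the entries declared for the new cluster are added. A generic sketch of that pattern (illustrative only, not code from this PR):

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

public final class ReconcileSketch {
    private ReconcileSketch() {}

    // Generic form of the pattern used by updateFrontends/updateComputeNodes above.
    public static <O, N> void reconcile(List<N> configured,
                                        Collection<O> existing,
                                        Consumer<O> drop,
                                        Consumer<N> add) {
        if (configured == null) {
            return; // section absent in the restore config -> keep snapshot metadata as-is
        }
        existing.forEach(drop);   // drop nodes recorded in the snapshot image
        configured.forEach(add);  // register nodes of the cluster being restored
    }
}
```

updateStorageVolumes() deviates slightly from this pattern: it updates the existing volumes in place through StorageVolumeMgr.updateStorageVolume instead of dropping and recreating them.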
39 changes: 26 additions & 13 deletions fe/fe-core/src/main/java/com/starrocks/server/StorageVolumeMgr.java
@@ -15,6 +15,7 @@
package com.starrocks.server;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import com.staros.util.LockCloseable;
@@ -152,27 +153,31 @@ public void removeStorageVolume(String name) throws DdlException, MetaNotFoundEx
}

public void updateStorageVolume(AlterStorageVolumeStmt stmt) throws DdlException {
updateStorageVolume(stmt.getName(), null, null, stmt.getProperties(), stmt.getComment());
}

public void updateStorageVolume(String name, String svType, List<String> locations,
Map<String, String> properties, String comment) throws DdlException {
Map<String, String> params = new HashMap<>();
Optional<Boolean> enabled = parseProperties(stmt.getProperties(), params);
updateStorageVolume(stmt.getName(), params, enabled, stmt.getComment());
Optional<Boolean> enabled = parseProperties(properties, params);
updateStorageVolume(name, svType, locations, params, enabled, comment);
}

public void updateStorageVolume(String name, Map<String, String> params, Optional<Boolean> enabled, String comment)
throws DdlException {
public void updateStorageVolume(String name, String svType, List<String> locations,
Map<String, String> params, Optional<Boolean> enabled, String comment) throws DdlException {
List<String> immutableProperties = Lists.newArrayList(CloudConfigurationConstants.AWS_S3_NUM_PARTITIONED_PREFIX,
CloudConfigurationConstants.AWS_S3_ENABLE_PARTITIONED_PREFIX);
for (String param : immutableProperties) {
if (params.containsKey(param)) {
throw new DdlException(String.format("Storage volume property '%s' is immutable!", param));
}
}
try (LockCloseable lock = new LockCloseable(rwLock.writeLock())) {
StorageVolume sv = getStorageVolumeByName(name);
Preconditions.checkState(sv != null, "Storage volume '%s' does not exist", name);
StorageVolume copied = new StorageVolume(sv);
validateParams(copied.getType(), params);

List<String> immutableProperties =
Lists.newArrayList(CloudConfigurationConstants.AWS_S3_NUM_PARTITIONED_PREFIX,
CloudConfigurationConstants.AWS_S3_ENABLE_PARTITIONED_PREFIX);
for (String param : immutableProperties) {
if (params.containsKey(param)) {
throw new DdlException(String.format("Storage volume property '%s' is immutable!", param));
}
}
if (enabled.isPresent()) {
boolean enabledValue = enabled.get();
if (!enabledValue) {
Expand All @@ -182,7 +187,15 @@ public void updateStorageVolume(String name, Map<String, String> params, Optiona
copied.setEnabled(enabledValue);
}

if (!comment.isEmpty()) {
if (!Strings.isNullOrEmpty(svType)) {
copied.setType(svType);
}

if (locations != null) {
copied.setLocations(locations);
}

if (!Strings.isNullOrEmpty(comment)) {
copied.setComment(comment);
}

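StorageVolumeMgr grows two overloads so the restore path can rewrite a volume's type and locations while the ALTER STORAGE VOLUME path keeps its previous behaviour: an empty or null svType and a null locations list leave the stored values untouched, and the immutable-property check now runs before the write lock is taken. A small, self-contained illustration of those merge rules (mergeType/mergeLocations are hypothetical helpers, not methods of StorageVolumeMgr):

```java
import java.util.Arrays;
import java.util.List;

public final class UpdateStorageVolumeSemantics {
    // Only a non-empty svType and a non-null locations list overwrite the stored values,
    // which lets the ALTER STORAGE VOLUME path pass (null, null) and stay behaviour-compatible.
    static String mergeType(String current, String svType) {
        return (svType == null || svType.isEmpty()) ? current : svType;
    }

    static List<String> mergeLocations(List<String> current, List<String> locations) {
        return (locations == null) ? current : locations;
    }

    public static void main(String[] args) {
        // SQL path: ALTER STORAGE VOLUME cannot change type or locations.
        System.out.println(mergeType("S3", null));                           // S3
        System.out.println(mergeLocations(Arrays.asList("s3://a/"), null));  // [s3://a/]
        // Restore path: values from the snapshot config replace the stored ones.
        System.out.println(mergeType("S3", "HDFS"));                         // HDFS
        System.out.println(mergeLocations(Arrays.asList("s3://a/"),
                Arrays.asList("hdfs://127.0.0.1:9000/sr/test/")));           // [hdfs://127.0.0.1:9000/sr/test/]
    }
}
```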
StorageVolume.java
@@ -174,10 +174,19 @@ public void setComment(String comment) {
public String getComment() {
return comment;
}

public void setLocations(List<String> locations) {
this.locations = locations;
}

public List<String> getLocations() {
return locations;
}

public void setType(String type) {
this.svt = toStorageVolumeType(type);
}

public String getType() {
return svt.toString();
}
Expand Down
ClusterSnapshotConfigTest.java
@@ -62,7 +62,7 @@ public void testLoadFromFile() {

ClusterSnapshotConfig.StorageVolume storageVolume1 = config.getStorageVolumes().get(0);
Assert.assertEquals("my_s3_volume", storageVolume1.getName());
Assert.assertEquals(ClusterSnapshotConfig.StorageVolume.StorageVolumeType.S3, storageVolume1.getType());
Assert.assertEquals("S3", storageVolume1.getType());
Assert.assertEquals("s3://defaultbucket/test/", storageVolume1.getLocation());
Assert.assertEquals("my s3 volume", storageVolume1.getComment());
Assert.assertEquals(4, storageVolume1.getProperties().size());
@@ -74,7 +74,7 @@

ClusterSnapshotConfig.StorageVolume storageVolume2 = config.getStorageVolumes().get(1);
Assert.assertEquals("my_hdfs_volume", storageVolume2.getName());
Assert.assertEquals(ClusterSnapshotConfig.StorageVolume.StorageVolumeType.HDFS, storageVolume2.getType());
Assert.assertEquals("HDFS", storageVolume2.getType());
Assert.assertEquals("hdfs://127.0.0.1:9000/sr/test/", storageVolume2.getLocation());
Assert.assertEquals("my hdfs volume", storageVolume2.getComment());
Assert.assertEquals(2, storageVolume2.getProperties().size());
RestoreClusterSnapshotMgrTest.java
@@ -14,11 +14,15 @@

package com.starrocks.lake.snapshot;

import com.starrocks.server.GlobalStateMgr;
import com.starrocks.utframe.UtFrameUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Collections;
import java.util.Optional;

public class RestoreClusterSnapshotMgrTest {
@BeforeClass
public static void beforeClass() throws Exception {
@@ -31,6 +35,12 @@ public void testRestoreClusterSnapshotMgr() throws Exception {
new String[] { "-cluster_snapshot" });
Assert.assertTrue(RestoreClusterSnapshotMgr.isRestoring());

for (ClusterSnapshotConfig.StorageVolume sv : RestoreClusterSnapshotMgr.getConfig().getStorageVolumes()) {
GlobalStateMgr.getCurrentState().getStorageVolumeMgr().createStorageVolume(sv.getName(), sv.getType(),
Collections.singletonList(sv.getLocation()), sv.getProperties(), Optional.of(true),
sv.getComment());
}

RestoreClusterSnapshotMgr.finishRestoring();
Assert.assertFalse(RestoreClusterSnapshotMgr.isRestoring());
}