diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotConfig.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotConfig.java
index b068965d64592..01b27425a4196 100644
--- a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotConfig.java
+++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotConfig.java
@@ -133,22 +133,6 @@ public String toString() {
     }
 
     public static class StorageVolume {
-        public static enum StorageVolumeType {
-            S3,
-            HDFS,
-            AZBLOB;
-
-            @JsonCreator
-            public static StorageVolumeType forValue(String value) {
-                return StorageVolumeType.valueOf(value.toUpperCase());
-            }
-
-            @JsonValue
-            public String toValue() {
-                return name().toLowerCase();
-            }
-        }
-
         private static class PropertiesDeserializer extends JsonDeserializer<Map<String, String>> {
 
             @Override
@@ -178,7 +162,7 @@ public Map deserialize(JsonParser parser, DeserializationContext
         private String name;
 
         @JsonProperty("type")
-        private StorageVolumeType type;
+        private String type;
 
         @JsonProperty("location")
         private String location;
@@ -198,11 +182,11 @@ public void setName(String name) {
             this.name = name;
         }
 
-        public StorageVolumeType getType() {
+        public String getType() {
             return type;
         }
 
-        public void setType(StorageVolumeType type) {
+        public void setType(String type) {
             this.type = type;
         }
 
diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgr.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgr.java
index 481bd2818b505..7e187329dc760 100644
--- a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgr.java
+++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgr.java
@@ -19,6 +19,7 @@
 import com.starrocks.ha.FrontendNodeType;
 import com.starrocks.server.GlobalStateMgr;
 import com.starrocks.server.NodeMgr;
+import com.starrocks.server.StorageVolumeMgr;
 import com.starrocks.server.WarehouseManager;
 import com.starrocks.system.Backend;
 import com.starrocks.system.ComputeNode;
@@ -27,6 +28,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collections;
 import java.util.List;
 
 public class RestoreClusterSnapshotMgr {
@@ -63,6 +65,14 @@ public static boolean isRestoring() {
         return instance != null;
     }
 
+    public static ClusterSnapshotConfig getConfig() {
+        RestoreClusterSnapshotMgr self = instance;
+        if (self == null) {
+            return null;
+        }
+        return self.config;
+    }
+
     public static void finishRestoring() throws DdlException {
         RestoreClusterSnapshotMgr self = instance;
         if (self == null) {
@@ -70,49 +80,11 @@ public static void finishRestoring() throws DdlException {
         }
 
         try {
-            List<ClusterSnapshotConfig.Frontend> frontends = self.config.getFrontends();
-            if (frontends != null) {
-                NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr();
-                // Drop old frontends
-                for (Frontend frontend : nodeMgr.getOtherFrontends()) {
-                    LOG.info("Drop old frontend {}", frontend);
-                    nodeMgr.dropFrontend(frontend.getRole(), frontend.getHost(), frontend.getEditLogPort());
-                }
-
-                // Add new frontends
-                for (ClusterSnapshotConfig.Frontend frontend : frontends) {
-                    LOG.info("Add new frontend {}", frontend);
-                    nodeMgr.addFrontend(frontend.isFollower() ? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER,
-                            frontend.getHost(), frontend.getEditLogPort());
-                }
-            }
-
-            List<ClusterSnapshotConfig.ComputeNode> computeNodes = self.config.getComputeNodes();
-            if (computeNodes != null) {
-                SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
-                for (Backend be : systemInfoService.getIdToBackend().values()) {
-                    LOG.info("Drop old backend {}", be);
-                    systemInfoService.dropBackend(be.getHost(), be.getHeartbeatPort(),
-                            WarehouseManager.DEFAULT_WAREHOUSE_NAME, false);
-                }
-
-                // Drop old compute nodes
-                for (ComputeNode cn : systemInfoService.getIdComputeNode().values()) {
-                    LOG.info("Drop old compute node {}", cn);
-                    systemInfoService.dropComputeNode(cn.getHost(), cn.getHeartbeatPort(),
-                            WarehouseManager.DEFAULT_WAREHOUSE_NAME);
-                }
-
-                // Add new compute nodes
-                for (ClusterSnapshotConfig.ComputeNode cn : computeNodes) {
-                    LOG.info("Add new compute node {}", cn);
-                    systemInfoService.addComputeNode(cn.getHost(), cn.getHeartbeatServicePort(),
-                            WarehouseManager.DEFAULT_WAREHOUSE_NAME);
-                }
-            }
+            self.updateFrontends();
-
-            // TODO: Update storage volume
+            self.updateComputeNodes();
+            self.updateStorageVolumes();
         } finally {
             // Rollback config
             Config.start_with_incomplete_meta = self.oldStartWithIncompleteMeta;
@@ -121,4 +93,67 @@ public static void finishRestoring() throws DdlException {
             instance = null;
         }
     }
+
+    private void updateFrontends() throws DdlException {
+        List<ClusterSnapshotConfig.Frontend> frontends = config.getFrontends();
+        if (frontends == null) {
+            return;
+        }
+
+        NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr();
+        // Drop old frontends
+        for (Frontend frontend : nodeMgr.getOtherFrontends()) {
+            LOG.info("Drop old frontend {}", frontend);
+            nodeMgr.dropFrontend(frontend.getRole(), frontend.getHost(), frontend.getEditLogPort());
+        }
+
+        // Add new frontends
+        for (ClusterSnapshotConfig.Frontend frontend : frontends) {
+            LOG.info("Add new frontend {}", frontend);
+            nodeMgr.addFrontend(frontend.isFollower() ? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER,
+                    frontend.getHost(), frontend.getEditLogPort());
+        }
+    }
+
+    private void updateComputeNodes() throws DdlException {
+        List<ClusterSnapshotConfig.ComputeNode> computeNodes = config.getComputeNodes();
+        if (computeNodes == null) {
+            return;
+        }
+
+        SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
+        for (Backend be : systemInfoService.getIdToBackend().values()) {
+            LOG.info("Drop old backend {}", be);
+            systemInfoService.dropBackend(be.getHost(), be.getHeartbeatPort(),
+                    WarehouseManager.DEFAULT_WAREHOUSE_NAME, false);
+        }
+
+        // Drop old compute nodes
+        for (ComputeNode cn : systemInfoService.getIdComputeNode().values()) {
+            LOG.info("Drop old compute node {}", cn);
+            systemInfoService.dropComputeNode(cn.getHost(), cn.getHeartbeatPort(),
+                    WarehouseManager.DEFAULT_WAREHOUSE_NAME);
+        }
+
+        // Add new compute nodes
+        for (ClusterSnapshotConfig.ComputeNode cn : computeNodes) {
+            LOG.info("Add new compute node {}", cn);
+            systemInfoService.addComputeNode(cn.getHost(), cn.getHeartbeatServicePort(),
+                    WarehouseManager.DEFAULT_WAREHOUSE_NAME);
+        }
+    }
+
+    private void updateStorageVolumes() throws DdlException {
+        List<ClusterSnapshotConfig.StorageVolume> storageVolumes = config.getStorageVolumes();
+        if (storageVolumes == null) {
+            return;
+        }
+
+        StorageVolumeMgr storageVolumeMgr = GlobalStateMgr.getCurrentState().getStorageVolumeMgr();
+        for (ClusterSnapshotConfig.StorageVolume storageVolume : storageVolumes) {
+            storageVolumeMgr.updateStorageVolume(storageVolume.getName(), storageVolume.getType(),
+                    Collections.singletonList(storageVolume.getLocation()), storageVolume.getProperties(),
+                    storageVolume.getComment());
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/server/StorageVolumeMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/StorageVolumeMgr.java
index 633fa0334a5c9..295fb36afeaa8 100644
--- a/fe/fe-core/src/main/java/com/starrocks/server/StorageVolumeMgr.java
+++ b/fe/fe-core/src/main/java/com/starrocks/server/StorageVolumeMgr.java
@@ -15,6 +15,7 @@
 package com.starrocks.server;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.gson.annotations.SerializedName;
 import com.staros.util.LockCloseable;
@@ -152,27 +153,31 @@ public void removeStorageVolume(String name) throws DdlException, MetaNotFoundEx
     }
 
     public void updateStorageVolume(AlterStorageVolumeStmt stmt) throws DdlException {
+        updateStorageVolume(stmt.getName(), null, null, stmt.getProperties(), stmt.getComment());
+    }
+
+    public void updateStorageVolume(String name, String svType, List<String> locations,
+            Map<String, String> properties, String comment) throws DdlException {
         Map<String, String> params = new HashMap<>();
-        Optional<Boolean> enabled = parseProperties(stmt.getProperties(), params);
-        updateStorageVolume(stmt.getName(), params, enabled, stmt.getComment());
+        Optional<Boolean> enabled = parseProperties(properties, params);
+        updateStorageVolume(name, svType, locations, params, enabled, comment);
     }
 
-    public void updateStorageVolume(String name, Map<String, String> params, Optional<Boolean> enabled, String comment)
-            throws DdlException {
+    public void updateStorageVolume(String name, String svType, List<String> locations,
+            Map<String, String> params, Optional<Boolean> enabled, String comment) throws DdlException {
+        List<String> immutableProperties = Lists.newArrayList(CloudConfigurationConstants.AWS_S3_NUM_PARTITIONED_PREFIX,
+                CloudConfigurationConstants.AWS_S3_ENABLE_PARTITIONED_PREFIX);
+        for (String param : immutableProperties) {
+            if (params.containsKey(param)) {
+                throw new DdlException(String.format("Storage volume property '%s' is immutable!", param));
+            }
+        }
         try (LockCloseable lock = new LockCloseable(rwLock.writeLock())) {
             StorageVolume sv = getStorageVolumeByName(name);
             Preconditions.checkState(sv != null, "Storage volume '%s' does not exist", name);
             StorageVolume copied = new StorageVolume(sv);
             validateParams(copied.getType(), params);
-            List<String> immutableProperties =
-                    Lists.newArrayList(CloudConfigurationConstants.AWS_S3_NUM_PARTITIONED_PREFIX,
-                            CloudConfigurationConstants.AWS_S3_ENABLE_PARTITIONED_PREFIX);
-            for (String param : immutableProperties) {
-                if (params.containsKey(param)) {
-                    throw new DdlException(String.format("Storage volume property '%s' is immutable!", param));
-                }
-            }
 
             if (enabled.isPresent()) {
                 boolean enabledValue = enabled.get();
                 if (!enabledValue) {
@@ -182,7 +187,15 @@ public void updateStorageVolume(String name, Map params, Optiona
                 copied.setEnabled(enabledValue);
             }
 
-            if (!comment.isEmpty()) {
+            if (!Strings.isNullOrEmpty(svType)) {
+                copied.setType(svType);
+            }
+
+            if (locations != null) {
+                copied.setLocations(locations);
+            }
+
+            if (!Strings.isNullOrEmpty(comment)) {
                 copied.setComment(comment);
             }
 
diff --git a/fe/fe-core/src/main/java/com/starrocks/storagevolume/StorageVolume.java b/fe/fe-core/src/main/java/com/starrocks/storagevolume/StorageVolume.java
index 570066c918d7f..d8ae708c4bdb7 100644
--- a/fe/fe-core/src/main/java/com/starrocks/storagevolume/StorageVolume.java
+++ b/fe/fe-core/src/main/java/com/starrocks/storagevolume/StorageVolume.java
@@ -174,10 +174,19 @@ public void setComment(String comment) {
     public String getComment() {
         return comment;
     }
+
+    public void setLocations(List<String> locations) {
+        this.locations = locations;
+    }
+
     public List<String> getLocations() {
         return locations;
     }
 
+    public void setType(String type) {
+        this.svt = toStorageVolumeType(type);
+    }
+
     public String getType() {
         return svt.toString();
     }
diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/ClusterSnapshotConfigTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/ClusterSnapshotConfigTest.java
index eb734f095a030..154ad8131637d 100644
--- a/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/ClusterSnapshotConfigTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/ClusterSnapshotConfigTest.java
@@ -62,7 +62,7 @@ public void testLoadFromFile() {
 
         ClusterSnapshotConfig.StorageVolume storageVolume1 = config.getStorageVolumes().get(0);
         Assert.assertEquals("my_s3_volume", storageVolume1.getName());
-        Assert.assertEquals(ClusterSnapshotConfig.StorageVolume.StorageVolumeType.S3, storageVolume1.getType());
+        Assert.assertEquals("S3", storageVolume1.getType());
         Assert.assertEquals("s3://defaultbucket/test/", storageVolume1.getLocation());
         Assert.assertEquals("my s3 volume", storageVolume1.getComment());
         Assert.assertEquals(4, storageVolume1.getProperties().size());
@@ -74,7 +74,7 @@ public void testLoadFromFile() {
 
         ClusterSnapshotConfig.StorageVolume storageVolume2 = config.getStorageVolumes().get(1);
         Assert.assertEquals("my_hdfs_volume", storageVolume2.getName());
-        Assert.assertEquals(ClusterSnapshotConfig.StorageVolume.StorageVolumeType.HDFS, storageVolume2.getType());
+        Assert.assertEquals("HDFS", storageVolume2.getType());
         Assert.assertEquals("hdfs://127.0.0.1:9000/sr/test/", storageVolume2.getLocation());
         Assert.assertEquals("my hdfs volume", storageVolume2.getComment());
         Assert.assertEquals(2, storageVolume2.getProperties().size());
diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgrTest.java
index 9c27f800a6985..6231c73121389 100644
--- a/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgrTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgrTest.java
@@ -14,11 +14,15 @@
 
 package com.starrocks.lake.snapshot;
 
+import com.starrocks.server.GlobalStateMgr;
 import com.starrocks.utframe.UtFrameUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.Optional;
+
 public class RestoreClusterSnapshotMgrTest {
     @BeforeClass
     public static void beforeClass() throws Exception {
@@ -31,6 +35,12 @@ public void testRestoreClusterSnapshotMgr() throws Exception {
                 new String[] { "-cluster_snapshot" });
         Assert.assertTrue(RestoreClusterSnapshotMgr.isRestoring());
 
+        for (ClusterSnapshotConfig.StorageVolume sv : RestoreClusterSnapshotMgr.getConfig().getStorageVolumes()) {
+            GlobalStateMgr.getCurrentState().getStorageVolumeMgr().createStorageVolume(sv.getName(), sv.getType(),
+                    Collections.singletonList(sv.getLocation()), sv.getProperties(), Optional.of(true),
+                    sv.getComment());
+        }
+
         RestoreClusterSnapshotMgr.finishRestoring();
         Assert.assertFalse(RestoreClusterSnapshotMgr.isRestoring());
     }
diff --git a/fe/fe-core/src/test/java/com/starrocks/server/SharedDataStorageVolumeMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/server/SharedDataStorageVolumeMgrTest.java
index e300f7f75cc4c..022a5f89f19b2 100644
--- a/fe/fe-core/src/test/java/com/starrocks/server/SharedDataStorageVolumeMgrTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/server/SharedDataStorageVolumeMgrTest.java
@@ -216,16 +216,16 @@ public void testStorageVolumeCRUD() throws AlreadyExistsException, DdlException,
         storageParams.put(AWS_S3_ACCESS_KEY, "ak");
         storageParams.put(AWS_S3_USE_AWS_SDK_DEFAULT_BEHAVIOR, "true");
         try {
-            svm.updateStorageVolume(svName1, storageParams, Optional.of(false), "test update");
+            svm.updateStorageVolume(svName1, null, null, storageParams, Optional.of(false), "test update");
             Assert.fail();
         } catch (IllegalStateException e) {
             Assert.assertTrue(e.getMessage().contains("Storage volume 'test1' does not exist"));
         }
         storageParams.put("aaa", "bbb");
         Assert.assertThrows(DdlException.class, () ->
-                svm.updateStorageVolume(svName, storageParams, Optional.of(true), "test update"));
+                svm.updateStorageVolume(svName, null, null, storageParams, Optional.of(true), "test update"));
         storageParams.remove("aaa");
-        svm.updateStorageVolume(svName, storageParams, Optional.of(true), "test update");
+        svm.updateStorageVolume(svName, null, null, storageParams, Optional.of(true), "test update");
         sv = svm.getStorageVolumeByName(svName);
         cloudConfiguration = sv.getCloudConfiguration();
         Assert.assertEquals("region1", ((AwsCloudConfiguration) cloudConfiguration).getAwsCloudCredential()
@@ -246,7 +246,7 @@ public void testStorageVolumeCRUD() throws AlreadyExistsException, DdlException,
         Assert.assertEquals(sv.getId(), svm.getDefaultStorageVolumeId());
 
         Throwable ex = Assert.assertThrows(IllegalStateException.class,
-                () -> svm.updateStorageVolume(svName, storageParams, Optional.of(false), ""));
+                () -> svm.updateStorageVolume(svName, null, null, storageParams, Optional.of(false), ""));
         Assert.assertEquals("Default volume can not be disabled", ex.getMessage());
 
         // remove
@@ -261,7 +261,7 @@ public void testStorageVolumeCRUD() throws AlreadyExistsException, DdlException,
         Assert.assertEquals("Storage volume 'test1' does not exist", ex.getMessage());
 
         svm.createStorageVolume(svName1, "S3", locations, storageParams, Optional.of(false), "");
-        svm.updateStorageVolume(svName1, storageParams, Optional.of(true), "test update");
+        svm.updateStorageVolume(svName1, null, null, storageParams, Optional.of(true), "test update");
         svm.setDefaultStorageVolume(svName1);
 
         sv = svm.getStorageVolumeByName(svName);
@@ -285,14 +285,14 @@ public void testImmutableProperties() throws DdlException, AlreadyExistsExceptio
             Map<String, String> modifyParams = new HashMap<>();
             modifyParams.put(CloudConfigurationConstants.AWS_S3_ENABLE_PARTITIONED_PREFIX, "true");
             Assert.assertThrows(DdlException.class, () ->
-                    svm.updateStorageVolume(svName, modifyParams, Optional.of(false), ""));
+                    svm.updateStorageVolume(svName, null, null, modifyParams, Optional.of(false), ""));
         }
 
         {
             Map<String, String> modifyParams = new HashMap<>();
             modifyParams.put(CloudConfigurationConstants.AWS_S3_NUM_PARTITIONED_PREFIX, "12");
             Assert.assertThrows(DdlException.class, () ->
-                    svm.updateStorageVolume(svName, modifyParams, Optional.of(false), ""));
+                    svm.updateStorageVolume(svName, null, null, modifyParams, Optional.of(false), ""));
         }
     }
 
@@ -313,7 +313,7 @@ public void testBindAndUnbind() throws DdlException, AlreadyExistsException, Met
         Assert.assertTrue(svm.bindDbToStorageVolume(svName, 1L));
         Assert.assertTrue(svm.bindTableToStorageVolume(svName, 1L, 1L));
 
-        svm.updateStorageVolume(svName, storageParams, Optional.of(false), "test update");
+        svm.updateStorageVolume(svName, null, null, storageParams, Optional.of(false), "test update");
         // disabled storage volume can not be bound.
         Throwable ex = Assert.assertThrows(DdlException.class, () -> svm.bindDbToStorageVolume(svName, 1L));
         Assert.assertEquals(String.format("Storage volume %s is disabled", svName), ex.getMessage());
@@ -891,7 +891,7 @@ public void testCreateHDFS() throws DdlException, AlreadyExistsException {
         storageParams.put("dfs.client.failover.proxy.provider",
                 "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
 
-        svm.updateStorageVolume("test", storageParams, Optional.of(false), "");
+        svm.updateStorageVolume("test", null, null, storageParams, Optional.of(false), "");
         Assert.assertEquals(false, svm.getStorageVolumeByName(svName).getEnabled());
     }
 
diff --git a/fe/fe-core/src/test/java/com/starrocks/server/SharedNothingStorageVolumeMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/server/SharedNothingStorageVolumeMgrTest.java
index c452c4bad9c42..138637bb67603 100644
--- a/fe/fe-core/src/test/java/com/starrocks/server/SharedNothingStorageVolumeMgrTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/server/SharedNothingStorageVolumeMgrTest.java
@@ -98,16 +98,16 @@ public EditLog getEditLog() {
         storageParams.put(AWS_S3_ACCESS_KEY, "ak");
         storageParams.put(AWS_S3_USE_AWS_SDK_DEFAULT_BEHAVIOR, "true");
         try {
-            svm.updateStorageVolume(svName1, storageParams, Optional.of(false), "test update");
+            svm.updateStorageVolume(svName1, null, null, storageParams, Optional.of(false), "test update");
             Assert.fail();
         } catch (IllegalStateException e) {
             Assert.assertTrue(e.getMessage().contains("Storage volume 'test1' does not exist"));
         }
         storageParams.put("aaa", "bbb");
         Assert.assertThrows(DdlException.class, () ->
-                svm.updateStorageVolume(svName, storageParams, Optional.of(true), "test update"));
+                svm.updateStorageVolume(svName, null, null, storageParams, Optional.of(true), "test update"));
         storageParams.remove("aaa");
-        svm.updateStorageVolume(svName, storageParams, Optional.of(true), "test update");
+        svm.updateStorageVolume(svName, null, null, storageParams, Optional.of(true), "test update");
         sv = svm.getStorageVolumeByName(svName);
         cloudConfiguration = sv.getCloudConfiguration();
         Assert.assertEquals("region1", ((AwsCloudConfiguration) cloudConfiguration).getAwsCloudCredential()
@@ -127,7 +127,7 @@ public EditLog getEditLog() {
         svm.setDefaultStorageVolume(svName);
         Assert.assertEquals(sv.getId(), svm.getDefaultStorageVolumeId());
         try {
-            svm.updateStorageVolume(svName, storageParams, Optional.of(false), "");
+            svm.updateStorageVolume(svName, null, null, storageParams, Optional.of(false), "");
             Assert.fail();
         } catch (IllegalStateException e) {
             Assert.assertTrue(e.getMessage().contains("Default volume can not be disabled"));
@@ -148,7 +148,7 @@ public EditLog getEditLog() {
         Assert.assertEquals("Storage volume 'test1' does not exist", ex.getMessage());
 
         svm.createStorageVolume(svName1, "S3", locations, storageParams, Optional.empty(), "");
-        svm.updateStorageVolume(svName1, storageParams, Optional.empty(), "test update");
+        svm.updateStorageVolume(svName1, null, null, storageParams, Optional.empty(), "test update");
         svm.setDefaultStorageVolume(svName1);
 
         sv = svm.getStorageVolumeByName(svName);