diff --git a/bin/start_fe.sh b/bin/start_fe.sh index 72133eb1256aa..24331837c32d6 100755 --- a/bin/start_fe.sh +++ b/bin/start_fe.sh @@ -25,6 +25,7 @@ OPTS=$(getopt \ -l 'daemon' \ -l 'helper:' \ -l 'host_type:' \ + -l 'cluster_snapshot' \ -l 'debug' \ -l 'logconsole' \ -- "$@") @@ -34,6 +35,7 @@ eval set -- "$OPTS" RUN_DAEMON=0 HELPER= HOST_TYPE= +CLUSTER_SNAPSHOT= ENABLE_DEBUGGER=0 RUN_LOG_CONSOLE=${SYS_LOG_TO_CONSOLE:-0} # min jdk version required @@ -43,6 +45,7 @@ while true; do --daemon) RUN_DAEMON=1 ; shift ;; --helper) HELPER=$2 ; shift 2 ;; --host_type) HOST_TYPE=$2 ; shift 2 ;; + --cluster_snapshot) CLUSTER_SNAPSHOT="-cluster_snapshot" ; shift ;; --debug) ENABLE_DEBUGGER=1 ; shift ;; --logconsole) RUN_LOG_CONSOLE=1 ; shift ;; --) shift ; break ;; @@ -228,7 +231,7 @@ echo "start time: $(date), server uptime: $(uptime)" # StarRocksFE java process will write its process id into $pidfile if [ ${RUN_DAEMON} -eq 1 ]; then - nohup $LIMIT $JAVA $final_java_opt com.starrocks.StarRocksFE ${HELPER} ${HOST_TYPE} "$@" > { + + @Override + public Map deserialize(JsonParser parser, DeserializationContext context) + throws IOException, JsonProcessingException { + ObjectMapper mapper = (ObjectMapper) parser.getCodec(); + List> list = mapper.readValue(parser, + new TypeReference>>() { + }); + + Map properties = new HashMap<>(); + for (Map entry : list) { + String key = entry.get("key"); + String value = entry.get("value"); + if (key == null || key.isEmpty() || value == null || value.isEmpty()) { + throw new JsonProcessingException("Missing 'key' or 'value' in properties entry", + parser.getTokenLocation()) { + }; + } + properties.put(key, value); + } + return properties; + } + } + + @JsonProperty("name") + private String name; + + @JsonProperty("type") + private StorageVolumeType type; + + @JsonProperty("location") + private String location; + + @JsonProperty("comment") + private String comment; + + @JsonProperty("properties") + @JsonDeserialize(using = PropertiesDeserializer.class) + private Map properties; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public StorageVolumeType getType() { + return type; + } + + public void setType(StorageVolumeType type) { + this.type = type; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + } + + @JsonProperty("frontends") + private List frontends; + + @JsonProperty("compute_nodes") + private List computeNodes; + + @JsonProperty("storage_volumes") + private List storageVolumes; + + public List getFrontends() { + return frontends; + } + + public List getComputeNodes() { + return computeNodes; + } + + public List getStorageVolumes() { + return storageVolumes; + } + + public static ClusterSnapshotConfig load(String clusterSnapshotYamlFile) { + try { + ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory()); + ClusterSnapshotConfig config = objectMapper.readValue(new File(clusterSnapshotYamlFile), + ClusterSnapshotConfig.class); + return config; + } catch (Exception e) { + LOG.warn("Failed to load cluster snapshot config {} ", clusterSnapshotYamlFile, e); + throw new RuntimeException(e); + } + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgr.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgr.java new file mode 100644 index 0000000000000..481bd2818b505 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgr.java @@ -0,0 +1,124 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.lake.snapshot; + +import com.starrocks.common.Config; +import com.starrocks.common.DdlException; +import com.starrocks.ha.FrontendNodeType; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.NodeMgr; +import com.starrocks.server.WarehouseManager; +import com.starrocks.system.Backend; +import com.starrocks.system.ComputeNode; +import com.starrocks.system.Frontend; +import com.starrocks.system.SystemInfoService; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +public class RestoreClusterSnapshotMgr { + private static final Logger LOG = LogManager.getLogger(RestoreClusterSnapshotMgr.class); + + private static RestoreClusterSnapshotMgr instance; + + private ClusterSnapshotConfig config; + private boolean oldStartWithIncompleteMeta; + private boolean oldResetElectionGroup; + + private RestoreClusterSnapshotMgr(String clusterSnapshotYamlFile) { + config = ClusterSnapshotConfig.load(clusterSnapshotYamlFile); + // Save the old config + oldStartWithIncompleteMeta = Config.start_with_incomplete_meta; + // Allow starting with only image and no log + Config.start_with_incomplete_meta = true; + // Save the old config + oldResetElectionGroup = Config.bdbje_reset_election_group; + Config.bdbje_reset_election_group = true; + } + + public static void init(String clusterSnapshotYamlFile, String[] args) { + for (String arg : args) { + if (arg.equalsIgnoreCase("-cluster_snapshot")) { + LOG.info("FE start to restore from a cluster snapshot"); + instance = new RestoreClusterSnapshotMgr(clusterSnapshotYamlFile); + return; + } + } + } + + public static boolean isRestoring() { + return instance != null; + } + + public static void finishRestoring() throws DdlException { + RestoreClusterSnapshotMgr self = instance; + if (self == null) { + return; + } + + try { + List frontends = self.config.getFrontends(); + if (frontends != null) { + NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr(); + // Drop old frontends + for (Frontend frontend : nodeMgr.getOtherFrontends()) { + LOG.info("Drop old frontend {}", frontend); + nodeMgr.dropFrontend(frontend.getRole(), frontend.getHost(), frontend.getEditLogPort()); + } + + // Add new frontends + for (ClusterSnapshotConfig.Frontend frontend : frontends) { + LOG.info("Add new frontend {}", frontend); + nodeMgr.addFrontend(frontend.isFollower() ? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER, + frontend.getHost(), frontend.getEditLogPort()); + } + } + + List computeNodes = self.config.getComputeNodes(); + if (computeNodes != null) { + SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(); + for (Backend be : systemInfoService.getIdToBackend().values()) { + LOG.info("Drop old backend {}", be); + systemInfoService.dropBackend(be.getHost(), be.getHeartbeatPort(), + WarehouseManager.DEFAULT_WAREHOUSE_NAME, false); + } + + // Drop old compute nodes + for (ComputeNode cn : systemInfoService.getIdComputeNode().values()) { + LOG.info("Drop old compute node {}", cn); + systemInfoService.dropComputeNode(cn.getHost(), cn.getHeartbeatPort(), + WarehouseManager.DEFAULT_WAREHOUSE_NAME); + } + + // Add new compute nodes + for (ClusterSnapshotConfig.ComputeNode cn : computeNodes) { + LOG.info("Add new compute node {}", cn); + systemInfoService.addComputeNode(cn.getHost(), cn.getHeartbeatServicePort(), + WarehouseManager.DEFAULT_WAREHOUSE_NAME); + } + } + + // TODO: Update storage volume + + } finally { + // Rollback config + Config.start_with_incomplete_meta = self.oldStartWithIncompleteMeta; + Config.bdbje_reset_election_group = self.oldResetElectionGroup; + + instance = null; + } + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java index 67a4f48c56206..d5d93147614e7 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java @@ -131,11 +131,6 @@ public SystemInfoService() { public void addComputeNodes(AddComputeNodeClause addComputeNodeClause) throws DdlException { - - for (Pair pair : addComputeNodeClause.getHostPortPairs()) { - checkSameNodeExist(pair.first, pair.second); - } - for (Pair pair : addComputeNodeClause.getHostPortPairs()) { addComputeNode(pair.first, pair.second, addComputeNodeClause.getWarehouse()); } @@ -169,7 +164,9 @@ public void dropComputeNode(ComputeNode computeNode) { } // Final entry of adding compute node - private void addComputeNode(String host, int heartbeatPort, String warehouse) throws DdlException { + public void addComputeNode(String host, int heartbeatPort, String warehouse) throws DdlException { + checkSameNodeExist(host, heartbeatPort); + ComputeNode newComputeNode = new ComputeNode(GlobalStateMgr.getCurrentState().getNextId(), host, heartbeatPort); idToComputeNodeRef.put(newComputeNode.getId(), newComputeNode); setComputeNodeOwner(newComputeNode); diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/ClusterSnapshotConfigTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/ClusterSnapshotConfigTest.java new file mode 100644 index 0000000000000..eb734f095a030 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/ClusterSnapshotConfigTest.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.starrocks.lake.snapshot; + +import org.junit.Assert; +import org.junit.Test; + +public class ClusterSnapshotConfigTest { + + @Test + public void testLoadFromFile() { + ClusterSnapshotConfig config = ClusterSnapshotConfig.load("src/test/resources/conf/cluster_snapshot.yaml"); + Assert.assertEquals(2, config.getFrontends().size()); + Assert.assertEquals(2, config.getComputeNodes().size()); + Assert.assertEquals(2, config.getStorageVolumes().size()); + + ClusterSnapshotConfig.Frontend frontend1 = config.getFrontends().get(0); + Assert.assertEquals("172.26.92.1", frontend1.getHost()); + Assert.assertEquals(9010, frontend1.getEditLogPort()); + Assert.assertEquals(ClusterSnapshotConfig.Frontend.FrontendType.FOLLOWER, frontend1.getType()); + Assert.assertTrue(frontend1.isFollower()); + Assert.assertFalse(frontend1.isObserver()); + + ClusterSnapshotConfig.Frontend frontend2 = config.getFrontends().get(1); + Assert.assertEquals("172.26.92.2", frontend2.getHost()); + Assert.assertEquals(9010, frontend2.getEditLogPort()); + Assert.assertEquals(ClusterSnapshotConfig.Frontend.FrontendType.OBSERVER, frontend2.getType()); + Assert.assertFalse(frontend2.isFollower()); + Assert.assertTrue(frontend2.isObserver()); + + frontend1.toString(); + frontend1.setHost(frontend2.getHost()); + frontend1.setEditLogPort(frontend2.getEditLogPort()); + frontend1.setType(frontend2.getType()); + + ClusterSnapshotConfig.ComputeNode computeNode1 = config.getComputeNodes().get(0); + Assert.assertEquals("172.26.92.11", computeNode1.getHost()); + Assert.assertEquals(9050, computeNode1.getHeartbeatServicePort()); + + ClusterSnapshotConfig.ComputeNode computeNode2 = config.getComputeNodes().get(1); + Assert.assertEquals("172.26.92.12", computeNode2.getHost()); + Assert.assertEquals(9050, computeNode2.getHeartbeatServicePort()); + + computeNode1.toString(); + computeNode1.setHost(computeNode2.getHost()); + computeNode1.setHeartbeatServicePort(computeNode2.getHeartbeatServicePort()); + + ClusterSnapshotConfig.StorageVolume storageVolume1 = config.getStorageVolumes().get(0); + Assert.assertEquals("my_s3_volume", storageVolume1.getName()); + Assert.assertEquals(ClusterSnapshotConfig.StorageVolume.StorageVolumeType.S3, storageVolume1.getType()); + Assert.assertEquals("s3://defaultbucket/test/", storageVolume1.getLocation()); + Assert.assertEquals("my s3 volume", storageVolume1.getComment()); + Assert.assertEquals(4, storageVolume1.getProperties().size()); + Assert.assertEquals("us-west-2", storageVolume1.getProperties().get("aws.s3.region")); + Assert.assertEquals("https://s3.us-west-2.amazonaws.com", + storageVolume1.getProperties().get("aws.s3.endpoint")); + Assert.assertEquals("xxxxxxxxxx", storageVolume1.getProperties().get("aws.s3.access_key")); + Assert.assertEquals("yyyyyyyyyy", storageVolume1.getProperties().get("aws.s3.secret_key")); + + ClusterSnapshotConfig.StorageVolume storageVolume2 = config.getStorageVolumes().get(1); + Assert.assertEquals("my_hdfs_volume", storageVolume2.getName()); + Assert.assertEquals(ClusterSnapshotConfig.StorageVolume.StorageVolumeType.HDFS, storageVolume2.getType()); + Assert.assertEquals("hdfs://127.0.0.1:9000/sr/test/", storageVolume2.getLocation()); + Assert.assertEquals("my hdfs volume", storageVolume2.getComment()); + Assert.assertEquals(2, storageVolume2.getProperties().size()); + Assert.assertEquals("simple", storageVolume2.getProperties().get("hadoop.security.authentication")); + Assert.assertEquals("starrocks", storageVolume2.getProperties().get("username")); + + storageVolume1.setName(storageVolume2.getName()); + storageVolume1.setType(storageVolume2.getType()); + storageVolume1.setLocation(storageVolume2.getLocation()); + storageVolume1.setComment(storageVolume2.getComment()); + storageVolume1.setProperties(storageVolume2.getProperties()); + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgrTest.java new file mode 100644 index 0000000000000..9c27f800a6985 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/lake/snapshot/RestoreClusterSnapshotMgrTest.java @@ -0,0 +1,37 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.lake.snapshot; + +import com.starrocks.utframe.UtFrameUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class RestoreClusterSnapshotMgrTest { + @BeforeClass + public static void beforeClass() throws Exception { + UtFrameUtils.createMinStarRocksCluster(); + } + + @Test + public void testRestoreClusterSnapshotMgr() throws Exception { + RestoreClusterSnapshotMgr.init("src/test/resources/conf/cluster_snapshot.yaml", + new String[] { "-cluster_snapshot" }); + Assert.assertTrue(RestoreClusterSnapshotMgr.isRestoring()); + + RestoreClusterSnapshotMgr.finishRestoring(); + Assert.assertFalse(RestoreClusterSnapshotMgr.isRestoring()); + } +} diff --git a/fe/fe-core/src/test/resources/conf/cluster_snapshot.yaml b/fe/fe-core/src/test/resources/conf/cluster_snapshot.yaml new file mode 100644 index 0000000000000..76bb62c37759d --- /dev/null +++ b/fe/fe-core/src/test/resources/conf/cluster_snapshot.yaml @@ -0,0 +1,39 @@ +# do not include leader fe +frontends: + - host: 172.26.92.1 + edit_log_port: 9010 + type: follower #default follower + - host: 172.26.92.2 + edit_log_port: 9010 + type: observer + +compute_nodes: + - host: 172.26.92.11 + heartbeat_service_port: 9050 + - host: 172.26.92.12 + heartbeat_service_port: 9050 + +# used for restoring a cloned snapshot +storage_volumes: + - name: my_s3_volume + type: S3 + location: s3://defaultbucket/test/ + comment: my s3 volume + properties: + - key: aws.s3.region + value: us-west-2 + - key: aws.s3.endpoint + value: https://s3.us-west-2.amazonaws.com + - key: aws.s3.access_key + value: xxxxxxxxxx + - key: aws.s3.secret_key + value: yyyyyyyyyy + - name: my_hdfs_volume + type: HDFS + location: hdfs://127.0.0.1:9000/sr/test/ + comment: my hdfs volume + properties: + - key: hadoop.security.authentication + value: simple + - key: username + value: starrocks