From cc323bae08dfda35f3f50892d6e25aef018fedc7 Mon Sep 17 00:00:00 2001 From: xiangguangyxg Date: Thu, 12 Dec 2024 14:33:57 +0800 Subject: [PATCH] [Feature] Support restoring from a cluster snapshot for shared-data (part 1) Signed-off-by: xiangguangyxg --- bin/start_fe.sh | 7 +- build.sh | 1 + conf/restore_snapshot.yaml | 37 +++ .../main/java/com/starrocks/StarRocksFE.java | 6 + .../common/RestoreSnapshotConfig.java | 273 ++++++++++++++++++ .../starrocks/server/RestoreSnapshotMgr.java | 122 ++++++++ .../starrocks/system/SystemInfoService.java | 9 +- .../common/RestoreSnapshotConfigTest.java | 33 +++ .../test/resources/conf/restore_snapshot.yaml | 37 +++ 9 files changed, 517 insertions(+), 8 deletions(-) create mode 100644 conf/restore_snapshot.yaml create mode 100644 fe/fe-core/src/main/java/com/starrocks/common/RestoreSnapshotConfig.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/server/RestoreSnapshotMgr.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/common/RestoreSnapshotConfigTest.java create mode 100644 fe/fe-core/src/test/resources/conf/restore_snapshot.yaml diff --git a/bin/start_fe.sh b/bin/start_fe.sh index 72133eb1256aa4..bd94d5121717ec 100755 --- a/bin/start_fe.sh +++ b/bin/start_fe.sh @@ -25,6 +25,7 @@ OPTS=$(getopt \ -l 'daemon' \ -l 'helper:' \ -l 'host_type:' \ + -l 'restore_snapshot' \ -l 'debug' \ -l 'logconsole' \ -- "$@") @@ -34,6 +35,7 @@ eval set -- "$OPTS" RUN_DAEMON=0 HELPER= HOST_TYPE= +RESTORE_SNAPSHOT= ENABLE_DEBUGGER=0 RUN_LOG_CONSOLE=${SYS_LOG_TO_CONSOLE:-0} # min jdk version required @@ -43,6 +45,7 @@ while true; do --daemon) RUN_DAEMON=1 ; shift ;; --helper) HELPER=$2 ; shift 2 ;; --host_type) HOST_TYPE=$2 ; shift 2 ;; + --restore_snapshot) RESTORE_SNAPSHOT="-restore_snapshot" ; shift ;; --debug) ENABLE_DEBUGGER=1 ; shift ;; --logconsole) RUN_LOG_CONSOLE=1 ; shift ;; --) shift ; break ;; @@ -228,7 +231,7 @@ echo "start time: $(date), server uptime: $(uptime)" # StarRocksFE java process will write its 
process id into $pidfile if [ ${RUN_DAEMON} -eq 1 ]; then - nohup $LIMIT $JAVA $final_java_opt com.starrocks.StarRocksFE ${HELPER} ${HOST_TYPE} "$@" > { + + @Override + public Map deserialize(JsonParser parser, DeserializationContext context) + throws IOException, JsonProcessingException { + ObjectMapper mapper = (ObjectMapper) parser.getCodec(); + List> list = mapper.readValue(parser, + new TypeReference>>() { + }); + + Map properties = new HashMap<>(); + for (Map entry : list) { + String key = entry.get("key"); + String value = entry.get("value"); + if (key == null || key.isEmpty() || value == null || value.isEmpty()) { + throw new JsonProcessingException("Missing 'key' or 'value' in properties entry", + parser.getTokenLocation()) { + }; + } + properties.put(key, value); + } + return properties; + } + } + + @JsonProperty("name") + private String name; + + @JsonProperty("type") + private StorageVolumeType type; + + @JsonProperty("location") + private String location; + + @JsonProperty("comment") + private String comment; + + @JsonProperty("properties") + @JsonDeserialize(using = PropertiesDeserializer.class) + private Map properties; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public StorageVolumeType getType() { + return type; + } + + public void setType(StorageVolumeType type) { + this.type = type; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + } + + @JsonProperty("reset_election_group") + private boolean resetElectionGroup; + + @JsonProperty("frontends") + private List frontends; + + @JsonProperty("compute_nodes") + private List 
computeNodes; + + @JsonProperty("storage_volumes") + private List storageVolumes; + + public boolean isResetElectionGroup() { + return resetElectionGroup; + } + + public List getFrontends() { + return frontends; + } + + public List getComputeNodes() { + return computeNodes; + } + + public List getStorageVolumes() { + return storageVolumes; + } + + public static RestoreSnapshotConfig load(String restoreSnapshotYamlFile) { + try { + ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory()); + RestoreSnapshotConfig config = objectMapper.readValue(new File(restoreSnapshotYamlFile), + RestoreSnapshotConfig.class); + return config; + } catch (Exception e) { + LOG.warn("Failed to load restore snapshot config {} ", restoreSnapshotYamlFile, e); + throw new RuntimeException(e); + } + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/server/RestoreSnapshotMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/RestoreSnapshotMgr.java new file mode 100644 index 00000000000000..3fe51bf3b98d9c --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/server/RestoreSnapshotMgr.java @@ -0,0 +1,122 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package com.starrocks.server;
+
+import com.starrocks.common.Config;
+import com.starrocks.common.DdlException;
+import com.starrocks.common.RestoreSnapshotConfig;
+import com.starrocks.ha.FrontendNodeType;
+import com.starrocks.system.Backend;
+import com.starrocks.system.ComputeNode;
+import com.starrocks.system.Frontend;
+import com.starrocks.system.SystemInfoService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Coordinates starting an FE from a cluster snapshot.
+ *
+ * <p>{@link #init(String, String[])} creates the single instance when the {@code -restore_snapshot}
+ * argument is present. Creating it loads the YAML config and overrides
+ * {@code Config.start_with_incomplete_meta} and {@code Config.bdbje_reset_election_group}.
+ * {@link #finishRestoring()} then replaces the frontends and compute nodes with the ones listed in
+ * the config, restores the two overridden settings and clears the instance.
+ *
+ * <p>The static state is accessed without any synchronization.
+ * NOTE(review): presumably only touched from the FE startup path -- confirm.
+ */
+public class RestoreSnapshotMgr {
+    private static final Logger LOG = LogManager.getLogger(RestoreSnapshotMgr.class);
+
+    // Command-line switch that turns restore mode on; compared case-insensitively.
+    private static final String RESTORE_SNAPSHOT_ARG = "-restore_snapshot";
+
+    // Non-null only between init() (with the switch present) and a completed finishRestoring().
+    private static RestoreSnapshotMgr instance;
+
+    private final RestoreSnapshotConfig config;
+    // Values the two Config settings had before this manager overrode them.
+    private final boolean oldStartWithIncompleteMeta;
+    private final boolean oldResetElectionGroup;
+
+    /**
+     * Loads the restore config and applies the temporary Config overrides.
+     * If loading throws, no Config value has been modified yet.
+     */
+    private RestoreSnapshotMgr(String restoreSnapshotYamlFile) {
+        config = RestoreSnapshotConfig.load(restoreSnapshotYamlFile);
+        // Save the old config
+        oldStartWithIncompleteMeta = Config.start_with_incomplete_meta;
+        // Allow starting with only image and no log
+        Config.start_with_incomplete_meta = true;
+        // Save the old config
+        oldResetElectionGroup = Config.bdbje_reset_election_group;
+        Config.bdbje_reset_election_group = config.isResetElectionGroup();
+    }
+
+    /**
+     * Enters restore mode if {@code args} contains {@code -restore_snapshot} (any letter case).
+     * Without that switch, or with a null {@code args}, this method does nothing.
+     *
+     * @param restoreSnapshotYamlFile path of the YAML file handed to {@link RestoreSnapshotConfig#load}
+     * @param args                    process command-line arguments
+     */
+    public static void init(String restoreSnapshotYamlFile, String[] args) {
+        if (args == null) {
+            return;
+        }
+        for (String arg : args) {
+            // Constant-first comparison: a null element cannot throw.
+            if (RESTORE_SNAPSHOT_ARG.equalsIgnoreCase(arg)) {
+                LOG.info("FE start to restore from a snapshot");
+                instance = new RestoreSnapshotMgr(restoreSnapshotYamlFile);
+                return;
+            }
+        }
+    }
+
+    /**
+     * @return true while a restore started by {@link #init} has not been finished
+     */
+    public static boolean isRestoring() {
+        return instance != null;
+    }
+
+    /**
+     * Completes a restore; a no-op when no restore is in progress.
+     *
+     * <p>Sections missing from the config (null lists) are skipped, leaving the corresponding
+     * nodes untouched. After the node updates, the two Config settings are set back to their
+     * saved values and restore mode ends.
+     *
+     * @throws DdlException propagated from the node add/drop calls; in that case the Config
+     *                      overrides are NOT rolled back and {@link #isRestoring()} stays true
+     */
+    public static void finishRestoring() throws DdlException {
+        RestoreSnapshotMgr self = instance;
+        if (self == null) {
+            return;
+        }
+
+        List<RestoreSnapshotConfig.Frontend> frontends = self.config.getFrontends();
+        if (frontends != null) {
+            replaceFrontends(frontends);
+        }
+
+        List<RestoreSnapshotConfig.ComputeNode> computeNodes = self.config.getComputeNodes();
+        if (computeNodes != null) {
+            replaceComputeNodes(computeNodes);
+        }
+
+        // TODO: Update storage volume
+
+        // Rollback config
+        Config.start_with_incomplete_meta = self.oldStartWithIncompleteMeta;
+        Config.bdbje_reset_election_group = self.oldResetElectionGroup;
+
+        instance = null;
+    }
+
+    // Drops every frontend whose role is not LEADER, then adds the frontends listed in the config.
+    private static void replaceFrontends(List<RestoreSnapshotConfig.Frontend> frontends) throws DdlException {
+        NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr();
+
+        // Drop old frontends.
+        // Iterate over a copy: dropFrontend presumably removes entries from the collection
+        // returned by getFrontends() -- NOTE(review): confirm; the copy is harmless either way.
+        List<Frontend> oldFrontends = new ArrayList<>(nodeMgr.getFrontends().values());
+        for (Frontend frontend : oldFrontends) {
+            if (frontend.getRole() == FrontendNodeType.LEADER) {
+                continue;
+            }
+            LOG.info("Drop old frontend {}", frontend);
+            nodeMgr.dropFrontend(frontend.getRole(), frontend.getHost(), frontend.getEditLogPort());
+        }
+
+        // Add new frontends
+        for (RestoreSnapshotConfig.Frontend frontend : frontends) {
+            LOG.info("Add new frontend {}", frontend);
+            nodeMgr.addFrontend(frontend.isFollower() ? FrontendNodeType.FOLLOWER : FrontendNodeType.OBSERVER,
+                    frontend.getHost(), frontend.getEditLogPort());
+        }
+    }
+
+    // Drops all backends and compute nodes, then adds the compute nodes listed in the config.
+    // All drops and adds use WarehouseManager.DEFAULT_WAREHOUSE_NAME.
+    private static void replaceComputeNodes(List<RestoreSnapshotConfig.ComputeNode> computeNodes)
+            throws DdlException {
+        SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
+
+        // Drop old backends (copied first, same reasoning as for the frontends above)
+        List<Backend> oldBackends = new ArrayList<>(systemInfoService.getIdToBackend().values());
+        for (Backend be : oldBackends) {
+            LOG.info("Drop old backend {}", be);
+            systemInfoService.dropBackend(be.getHost(), be.getHeartbeatPort(),
+                    WarehouseManager.DEFAULT_WAREHOUSE_NAME, false);
+        }
+
+        // Drop old compute nodes
+        List<ComputeNode> oldComputeNodes = new ArrayList<>(systemInfoService.getIdComputeNode().values());
+        for (ComputeNode cn : oldComputeNodes) {
+            LOG.info("Drop old compute node {}", cn);
+            systemInfoService.dropComputeNode(cn.getHost(), cn.getHeartbeatPort(),
+                    WarehouseManager.DEFAULT_WAREHOUSE_NAME);
+        }
+
+        // Add new compute nodes
+        for (RestoreSnapshotConfig.ComputeNode cn : computeNodes) {
+            LOG.info("Add new compute node {}", cn);
+            systemInfoService.addComputeNode(cn.getHost(), cn.getHeartbeatServicePort(),
+                    WarehouseManager.DEFAULT_WAREHOUSE_NAME);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java
index 9c8912e646bc2d..faba846206a973 100644
--- a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java
+++
b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java @@ -131,11 +131,6 @@ public SystemInfoService() { public void addComputeNodes(AddComputeNodeClause addComputeNodeClause) throws DdlException { - - for (Pair pair : addComputeNodeClause.getHostPortPairs()) { - checkSameNodeExist(pair.first, pair.second); - } - for (Pair pair : addComputeNodeClause.getHostPortPairs()) { addComputeNode(pair.first, pair.second, addComputeNodeClause.getWarehouse()); } @@ -169,7 +164,9 @@ public void dropComputeNode(ComputeNode computeNode) { } // Final entry of adding compute node - private void addComputeNode(String host, int heartbeatPort, String warehouse) throws DdlException { + public void addComputeNode(String host, int heartbeatPort, String warehouse) throws DdlException { + checkSameNodeExist(host, heartbeatPort); + ComputeNode newComputeNode = new ComputeNode(GlobalStateMgr.getCurrentState().getNextId(), host, heartbeatPort); idToComputeNodeRef.put(newComputeNode.getId(), newComputeNode); setComputeNodeOwner(newComputeNode); diff --git a/fe/fe-core/src/test/java/com/starrocks/common/RestoreSnapshotConfigTest.java b/fe/fe-core/src/test/java/com/starrocks/common/RestoreSnapshotConfigTest.java new file mode 100644 index 00000000000000..a1c1ad99b8a609 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/common/RestoreSnapshotConfigTest.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.starrocks.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RestoreSnapshotConfigTest {
+
+    /**
+     * Loads the sample restore_snapshot.yaml test resource and checks that every section is parsed.
+     * The expected counts and values mirror the entries of that resource file. The path is
+     * relative, so it is resolved against the working directory of the test run.
+     */
+    @Test
+    public void testLoadFromFile() {
+        RestoreSnapshotConfig config = RestoreSnapshotConfig.load("src/test/resources/conf/restore_snapshot.yaml");
+        Assert.assertTrue(config.isResetElectionGroup());
+
+        // Exact sizes instead of "not empty": catches a parser that silently drops entries.
+        Assert.assertNotNull(config.getFrontends());
+        Assert.assertEquals(2, config.getFrontends().size());
+        Assert.assertNotNull(config.getComputeNodes());
+        Assert.assertEquals(2, config.getComputeNodes().size());
+        Assert.assertNotNull(config.getStorageVolumes());
+        Assert.assertEquals(2, config.getStorageVolumes().size());
+
+        // "properties" is written as a list of key/value pairs in the YAML and exposed as a map.
+        Assert.assertEquals("storage_volume1", config.getStorageVolumes().get(0).getName());
+        Assert.assertEquals(2, config.getStorageVolumes().get(0).getProperties().size());
+        Assert.assertEquals("value1", config.getStorageVolumes().get(0).getProperties().get("key1"));
+        Assert.assertEquals("value2", config.getStorageVolumes().get(0).getProperties().get("key2"));
+    }
+}
diff --git a/fe/fe-core/src/test/resources/conf/restore_snapshot.yaml b/fe/fe-core/src/test/resources/conf/restore_snapshot.yaml
new file mode 100644
index 00000000000000..19cf9a7d0ccff7
--- /dev/null
+++ b/fe/fe-core/src/test/resources/conf/restore_snapshot.yaml
@@ -0,0 +1,37 @@
+# set to true if frontends changed
+reset_election_group: true
+
+frontends:
+  - host: host1
+    edit_log_port: 9010
+    type: follower #default follower
+  - host: host2
+    edit_log_port: 9010
+    type: observer
+
+compute_nodes:
+  - host: host1
+    heartbeat_service_port: 9050
+  - host: host2
+    heartbeat_service_port: 9050
+
+# used for restoring a cloned snapshot
+storage_volumes:
+  - name: storage_volume1
+    type: S3
+    location: location1
+    comment: comment1
+    properties:
+      - key: key1
+        value: value1
+      - key: key2
+        value: value2
+  - name: storage_volume2
+    type: HDFS
+    location: location2
+    comment: comment2
+    properties:
+      - key: key1
+        value: value1
+      - key: key2
+        value: value2