Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support restoring from a cluster snapshot for shared-data mode (part 1) (backport #53861) #54074

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions bin/start_fe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ OPTS=$(getopt \
-l 'daemon' \
-l 'helper:' \
-l 'host_type:' \
-l 'cluster_snapshot' \
-l 'debug' \
-l 'logconsole' \
-- "$@")
Expand All @@ -34,6 +35,7 @@ eval set -- "$OPTS"
RUN_DAEMON=0
HELPER=
HOST_TYPE=
CLUSTER_SNAPSHOT=
ENABLE_DEBUGGER=0
RUN_LOG_CONSOLE=${SYS_LOG_TO_CONSOLE:-0}
# min jdk version required
Expand All @@ -43,6 +45,7 @@ while true; do
--daemon) RUN_DAEMON=1 ; shift ;;
--helper) HELPER=$2 ; shift 2 ;;
--host_type) HOST_TYPE=$2 ; shift 2 ;;
--cluster_snapshot) CLUSTER_SNAPSHOT="-cluster_snapshot" ; shift ;;
--debug) ENABLE_DEBUGGER=1 ; shift ;;
--logconsole) RUN_LOG_CONSOLE=1 ; shift ;;
--) shift ; break ;;
Expand Down Expand Up @@ -228,7 +231,7 @@ echo "start time: $(date), server uptime: $(uptime)"

# StarRocksFE java process will write its process id into $pidfile
if [ ${RUN_DAEMON} -eq 1 ]; then
nohup $LIMIT $JAVA $final_java_opt com.starrocks.StarRocksFE ${HELPER} ${HOST_TYPE} "$@" </dev/null &
nohup $LIMIT $JAVA $final_java_opt com.starrocks.StarRocksFE ${HELPER} ${HOST_TYPE} ${CLUSTER_SNAPSHOT} "$@" </dev/null &
else
exec $LIMIT $JAVA $final_java_opt com.starrocks.StarRocksFE ${HELPER} ${HOST_TYPE} "$@" </dev/null
exec $LIMIT $JAVA $final_java_opt com.starrocks.StarRocksFE ${HELPER} ${HOST_TYPE} ${CLUSTER_SNAPSHOT} "$@" </dev/null
fi
1 change: 1 addition & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ if [ ${BUILD_FE} -eq 1 -o ${BUILD_SPARK_DPP} -eq 1 ]; then
cp -r -p ${STARROCKS_HOME}/conf/udf_security.policy ${STARROCKS_OUTPUT}/fe/conf/
cp -r -p ${STARROCKS_HOME}/conf/hadoop_env.sh ${STARROCKS_OUTPUT}/fe/conf/
cp -r -p ${STARROCKS_HOME}/conf/core-site.xml ${STARROCKS_OUTPUT}/fe/conf/
cp -r -p ${STARROCKS_HOME}/conf/cluster_snapshot.yaml ${STARROCKS_OUTPUT}/fe/conf/

rm -rf ${STARROCKS_OUTPUT}/fe/lib/*
cp -r -p ${STARROCKS_HOME}/fe/fe-core/target/lib/* ${STARROCKS_OUTPUT}/fe/lib/
Expand Down
39 changes: 39 additions & 0 deletions conf/cluster_snapshot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# List only the additional frontends to re-create during the restore; do NOT include the leader FE itself.
#frontends:
# - host: 172.26.92.1
# edit_log_port: 9010
# type: follower #default follower
# - host: 172.26.92.2
# edit_log_port: 9010
# type: observer

#compute_nodes:
# - host: 172.26.92.11
# heartbeat_service_port: 9050
# - host: 172.26.92.12
# heartbeat_service_port: 9050

# used for restoring a cloned snapshot
#storage_volumes:
# - name: my_s3_volume
# type: S3
# location: s3://defaultbucket/test/
# comment: my s3 volume
# properties:
# - key: aws.s3.region
# value: us-west-2
# - key: aws.s3.endpoint
# value: https://s3.us-west-2.amazonaws.com
# - key: aws.s3.access_key
# value: xxxxxxxxxx
# - key: aws.s3.secret_key
# value: yyyyyyyyyy
# - name: my_hdfs_volume
# type: HDFS
# location: hdfs://127.0.0.1:9000/sr/test/
# comment: my hdfs volume
# properties:
# - key: hadoop.security.authentication
# value: simple
# - key: username
# value: starrocks
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/StarRocksFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.starrocks.journal.bdbje.BDBJEJournal;
import com.starrocks.journal.bdbje.BDBTool;
import com.starrocks.journal.bdbje.BDBToolOptions;
import com.starrocks.lake.snapshot.RestoreClusterSnapshotMgr;
import com.starrocks.leader.MetaHelper;
import com.starrocks.qe.CoordinatorMonitor;
import com.starrocks.qe.QeService;
Expand Down Expand Up @@ -117,6 +118,8 @@ public static void start(String starRocksDir, String pidDir, String[] args) {
// set dns cache ttl
java.security.Security.setProperty("networkaddress.cache.ttl", "60");

RestoreClusterSnapshotMgr.init(starRocksDir + "/conf/cluster_snapshot.yaml", args);

// check meta dir
MetaHelper.checkMetaDir();

Expand Down Expand Up @@ -180,6 +183,8 @@ public static void start(String starRocksDir, String pidDir, String[] args) {

LOG.info("FE started successfully");

RestoreClusterSnapshotMgr.finishRestoring();

while (!stopped) {
Thread.sleep(2000);
}
Expand Down Expand Up @@ -220,6 +225,7 @@ private static CommandLineOptions parseArgs(String[] args) {
CommandLineParser commandLineParser = new BasicParser();
Options options = new Options();
options.addOption("ht", "host_type", false, "Specify fe start use ip or fqdn");
options.addOption("rs", "cluster_snapshot", false, "Specify fe start to restore from a cluster snapshot");
options.addOption("v", "version", false, "Print the version of StarRocks Frontend");
options.addOption("h", "helper", true, "Specify the helper node when joining a bdb je replication group");
options.addOption("b", "bdb", false, "Run bdbje debug tools");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package com.starrocks.lake.snapshot;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * In-memory representation of {@code conf/cluster_snapshot.yaml}, the descriptor read when a
 * frontend is started with {@code --cluster_snapshot} to restore a cluster snapshot in
 * shared-data mode. It lists the extra frontends, the compute nodes and the storage volumes
 * to re-create during the restore.
 *
 * <p>Instances are populated exclusively by Jackson via {@link #load(String)}; the YAML key of
 * every field is given by its {@code @JsonProperty} annotation.
 */
public class ClusterSnapshotConfig {
    private static final Logger LOG = LogManager.getLogger(ClusterSnapshotConfig.class);

    /** One non-leader frontend to be re-added during the restore. */
    public static class Frontend {
        /** Role of the frontend inside the replication group. */
        public enum FrontendType {
            FOLLOWER,
            OBSERVER;

            /**
             * Accepts case-insensitive spellings from YAML (e.g. {@code follower}).
             *
             * @throws IllegalArgumentException if {@code value} matches no constant
             */
            @JsonCreator
            public static FrontendType forValue(String value) {
                return FrontendType.valueOf(value.toUpperCase());
            }

            /** Serializes back to the lower-case spelling used in the YAML file. */
            @JsonValue
            public String toValue() {
                return name().toLowerCase();
            }
        }

        @JsonProperty("host")
        private String host;

        @JsonProperty("edit_log_port")
        private int editLogPort;

        // Defaults to FOLLOWER when the "type" key is omitted, matching the sample YAML.
        @JsonProperty("type")
        private FrontendType type = FrontendType.FOLLOWER;

        public String getHost() {
            return host;
        }

        public void setHost(String host) {
            this.host = host;
        }

        public int getEditLogPort() {
            return editLogPort;
        }

        public void setEditLogPort(int editLogPort) {
            this.editLogPort = editLogPort;
        }

        public FrontendType getType() {
            return type;
        }

        public void setType(FrontendType type) {
            this.type = type;
        }

        public boolean isFollower() {
            return this.type == FrontendType.FOLLOWER;
        }

        public boolean isObserver() {
            return this.type == FrontendType.OBSERVER;
        }

        @Override
        public String toString() {
            return "Frontend [host=" + host + ", editLogPort=" + editLogPort + ", type=" + type + "]";
        }
    }

    /** One compute node to be re-added during the restore. */
    public static class ComputeNode {
        @JsonProperty("host")
        private String host;

        @JsonProperty("heartbeat_service_port")
        private int heartbeatServicePort;

        public String getHost() {
            return host;
        }

        public void setHost(String host) {
            this.host = host;
        }

        public int getHeartbeatServicePort() {
            return heartbeatServicePort;
        }

        public void setHeartbeatServicePort(int heartbeatServicePort) {
            this.heartbeatServicePort = heartbeatServicePort;
        }

        @Override
        public String toString() {
            return "ComputeNode [host=" + host + ", heartbeatServicePort=" + heartbeatServicePort + "]";
        }
    }

    /** A storage volume definition used when restoring a cloned snapshot. */
    public static class StorageVolume {
        /** Backing store kind of the volume. */
        public enum StorageVolumeType {
            S3,
            HDFS,
            AZBLOB;

            /**
             * Accepts case-insensitive spellings from YAML (e.g. {@code s3}).
             *
             * @throws IllegalArgumentException if {@code value} matches no constant
             */
            @JsonCreator
            public static StorageVolumeType forValue(String value) {
                return StorageVolumeType.valueOf(value.toUpperCase());
            }

            /** Serializes back to the lower-case spelling used in the YAML file. */
            @JsonValue
            public String toValue() {
                return name().toLowerCase();
            }
        }

        /**
         * Converts the YAML list form
         * <pre>
         * properties:
         *   - key: k
         *     value: v
         * </pre>
         * into a flat {@code Map&lt;String, String&gt;}. Every entry must carry a non-empty
         * {@code key} and {@code value}.
         */
        private static class PropertiesDeserializer extends JsonDeserializer<Map<String, String>> {

            // JsonProcessingException already extends IOException, so declaring it separately
            // in the throws clause is redundant.
            @Override
            public Map<String, String> deserialize(JsonParser parser, DeserializationContext context)
                    throws IOException {
                ObjectMapper mapper = (ObjectMapper) parser.getCodec();
                List<Map<String, String>> list = mapper.readValue(parser,
                        new TypeReference<List<Map<String, String>>>() {
                        });

                Map<String, String> properties = new HashMap<>();
                for (Map<String, String> entry : list) {
                    String key = entry.get("key");
                    String value = entry.get("value");
                    if (key == null || key.isEmpty() || value == null || value.isEmpty()) {
                        // JsonProcessingException's constructors are protected; the anonymous
                        // subclass lets us attach the current token location to the error.
                        throw new JsonProcessingException("Missing 'key' or 'value' in properties entry",
                                parser.getTokenLocation()) {
                        };
                    }
                    properties.put(key, value);
                }
                return properties;
            }
        }

        @JsonProperty("name")
        private String name;

        @JsonProperty("type")
        private StorageVolumeType type;

        @JsonProperty("location")
        private String location;

        @JsonProperty("comment")
        private String comment;

        @JsonProperty("properties")
        @JsonDeserialize(using = PropertiesDeserializer.class)
        private Map<String, String> properties;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public StorageVolumeType getType() {
            return type;
        }

        public void setType(StorageVolumeType type) {
            this.type = type;
        }

        public String getLocation() {
            return location;
        }

        public void setLocation(String location) {
            this.location = location;
        }

        public String getComment() {
            return comment;
        }

        public void setComment(String comment) {
            this.comment = comment;
        }

        public Map<String, String> getProperties() {
            return properties;
        }

        public void setProperties(Map<String, String> properties) {
            this.properties = properties;
        }
    }

    // Each list is null when its section is absent from the YAML file.
    @JsonProperty("frontends")
    private List<Frontend> frontends;

    @JsonProperty("compute_nodes")
    private List<ComputeNode> computeNodes;

    @JsonProperty("storage_volumes")
    private List<StorageVolume> storageVolumes;

    public List<Frontend> getFrontends() {
        return frontends;
    }

    public List<ComputeNode> getComputeNodes() {
        return computeNodes;
    }

    public List<StorageVolume> getStorageVolumes() {
        return storageVolumes;
    }

    /**
     * Loads and parses the given cluster snapshot YAML file.
     *
     * @param clusterSnapshotYamlFile path to the YAML descriptor
     * @return the parsed configuration
     * @throws RuntimeException if the file cannot be read or parsed; the original exception is
     *         preserved as the cause and logged
     */
    public static ClusterSnapshotConfig load(String clusterSnapshotYamlFile) {
        try {
            ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
            return objectMapper.readValue(new File(clusterSnapshotYamlFile), ClusterSnapshotConfig.class);
        } catch (Exception e) {
            LOG.warn("Failed to load cluster snapshot config {} ", clusterSnapshotYamlFile, e);
            throw new RuntimeException(e);
        }
    }
}
Loading
Loading