Skip to content

Commit

Permalink
feat: tendbCluster支持指定分片查询备份用于恢复 #6747
Browse files Browse the repository at this point in the history
  • Loading branch information
zfrendo authored and iSecloud committed Sep 10, 2024
1 parent b3aaf33 commit ee00955
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 19 deletions.
30 changes: 22 additions & 8 deletions dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""
import copy
import json
import logging
import re
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
Expand All @@ -36,6 +37,8 @@
from backend.utils.string import base64_encode
from backend.utils.time import compare_time, datetime2str, find_nearby_time

logger = logging.getLogger("flow")


class FixPointRollbackHandler:
"""
Expand Down Expand Up @@ -192,10 +195,11 @@ def aggregate_tendb_dbbackup_logs(self, backup_logs: List[Dict[str, Any]]) -> Li

return valid_backup_logs

def aggregate_tendbcluster_dbbackup_logs(self, backup_logs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def aggregate_tendbcluster_dbbackup_logs(self, backup_logs: List[Dict], shard_list: List = None) -> List[Dict]:
"""
聚合tendbcluster的mysql_backup_result日志,按照backup_id聚合tendb备份记录
:param backup_logs: 备份记录列表
:param shard_list: 备份分片数
"""

def insert_time_field(_back_log, _log):
Expand Down Expand Up @@ -298,17 +302,25 @@ def insert_log_into_node(_backup_node, _log):
if backup_node:
insert_time_field(backup_id__backup_logs_map[backup_id], backup_node)

logger.info("backup info:", backup_id__backup_logs_map)
# 获取合法的备份记录
cluster_shard_num = self.cluster.tendbclusterstorageset_set.count()
if shard_list is not None and len(shard_list) > 0:
shard_list = sorted(shard_list)
else:
cluster_shard_num = self.cluster.tendbclusterstorageset_set.count()
shard_list = list(range(0, cluster_shard_num))

backup_id__valid_backup_logs = defaultdict(dict)
for backup_id, backup_log in backup_id__backup_logs_map.items():
# 获取合法分片ID,如果分片数不完整,则忽略
shard_value_list = [
shard_value
int(shard_value)
for shard_value in backup_log["remote_node"].keys()
if backup_log["remote_node"][shard_value]
]
if sorted(shard_value_list) != list(range(0, cluster_shard_num)):
logger.info("backup shard list:", shard_value_list)
logger.info("user shard list:", shard_list)
if not set(shard_value_list).issuperset(set(shard_list)):
continue

# 如果不存在spider master记录,则忽略
Expand All @@ -323,11 +335,12 @@ def insert_log_into_node(_backup_node, _log):

return list(backup_id__valid_backup_logs.values())

def query_backup_log_from_bklog(self, start_time: datetime, end_time: datetime) -> List[Dict]:
def query_backup_log_from_bklog(self, start_time: datetime, end_time: datetime, **kwargs) -> List[Dict]:
"""
通过日志平台查询集群的时间范围内的备份记录
:param start_time: 开始时间
:param end_time: 结束时间
:param shard_list: tendbcluster专属,备份分片数
"""

backup_logs = self._get_log_from_bklog(
Expand All @@ -338,7 +351,8 @@ def query_backup_log_from_bklog(self, start_time: datetime, end_time: datetime)
)

if self.cluster.cluster_type == ClusterType.TenDBCluster:
return self.aggregate_tendbcluster_dbbackup_logs(backup_logs)
shard_list = kwargs.get("shard_list", [])
return self.aggregate_tendbcluster_dbbackup_logs(backup_logs, shard_list)
else:
return self.aggregate_tendb_dbbackup_logs(backup_logs)

Expand Down Expand Up @@ -483,7 +497,7 @@ def query_backup_log_from_local(self) -> List[Dict[str, Any]]:
return local_backup_logs

def query_latest_backup_log(
self, rollback_time: datetime, backup_source: str = MySQLBackupSource.REMOTE.value
self, rollback_time: datetime, backup_source: str = MySQLBackupSource.REMOTE.value, **kwargs
) -> Dict[str, Any]:
"""
根据回档时间查询最新一次的备份记录
Expand All @@ -495,7 +509,7 @@ def query_latest_backup_log(
# 日志平台查询
end_time = rollback_time
start_time = end_time - timedelta(days=BACKUP_LOG_ROLLBACK_TIME_RANGE_DAYS)
backup_logs = self.query_backup_log_from_bklog(start_time, end_time)
backup_logs = self.query_backup_log_from_bklog(start_time, end_time, **kwargs)

if not backup_logs:
return None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def mysql_restore_data_sub_flow(
backup_info = get_local_backup(ins_list, cluster_model)
if backup_info is None:
logger.error("cluster {} backup info not exists".format(cluster_model.id))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_model.id)))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的本地备份信息失败".format(cluster_model.id)))
cluster["backupinfo"] = backup_info
exec_act_kwargs = ExecActuatorKwargs(
bk_cloud_id=cluster_model.bk_cloud_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ def slave_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict):
# 查询备份
rollback_time = datetime.now(timezone.utc)
rollback_handler = FixPointRollbackHandler(cluster_id=cluster["cluster_id"])
backup_info = rollback_handler.query_latest_backup_log(rollback_time)
shard_list = []
if cluster["cluster_type"] == ClusterType.TenDBCluster:
shard_list = [int(cluster_info["shard_id"])]
backup_info = rollback_handler.query_latest_backup_log(rollback_time, shard_list=shard_list)
if backup_info is None:
logger.error("cluster {} backup info not exists".format(cluster["cluster_id"]))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster["cluster_id"])))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,25 +117,27 @@ def migrate_master_slave_flow(self):
db_module_id=cluster_class.db_module_id,
cluster_type=cluster_class.cluster_type,
)

tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
cluster_info = get_master_slave_recover_info(
cluster_class.id, self.data["old_master_ip"], self.data["old_slave_ip"]
)
cluster_info["charset"] = self.data["charset"]
cluster_info["db_version"] = self.data["db_version"]
cluster_info["ports"] = []
backup_info = {}
if self.ticket_data["backup_source"] == MySQLBackupSource.REMOTE.value:
# 先查询备份,如果备份不存在则退出
# restore_time = datetime.strptime("2023-07-31 17:40:00", "%Y-%m-%d %H:%M:%S")
backup_handler = FixPointRollbackHandler(cluster_class.id)
restore_time = datetime.now(timezone.utc)
backup_info = backup_handler.query_latest_backup_log(restore_time)
shard_list = [int(shard_id) for shard_id in cluster_info["my_shards"].keys()]
backup_info = backup_handler.query_latest_backup_log(restore_time, shard_list=shard_list)
logger.debug(backup_info)
if backup_info is None:
logger.error("cluster {} backup info not exists".format(cluster_class.id))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_class.id)))
tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
cluster_info = get_master_slave_recover_info(
cluster_class.id, self.data["old_master_ip"], self.data["old_slave_ip"]
)

cluster_info["charset"] = self.data["charset"]
cluster_info["db_version"] = self.data["db_version"]
cluster_info["ports"] = []
for shard_id, shard in cluster_info["my_shards"].items():
master = {
"ip": self.data["new_master_ip"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ def _alter_remote_slave_routing(

# 获取待切换的分片信息,拼接alter node语句
server_name = res[0]["cmd_results"][1]["table_data"][0]["Server_name"]

# 执行替换节点路由信息
exec_sql = [
"set tc_admin=1",
f"TDBCTL ALTER NODE "
f"{server_name} OPTIONS(host '{new_ip}', port {new_port}, password '{tdbctl_pass}', user '{TDBCTL_USER}')",
"TDBCTL FLUSH ROUTING",
]
self.log_info(str(exec_sql))
rpc_params["cmds"] = exec_sql
res = DRSApi.rpc(rpc_params)
if res[0]["error_msg"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class TendbClusterRestoreSlaveParamBuilder(builders.FlowParamBuilder):

def format_ticket_data(self):
if self.ticket_data["ip_source"] == IpSource.RESOURCE_POOL:
for info in self.ticket_data["infos"]:
info["resource_spec"]["remote"] = info["resource_spec"]["new_slave"]
return

for info in self.ticket_data["infos"]:
Expand Down

0 comments on commit ee00955

Please sign in to comment.