From 8f9539e0cdf4dec51d06163bab7f765f7a28cda3 Mon Sep 17 00:00:00 2001 From: zfrendo <842557664@qq.com> Date: Mon, 9 Sep 2024 11:41:47 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20tendbCluster=E6=94=AF=E6=8C=81=E6=8C=87?= =?UTF-8?q?=E5=AE=9A=E5=88=86=E7=89=87=E6=9F=A5=E8=AF=A2=E5=A4=87=E4=BB=BD?= =?UTF-8?q?=E7=94=A8=E4=BA=8E=E6=81=A2=E5=A4=8D=20#6747?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mysql/fixpoint_rollback/handlers.py | 30 ++++++++++++++----- .../common/mysql_resotre_data_sub_flow.py | 2 +- .../mysql/common/recover_slave_instance.py | 5 +++- .../spider/remote_master_slave_migrate.py | 18 ++++++----- .../spider/switch_remote_spt_routing.py | 2 +- .../tendbcluster/tendb_restore_slave.py | 2 ++ 6 files changed, 40 insertions(+), 19 deletions(-) diff --git a/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py b/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py index 194647993f..23f1f8e7fc 100644 --- a/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py +++ b/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py @@ -10,6 +10,7 @@ """ import copy import json +import logging import re from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed @@ -36,6 +37,8 @@ from backend.utils.string import base64_encode from backend.utils.time import compare_time, datetime2str, find_nearby_time +logger = logging.getLogger("flow") + class FixPointRollbackHandler: """ @@ -192,10 +195,11 @@ def aggregate_tendb_dbbackup_logs(self, backup_logs: List[Dict[str, Any]]) -> Li return valid_backup_logs - def aggregate_tendbcluster_dbbackup_logs(self, backup_logs: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + def aggregate_tendbcluster_dbbackup_logs(self, backup_logs: List[Dict], shard_list: List = None) -> List[Dict]: """ 聚合tendbcluster的mysql_backup_result日志,按照backup_id聚合tendb备份记录 :param backup_logs: 备份记录列表 + :param shard_list: 备份分片数 """ def insert_time_field(_back_log, _log): @@ -298,17 +302,25 @@ def insert_log_into_node(_backup_node, _log): if backup_node: insert_time_field(backup_id__backup_logs_map[backup_id], backup_node) + logger.info("backup info:", backup_id__backup_logs_map) # 获取合法的备份记录 - cluster_shard_num = self.cluster.tendbclusterstorageset_set.count() + if shard_list is not None and len(shard_list) > 0: + shard_list = sorted(shard_list) + else: + cluster_shard_num = self.cluster.tendbclusterstorageset_set.count() + shard_list = list(range(0, cluster_shard_num)) + backup_id__valid_backup_logs = defaultdict(dict) for backup_id, backup_log in backup_id__backup_logs_map.items(): # 获取合法分片ID,如果分片数不完整,则忽略 shard_value_list = [ - shard_value + int(shard_value) for shard_value in backup_log["remote_node"].keys() if backup_log["remote_node"][shard_value] ] - if sorted(shard_value_list) != list(range(0, cluster_shard_num)): + logger.info("backup shard list:", shard_value_list) + logger.info("user shard list:", shard_list) + if not set(shard_value_list).issuperset(set(shard_list)): continue # 如果不存在spider master记录,则忽略 @@ -323,11 +335,12 @@ def insert_log_into_node(_backup_node, _log): return list(backup_id__valid_backup_logs.values()) - def query_backup_log_from_bklog(self, start_time: datetime, end_time: datetime) -> List[Dict]: + def query_backup_log_from_bklog(self, start_time: datetime, end_time: datetime, **kwargs) -> List[Dict]: """ 通过日志平台查询集群的时间范围内的备份记录 :param start_time: 开始时间 :param end_time: 结束时间 + :param shard_list: tendbcluster专属,备份分片数 """ backup_logs = self._get_log_from_bklog( @@ -338,7 +351,8 @@ def query_backup_log_from_bklog(self, start_time: datetime, end_time: datetime) ) if self.cluster.cluster_type == ClusterType.TenDBCluster: - return self.aggregate_tendbcluster_dbbackup_logs(backup_logs) + shard_list = kwargs.get("shard_list", []) + return self.aggregate_tendbcluster_dbbackup_logs(backup_logs, shard_list) else: return self.aggregate_tendb_dbbackup_logs(backup_logs) @@ -483,7 +497,7 @@ def query_backup_log_from_local(self) -> List[Dict[str, Any]]: return local_backup_logs def query_latest_backup_log( - self, rollback_time: datetime, backup_source: str = MySQLBackupSource.REMOTE.value + self, rollback_time: datetime, backup_source: str = MySQLBackupSource.REMOTE.value, **kwargs ) -> Dict[str, Any]: """ 根据回档时间查询最新一次的备份记录 @@ -495,7 +509,7 @@ def query_latest_backup_log( # 日志平台查询 end_time = rollback_time start_time = end_time - timedelta(days=BACKUP_LOG_ROLLBACK_TIME_RANGE_DAYS) - backup_logs = self.query_backup_log_from_bklog(start_time, end_time) + backup_logs = self.query_backup_log_from_bklog(start_time, end_time, **kwargs) if not backup_logs: return None diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/mysql_resotre_data_sub_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/mysql_resotre_data_sub_flow.py index 030bfd67a7..81a697284f 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/mysql_resotre_data_sub_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/mysql_resotre_data_sub_flow.py @@ -60,7 +60,7 @@ def mysql_restore_data_sub_flow( backup_info = get_local_backup(ins_list, cluster_model) if backup_info is None: logger.error("cluster {} backup info not exists".format(cluster_model.id)) - raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_model.id))) + raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的本地备份信息失败".format(cluster_model.id))) cluster["backupinfo"] = backup_info exec_act_kwargs = ExecActuatorKwargs( bk_cloud_id=cluster_model.bk_cloud_id, diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/recover_slave_instance.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/recover_slave_instance.py index 2c4091bf2c..71e321a80a 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/recover_slave_instance.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/recover_slave_instance.py @@ -63,7 +63,10 @@ def slave_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict): # 查询备份 rollback_time = datetime.now(timezone.utc) rollback_handler = FixPointRollbackHandler(cluster_id=cluster["cluster_id"]) - backup_info = rollback_handler.query_latest_backup_log(rollback_time) + shard_list = [] + if cluster["cluster_type"] == ClusterType.TenDBCluster: + shard_list = [int(cluster_info["shard_id"])] + backup_info = rollback_handler.query_latest_backup_log(rollback_time, shard_list=shard_list) if backup_info is None: logger.error("cluster {} backup info not exists".format(cluster["cluster_id"])) raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster["cluster_id"]))) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_master_slave_migrate.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_master_slave_migrate.py index b6ae04349f..724a48e12f 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_master_slave_migrate.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_master_slave_migrate.py @@ -117,25 +117,27 @@ def migrate_master_slave_flow(self): db_module_id=cluster_class.db_module_id, cluster_type=cluster_class.cluster_type, ) + + tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + cluster_info = get_master_slave_recover_info( + cluster_class.id, self.data["old_master_ip"], self.data["old_slave_ip"] + ) + cluster_info["charset"] = self.data["charset"] + cluster_info["db_version"] = self.data["db_version"] + cluster_info["ports"] = [] backup_info = {} if self.ticket_data["backup_source"] == MySQLBackupSource.REMOTE.value: # 先查询备份,如果备份不存在则退出 # restore_time = datetime.strptime("2023-07-31 17:40:00", "%Y-%m-%d %H:%M:%S") backup_handler = FixPointRollbackHandler(cluster_class.id) restore_time = datetime.now(timezone.utc) - backup_info = backup_handler.query_latest_backup_log(restore_time) + shard_list = [int(shard_id) for shard_id in cluster_info["my_shards"].keys()] + backup_info = backup_handler.query_latest_backup_log(restore_time, shard_list=shard_list) logger.debug(backup_info) if backup_info is None: logger.error("cluster {} backup info not exists".format(cluster_class.id)) raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_class.id))) - tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) - cluster_info = get_master_slave_recover_info( - cluster_class.id, self.data["old_master_ip"], self.data["old_slave_ip"] - ) - cluster_info["charset"] = self.data["charset"] - cluster_info["db_version"] = self.data["db_version"] - cluster_info["ports"] = [] for shard_id, shard in cluster_info["my_shards"].items(): master = { "ip": self.data["new_master_ip"], diff --git a/dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_spt_routing.py b/dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_spt_routing.py index 14acda2a51..0c63074d6d 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_spt_routing.py +++ b/dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_spt_routing.py @@ -95,7 +95,6 @@ def _alter_remote_slave_routing( # 获取待切换的分片信息,拼接alter node语句 server_name = res[0]["cmd_results"][1]["table_data"][0]["Server_name"] - # 执行替换节点路由信息 exec_sql = [ "set tc_admin=1", @@ -103,6 +102,7 @@ def _alter_remote_slave_routing( f"{server_name} OPTIONS(host '{new_ip}', port {new_port}, password '{tdbctl_pass}', user '{TDBCTL_USER}')", "TDBCTL FLUSH ROUTING", ] + self.log_info(str(exec_sql)) rpc_params["cmds"] = exec_sql res = DRSApi.rpc(rpc_params) if res[0]["error_msg"]: diff --git a/dbm-ui/backend/ticket/builders/tendbcluster/tendb_restore_slave.py b/dbm-ui/backend/ticket/builders/tendbcluster/tendb_restore_slave.py index e2d75a3147..556f6b68ff 100644 --- a/dbm-ui/backend/ticket/builders/tendbcluster/tendb_restore_slave.py +++ b/dbm-ui/backend/ticket/builders/tendbcluster/tendb_restore_slave.py @@ -53,6 +53,8 @@ class TendbClusterRestoreSlaveParamBuilder(builders.FlowParamBuilder): def format_ticket_data(self): if self.ticket_data["ip_source"] == IpSource.RESOURCE_POOL: + for info in self.ticket_data["infos"]: + info["resource_spec"]["remote"] = info["resource_spec"]["new_slave"] return for info in self.ticket_data["infos"]: