From b32e0beb42a9404f872a92c4a1b66f5be6ac4b5d Mon Sep 17 00:00:00 2001 From: zfrendo <842557664@qq.com> Date: Thu, 5 Sep 2024 15:15:44 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20tendbCluster=E5=8E=9F=E5=9C=B0=E9=87=8D?= =?UTF-8?q?=E5=BB=BA=E5=A2=9E=E5=8A=A0=E8=B7=AF=E7=94=B1=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E5=AE=9A=E7=82=B9=E5=9B=9E=E6=A1=A3=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E4=BB=8E=E5=8E=9F=E4=B8=BB=E4=BB=8E=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E8=8E=B7=E5=8F=96binlog=20#6666?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scene/mysql/common/get_binlog_backup.py | 62 ++++++++ .../common/mysql_resotre_data_sub_flow.py | 29 ++-- .../mysql/common/recover_slave_instance.py | 12 -- .../mysql/common/slave_recover_switch.py | 46 ++++-- .../scene/mysql/mysql_rollback_data_flow.py | 7 +- .../mysql/mysql_rollback_data_sub_flow.py | 133 ++--------------- .../spider/remote_local_slave_recover.py | 27 +++- .../scene/spider/remote_slave_recover.py | 7 +- .../bamboo/scene/spider/spider_recover.py | 50 +++---- .../spider/switch_remote_slave_routing.py | 4 +- .../spider/switch_remote_spt_routing.py | 138 ++++++++++++++++++ .../flow/utils/mysql/mysql_act_playload.py | 11 +- .../flow/utils/spider/spider_act_dataclass.py | 22 +++ 13 files changed, 341 insertions(+), 207 deletions(-) create mode 100644 dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/get_binlog_backup.py create mode 100644 dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_spt_routing.py diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/get_binlog_backup.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/get_binlog_backup.py new file mode 100644 index 0000000000..e93867c99c --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/get_binlog_backup.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) 
available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging.config +from datetime import datetime + +from django.utils.translation import ugettext as _ + +from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler + +logger = logging.getLogger("root") + + +def get_backup_binlog( + cluster_id: int, start_time: datetime, end_time: datetime, binlog_info: dict, minute_range=30 +) -> dict: + result = {} + if start_time > end_time: + result["query_binlog_error"] = _("备份时间点:{} 大于 回滚时间点:{}".format(start_time, end_time)) + return result + # 先从别分文件的主节点查询,查询不到改为从节点查询。 + rollback_handler = FixPointRollbackHandler(cluster_id) + backup_binlog = rollback_handler.query_binlog_from_bklog( + start_time=start_time, + end_time=end_time, + host_ip=binlog_info["show_master_status"]["master_host"], + port=binlog_info["show_master_status"]["master_port"], + minute_range=minute_range, + ) + result["binlog_start_file"] = binlog_info["show_master_status"]["binlog_file"] + result["binlog_start_pos"] = binlog_info["show_master_status"]["binlog_pos"] + if backup_binlog is None: + if "show_slave_status" not in binlog_info.keys(): + result["query_binlog_error"] = _( + "获取原主节点 {} binlog失败".format(binlog_info["show_master_status"]["master_host"]) + ) + return result + backup_binlog = rollback_handler.query_binlog_from_bklog( + start_time=start_time, + end_time=end_time, + 
host_ip=binlog_info["show_slave_status"]["master_host"], + port=binlog_info["show_slave_status"]["master_port"], + minute_range=minute_range, + ) + result["binlog_start_file"] = binlog_info["show_slave_status"]["binlog_file"] + result["binlog_start_pos"] = binlog_info["show_slave_status"]["binlog_pos"] + if backup_binlog is None: + result["query_binlog_error"] = _("获取原主节点{} 和 原从节点{} 的binlog失败").format( + binlog_info["show_master_status"]["master_host"], binlog_info["show_slave_status"]["master_host"] + ) + return result + result["binlog_task_ids"] = [i["task_id"] for i in backup_binlog["file_list_details"]] + binlog_files = [i["file_name"] for i in backup_binlog["file_list_details"]] + result["binlog_files"] = ",".join(binlog_files) + return result diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/mysql_resotre_data_sub_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/mysql_resotre_data_sub_flow.py index 5ad03c9650..030bfd67a7 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/mysql_resotre_data_sub_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/mysql_resotre_data_sub_flow.py @@ -16,13 +16,15 @@ from backend.configuration.constants import MYSQL_DATA_RESTORE_TIME, MYSQL_USUAL_JOB_TIME, DBType from backend.db_meta.models import Cluster -from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler from backend.flow.consts import MysqlChangeMasterType from backend.flow.engine.bamboo.scene.common.builder import SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList -from backend.flow.engine.bamboo.scene.mysql.common.exceptions import TenDBGetBackupInfoFailedException +from backend.flow.engine.bamboo.scene.mysql.common.get_binlog_backup import get_backup_binlog from backend.flow.engine.bamboo.scene.mysql.common.get_local_backup import get_local_backup -from backend.flow.engine.bamboo.scene.spider.common.exceptions import 
TendbGetBackupInfoFailedException +from backend.flow.engine.bamboo.scene.spider.common.exceptions import ( + TendbGetBackupInfoFailedException, + TendbGetBinlogFailedException, +) from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent from backend.flow.plugins.components.collections.mysql.mysql_download_backupfile import ( MySQLDownloadBackupfileComponent, @@ -219,26 +221,19 @@ def mysql_rollback_data_sub_flow( if is_rollback_binlog: backup_time = str2datetime(backup_info["backup_time"], "%Y-%m-%d %H:%M:%S") - rollback_handler = FixPointRollbackHandler(cluster_model.id) - # 从父节点来 - backup_binlog = rollback_handler.query_binlog_from_bklog( + binlog_result = get_backup_binlog( + cluster_id=cluster_model.id, start_time=backup_time, end_time=rollback_time, - minute_range=30, - host_ip=cluster["master_ip"], - port=cluster["master_port"], + binlog_info=backup_info["binlog_info"], ) + if "query_binlog_error" in binlog_result.keys(): + raise TendbGetBinlogFailedException(message=binlog_result["query_binlog_error"]) + cluster.update(binlog_result) - if backup_binlog is None: - raise TenDBGetBackupInfoFailedException(message=_("获取实例 {} 的备份信息失败".format(cluster["master_ip"]))) - # task_ids = [i["task_id"] for i in backup_info["file_list"]] - task_ids = [i["task_id"] for i in backup_binlog["file_list_details"]] - binlog_files = [i["file_name"] for i in backup_binlog["file_list_details"]] - cluster["backup_time"] = backup_info["backup_time"] - cluster["binlog_files"] = ",".join(binlog_files) download_kwargs = DownloadBackupFileKwargs( bk_cloud_id=cluster_model.bk_cloud_id, - task_ids=task_ids, + task_ids=binlog_result["binlog_task_ids"], dest_ip=cluster["rollback_ip"], dest_dir=cluster["file_target_path"], reason="rollback node rollback binlog", diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/recover_slave_instance.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/recover_slave_instance.py 
index e83273602c..2c4091bf2c 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/recover_slave_instance.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/recover_slave_instance.py @@ -113,18 +113,6 @@ def slave_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict): reason="slave recover", ) - sub_pipeline.add_act( - act_name=_("下发db-actor到节点{}".format(cluster["master_ip"])), - act_component_code=TransFileComponent.code, - kwargs=asdict( - DownloadMediaKwargs( - bk_cloud_id=cluster["bk_cloud_id"], - exec_ip=[cluster["master_ip"], cluster["new_slave_ip"]], - file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(), - ) - ), - ) - sub_pipeline.add_act( act_name=_("下载全库备份介质到 {}".format(cluster["new_slave_ip"])), act_component_code=MySQLDownloadBackupfileComponent.code, diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/slave_recover_switch.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/slave_recover_switch.py index ce803de345..3c082d7920 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/slave_recover_switch.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/slave_recover_switch.py @@ -18,13 +18,14 @@ from backend.flow.engine.bamboo.scene.common.builder import SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList from backend.flow.engine.bamboo.scene.mysql.common.cluster_entrys import get_tendb_ha_entry -from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import check_sub_flow from backend.flow.plugins.components.collections.mysql.clone_user import CloneUserComponent from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent +from backend.flow.plugins.components.collections.mysql.mysql_rds_execute import MySQLExecuteRdsComponent from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent from backend.flow.utils.mysql.mysql_act_dataclass import ( 
CreateDnsKwargs, DownloadMediaKwargs, + ExecuteRdsKwargs, InstanceUserCloneKwargs, RecycleDnsRecordKwargs, ) @@ -62,19 +63,36 @@ def slave_migrate_switch_sub_flow( ) # 切换前做预检测 - verify_checksum_tuples = [{"master": old_master, "slave": new_slave}] - # for m in migrate_tuples: - # old_master-> new_master ; new_master -> new_slave 都需要检测checksum结果 - sub_pipeline.add_sub_pipeline( - sub_flow=check_sub_flow( - uid=ticket_data["uid"], - root_id=root_id, - cluster=cluster, - is_check_client_conn=True, - is_verify_checksum=True, - check_client_conn_inst=["{}:{}".format(new_slave_ip, master.port)], - verify_checksum_tuples=verify_checksum_tuples, - ) + # verify_checksum_tuples = [{"master": old_master, "slave": new_slave}] + # # for m in migrate_tuples: + # # old_master-> new_master ; new_master -> new_slave 都需要检测checksum结果 + # sub_pipeline.add_sub_pipeline( + # sub_flow=check_sub_flow( + # uid=ticket_data["uid"], + # root_id=root_id, + # cluster=cluster, + # is_check_client_conn=True, + # is_verify_checksum=True, + # check_client_conn_inst=["{}:{}".format(new_slave_ip, master.port)], + # verify_checksum_tuples=verify_checksum_tuples, + # ) + # ) + # 不做检查,而是在新从库通过rds加入一条恒为正确的记录。 + fake_checksum_sql = """replace into infodba_schema.checksum values + ('{}',{},'_fake_db_','_fake_tbl_',0,0,'PRIMARY',0,0,0,0,0,0,now())""".format( + master.machine.ip, master.port + ) + sub_pipeline.add_act( + act_name=_("新从库加入checksum记录 {}").format(new_slave), + act_component_code=MySQLExecuteRdsComponent.code, + kwargs=asdict( + ExecuteRdsKwargs( + bk_cloud_id=cluster.bk_cloud_id, + instance_ip=new_slave_ip, + instance_port=master.port, + sqls=[fake_checksum_sql], + ) + ), ) clone_data = [ diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py index cacad346e3..0a10b6973d 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py +++ 
b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py @@ -14,6 +14,7 @@ from datetime import datetime from typing import Dict, Optional +from django.db.models import Q from django.utils.crypto import get_random_string from django.utils.translation import ugettext as _ @@ -74,7 +75,11 @@ def rollback_data_flow(self): for info in self.ticket_data["infos"]: self.data = copy.deepcopy(info) cluster_class = Cluster.objects.get(id=self.data["cluster_id"]) - master = cluster_class.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) + filters = Q(cluster_type=ClusterType.TenDBSingle.value, instance_inner_role=InstanceInnerRole.ORPHAN.value) + filters = filters | Q( + cluster_type=ClusterType.TenDBHA.value, instance_inner_role=InstanceInnerRole.MASTER.value + ) + master = cluster_class.storageinstance_set.get(filters) self.data["bk_biz_id"] = cluster_class.bk_biz_id self.data["bk_cloud_id"] = cluster_class.bk_cloud_id self.data["db_module_id"] = cluster_class.db_module_id diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_sub_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_sub_flow.py index db76824515..edc2d1c75d 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_sub_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_sub_flow.py @@ -14,127 +14,26 @@ from django.utils.translation import ugettext as _ -from backend.configuration.constants import MYSQL_DATA_RESTORE_TIME, DBType +from backend.configuration.constants import MYSQL_DATA_RESTORE_TIME from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler from backend.flow.engine.bamboo.scene.common.builder import SubBuilder -from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList -from backend.flow.engine.bamboo.scene.mysql.common.exceptions import TenDBGetBackupInfoFailedException -from 
backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBackupInfoFailedException +from backend.flow.engine.bamboo.scene.mysql.common.get_binlog_backup import get_backup_binlog +from backend.flow.engine.bamboo.scene.spider.common.exceptions import ( + TendbGetBackupInfoFailedException, + TendbGetBinlogFailedException, +) from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent from backend.flow.plugins.components.collections.mysql.mysql_download_backupfile import ( MySQLDownloadBackupfileComponent, ) -from backend.flow.plugins.components.collections.mysql.mysql_rollback_data_download_binlog import ( - MySQLRollbackDownloadBinlogComponent, -) -from backend.flow.plugins.components.collections.mysql.rollback_trans_flies import RollBackTransFileComponent from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent -from backend.flow.utils.mysql.mysql_act_dataclass import ( - DownloadBackupFileKwargs, - DownloadMediaKwargs, - ExecActuatorKwargs, - P2PFileKwargs, - RollBackTransFileKwargs, -) +from backend.flow.utils.mysql.mysql_act_dataclass import DownloadBackupFileKwargs, ExecActuatorKwargs, P2PFileKwargs from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload from backend.utils.time import str2datetime logger = logging.getLogger("flow") -def rollback_local_and_time(root_id: str, ticket_data: dict, cluster_info: dict): - """ - mysql 定点回档类型 本地备份+指定时间 - @param root_id: flow 流程root_id - @param ticket_data: 关联单据 ticket对象 - @param cluster_info: 关联的cluster对象 - """ - cluster_info["recover_binlog"] = True - sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(ticket_data)) - sub_pipeline.add_act( - act_name=_("下发db_actuator介质"), - act_component_code=TransFileComponent.code, - kwargs=asdict( - DownloadMediaKwargs( - bk_cloud_id=cluster_info["bk_cloud_id"], - exec_ip=[cluster_info["master_ip"], cluster_info["old_slave_ip"]], - 
file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(), - ) - ), - ) - exec_act_kwargs = ExecActuatorKwargs( - bk_cloud_id=cluster_info["bk_cloud_id"], - cluster_type=cluster_info["cluster_type"], - cluster=cluster_info, - job_timeout=MYSQL_DATA_RESTORE_TIME, - ) - exec_act_kwargs.exec_ip = cluster_info["master_ip"] - exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_find_local_backup_payload.__name__ - sub_pipeline.add_act( - act_name=_("定点恢复之获取MASTER节点备份介质{}").format(exec_act_kwargs.exec_ip), - act_component_code=ExecuteDBActuatorScriptComponent.code, - kwargs=asdict(exec_act_kwargs), - write_payload_var="master_backup_file", - ) - - exec_act_kwargs.exec_ip = cluster_info["old_slave_ip"] - exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_find_local_backup_payload.__name__ - sub_pipeline.add_act( - act_name=_("定点恢复之获取SLAVE节点备份介质{}").format(exec_act_kwargs.exec_ip), - act_component_code=ExecuteDBActuatorScriptComponent.code, - kwargs=asdict(exec_act_kwargs), - write_payload_var="slave_backup_file", - ) - - sub_pipeline.add_act( - act_name=_("判断备份文件来源,并传输备份文件到新定点恢复节点{}").format(cluster_info["rollback_ip"]), - act_component_code=RollBackTransFileComponent.code, - kwargs=asdict( - RollBackTransFileKwargs( - bk_cloud_id=cluster_info["bk_cloud_id"], - file_list=[], - file_target_path=cluster_info["file_target_path"], - source_ip_list=[], - exec_ip=cluster_info["rollback_ip"], - cluster=cluster_info, - ) - ), - ) - - exec_act_kwargs.exec_ip = cluster_info["rollback_ip"] - exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_rollback_data_restore_payload.__name__ - sub_pipeline.add_act( - act_name=_("定点恢复之恢复数据{}").format(exec_act_kwargs.exec_ip), - act_component_code=ExecuteDBActuatorScriptComponent.code, - kwargs=asdict(exec_act_kwargs), - write_payload_var="change_master_info", - ) - - # backup_time 在活动节点里。到flow下载binlog - download_kwargs = DownloadBackupFileKwargs( - bk_cloud_id=cluster_info["bk_cloud_id"], - task_ids=[], - 
dest_ip=cluster_info["rollback_ip"], - dest_dir=cluster_info["file_target_path"], - reason="spider node rollback binlog", - cluster=cluster_info, - ) - sub_pipeline.add_act( - act_name=_("下载定点恢复的binlog到{}").format(cluster_info["rollback_ip"]), - act_component_code=MySQLRollbackDownloadBinlogComponent.code, - kwargs=asdict(download_kwargs), - ) - - exec_act_kwargs.exec_ip = cluster_info["rollback_ip"] - exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_recover_binlog_payload.__name__ - sub_pipeline.add_act( - act_name=_("定点恢复之前滚binlog{}".format(exec_act_kwargs.exec_ip)), - act_component_code=ExecuteDBActuatorScriptComponent.code, - kwargs=asdict(exec_act_kwargs), - ) - return sub_pipeline.build_sub_process(sub_name=_("{}定点回滚数据 LOCAL_AND_TIME ".format(cluster_info["rollback_ip"]))) - - def rollback_remote_and_time(root_id: str, ticket_data: dict, cluster_info: dict): """ mysql 定点回档类型 远程备份文件+指定时间 @@ -188,23 +87,19 @@ def rollback_remote_and_time(root_id: str, ticket_data: dict, cluster_info: dict ) backup_time = str2datetime(backupinfo["backup_time"]) rollback_time = str2datetime(cluster_info["rollback_time"]) - rollback_handler = FixPointRollbackHandler(cluster_info["cluster_id"]) - backup_binlog = rollback_handler.query_binlog_from_bklog( + binlog_result = get_backup_binlog( + cluster_id=cluster_info["cluster_id"], start_time=backup_time, end_time=rollback_time, - minute_range=30, - host_ip=cluster_info["master_ip"], - port=cluster_info["master_port"], + binlog_info=backupinfo["binlog_info"], ) - if backup_binlog is None: - raise TenDBGetBackupInfoFailedException(message=_("获取实例 {} 的备份信息失败".format(cluster_info["master_ip"]))) + if "query_binlog_error" in binlog_result.keys(): + raise TendbGetBinlogFailedException(message=binlog_result["query_binlog_error"]) + cluster_info.update(binlog_result) - task_ids = [i["task_id"] for i in backup_binlog["file_list_details"]] - binlog_files = [i["file_name"] for i in backup_binlog["file_list_details"]] - 
cluster_info["binlog_files"] = ",".join(binlog_files) download_kwargs = DownloadBackupFileKwargs( bk_cloud_id=cluster_info["bk_cloud_id"], - task_ids=task_ids, + task_ids=binlog_result["binlog_task_ids"], dest_ip=cluster_info["rollback_ip"], dest_dir=cluster_info["file_target_path"], reason="spider node rollback binlog", diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_local_slave_recover.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_local_slave_recover.py index 9866a955c0..4d19e14991 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_local_slave_recover.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_local_slave_recover.py @@ -13,6 +13,7 @@ from dataclasses import asdict from typing import Dict, Optional +from django.utils.crypto import get_random_string from django.utils.translation import ugettext as _ from backend.configuration.constants import DBType @@ -23,11 +24,15 @@ from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import build_surrounding_apps_sub_flow from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import mysql_restore_data_sub_flow from backend.flow.engine.bamboo.scene.mysql.common.recover_slave_instance import slave_recover_sub_flow +from backend.flow.plugins.components.collections.common.pause import PauseComponent from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent from backend.flow.plugins.components.collections.mysql.mysql_crond_control import MysqlCrondMonitorControlComponent from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent from backend.flow.plugins.components.collections.mysql.mysql_rds_execute import MySQLExecuteRdsComponent from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent +from backend.flow.plugins.components.collections.spider.switch_remote_spt_routing import ( + 
SwitchRemoteShardRoutingComponent, +) from backend.flow.utils.mysql.common.mysql_cluster_info import get_version_and_charset from backend.flow.utils.mysql.mysql_act_dataclass import ( CrondMonitorKwargs, @@ -39,6 +44,7 @@ from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta +from backend.flow.utils.spider.spider_act_dataclass import InstanceServerName, SwitchRemoteShardRoutingKwargs from backend.ticket.builders.common.constants import MySQLBackupSource logger = logging.getLogger("flow") @@ -140,7 +146,7 @@ def tendb_remote_slave_local_recover(self): exec_ip=target_slave.machine.ip, ) - tendb_migrate_pipeline.add_act( + sync_data_sub_pipeline.add_act( act_name=_("屏蔽监控 {}").format(target_slave.ip_port), act_component_code=MysqlCrondMonitorControlComponent.code, kwargs=asdict( @@ -152,7 +158,7 @@ def tendb_remote_slave_local_recover(self): ), ) - tendb_migrate_pipeline.add_act( + sync_data_sub_pipeline.add_act( act_name=_("从库reset slave {}").format(target_slave.ip_port), act_component_code=MySQLExecuteRdsComponent.code, kwargs=asdict( @@ -220,6 +226,23 @@ def tendb_remote_slave_local_recover(self): ) ) tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=sync_data_sub_pipeline_list) + tdbctl_pass = get_random_string(length=10) + switch_slave_class = SwitchRemoteShardRoutingKwargs(cluster_id=cluster_class.id, switch_remote_shard=[]) + for shard_id in self.data["shard_ids"]: + shard = cluster_class.tendbclusterstorageset_set.get(shard_id=shard_id) + inst_pairs = InstanceServerName( + server_name=f"SPT_SLAVE{shard_id}", + new_ip=shard.storage_instance_tuple.receiver.machine.ip, + new_port=shard.storage_instance_tuple.receiver.port, + tdbctl_pass=tdbctl_pass, + ) + switch_slave_class.switch_remote_shard.append(inst_pairs) + tendb_migrate_pipeline.add_act(act_name=_("人工确认切换"), 
act_component_code=PauseComponent.code, kwargs={}) + tendb_migrate_pipeline.add_act( + act_name=_("切换回原slave节点"), + act_component_code=SwitchRemoteShardRoutingComponent.code, + kwargs=asdict(switch_slave_class), + ) # 安装周边 tendb_migrate_pipeline.add_sub_pipeline( diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_slave_recover.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_slave_recover.py index eaa3af5289..e1ebb72a8c 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_slave_recover.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_slave_recover.py @@ -122,12 +122,13 @@ def tendb_remote_slave_recover(self): slaves = StorageInstance.objects.filter( machine__bk_cloud_id=cluster_class.bk_cloud_id, machine__ip=self.data["source_ip"] ) - slave_tuple = StorageInstanceTuple.objects.filter(receiver=slaves[0]).first() - if slave_tuple is None: + slave_tuple = StorageInstanceTuple.objects.filter(receiver=slaves[0]) + if slave_tuple is None or len(slave_tuple) == 0: raise MasterInstanceNotExistException( cluster_type=cluster_class.cluster_type, cluster_id=cluster_class.id ) - master = StorageInstance.objects.get(slave_tuple.ejector) + master = StorageInstance.objects.get(slave_tuple[0].ejector_id) + db_config = get_instance_config(cluster_class.bk_cloud_id, master.machine.ip, cluster_info["ports"]) install_sub_pipeline_list = [] diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_recover.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_recover.py index a7d8614a29..4a877ceb61 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_recover.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_recover.py @@ -14,9 +14,9 @@ from backend.configuration.constants import MYSQL_DATA_RESTORE_TIME from backend.db_meta.enums import ClusterType -from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler from backend.flow.consts import RollbackType from 
backend.flow.engine.bamboo.scene.common.builder import SubBuilder +from backend.flow.engine.bamboo.scene.mysql.common.get_binlog_backup import get_backup_binlog from backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBinlogFailedException from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent from backend.flow.plugins.components.collections.mysql.mysql_download_backupfile import ( @@ -24,7 +24,7 @@ ) from backend.flow.utils.mysql.mysql_act_dataclass import DownloadBackupFileKwargs, ExecActuatorKwargs from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload -from backend.utils.time import compare_time, str2datetime +from backend.utils.time import str2datetime def spider_recover_sub_flow(root_id: str, ticket_data: dict, cluster: dict): @@ -80,25 +80,19 @@ def spider_recover_sub_flow(root_id: str, ticket_data: dict, cluster: dict): if cluster["rollback_type"] == RollbackType.REMOTE_AND_TIME.value and False: spider_has_binlog = cluster.get("spider_has_binlog", False) if spider_has_binlog: - if compare_time(backup_info["backup_time"], cluster["rollback_time"]): - raise TendbGetBinlogFailedException(message=_("{} 备份时间点大于回滚时间点".format(cluster["master_ip"]))) - rollback_handler = FixPointRollbackHandler(cluster["cluster_id"]) - backup_binlog = rollback_handler.query_binlog_from_bklog( - str2datetime(backup_info["backup_time"]), - str2datetime(cluster["rollback_time"]), - minute_range=30, - host_ip=cluster["rollback_ip"], - port=cluster["rollback_port"], + binlog_result = get_backup_binlog( + cluster_id=cluster["cluster_id"], + start_time=str2datetime(backup_info["backup_time"]), + end_time=str2datetime(cluster["rollback_time"]), + binlog_info=backup_info["binlog_info"], ) - if backup_binlog is None: - raise TendbGetBinlogFailedException(message=_("获取实例 {} binlog失败".format(cluster["rollback_ip"]))) + if "query_binlog_error" in binlog_result.keys(): + raise 
TendbGetBinlogFailedException(message=binlog_result["query_binlog_error"]) + cluster.update(binlog_result) - task_ids = [i["task_id"] for i in backup_binlog["file_list_details"]] - binlog_files = [i["file_name"] for i in backup_binlog["file_list_details"]] - cluster["binlog_files"] = ",".join(binlog_files) download_kwargs = DownloadBackupFileKwargs( bk_cloud_id=cluster["bk_cloud_id"], - task_ids=task_ids, + task_ids=binlog_result["binlog_task_ids"], dest_ip=cluster["rollback_ip"], dest_dir=cluster["file_target_path"], reason="spider node rollback binlog", @@ -182,27 +176,21 @@ def remote_node_rollback(root_id: str, ticket_data: dict, cluster: dict): write_payload_var="change_master_info", ) # 指定时间点的定点回档则需要执行binlog前滚。滚动到指定的时间点。 - if cluster["rollback_type"] == RollbackType.REMOTE_AND_TIME.value: - if compare_time(backup_info["backup_time"], cluster["rollback_time"]): - raise TendbGetBinlogFailedException(message=_("{} 备份时间点大于回滚时间点".format(cluster["master_ip"]))) - rollback_handler = FixPointRollbackHandler(cluster["cluster_id"]) - backup_binlog = rollback_handler.query_binlog_from_bklog( + if cluster["rollback_type"] == RollbackType.REMOTE_AND_TIME.value: + binlog_result = get_backup_binlog( + cluster_id=cluster["cluster_id"], start_time=str2datetime(backup_info["backup_time"]), end_time=str2datetime(cluster["rollback_time"]), - minute_range=30, - host_ip=cluster["master_ip"], - port=cluster["master_port"], + binlog_info=backup_info["binlog_info"], ) - if backup_binlog is None: - raise TendbGetBinlogFailedException(message=_("获取实例 {} 的备份信息失败".format(cluster["master_ip"]))) + if "query_binlog_error" in binlog_result.keys(): + raise TendbGetBinlogFailedException(message=binlog_result["query_binlog_error"]) + cluster.update(binlog_result) - task_ids = [i["task_id"] for i in backup_binlog["file_list_details"]] - binlog_files = [i["file_name"] for i in backup_binlog["file_list_details"]] - cluster["binlog_files"] = ",".join(binlog_files) download_kwargs = 
DownloadBackupFileKwargs( bk_cloud_id=cluster["bk_cloud_id"], - task_ids=task_ids, + task_ids=binlog_result["binlog_task_ids"], dest_ip=cluster["rollback_ip"], dest_dir=cluster["file_target_path"], reason="tenDB rollback binlog", diff --git a/dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_slave_routing.py b/dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_slave_routing.py index 57e1d59e82..9ff7f02c15 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_slave_routing.py +++ b/dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_slave_routing.py @@ -37,7 +37,7 @@ def _create_tdbctl_user(self, cluster: Cluster, ctl_primary: str, new_ip: str, n "dml_ddl_priv": "", "global_priv": "all privileges", "address": f"{new_ip}{IP_PORT_DIVIDER}{new_port}", - "hosts": [ctl_primary.split(":")[0]], + "hosts": [ctl_primary.split(IP_PORT_DIVIDER)[0]], } ) self.log_info(f"add tdbctl user in instance [f'{new_ip}{IP_PORT_DIVIDER}{new_port}'] success") @@ -75,7 +75,7 @@ def _alter_remote_slave_routing( return False if not res[0]["cmd_results"][1]["table_data"]: - self.log_error(f"Node [{old_ip}:{old_port}] no longer has routing information") + self.log_error(f"Node [{old_ip}{IP_PORT_DIVIDER}{old_port}] no longer has routing information") return False # 添加tdbctl账号 diff --git a/dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_spt_routing.py b/dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_spt_routing.py new file mode 100644 index 0000000000..14acda2a51 --- /dev/null +++ b/dbm-ui/backend/flow/plugins/components/collections/spider/switch_remote_spt_routing.py @@ -0,0 +1,138 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. 
class SwitchRemoteShardRoutingService(BaseService):
    """
    Re-point the routing entry of a spider (TenDB Cluster) remote shard to a new instance.

    A shard is addressed by its ``Server_name`` in ``mysql.servers`` on the tdbctl
    primary; the entry is rewritten with ``TDBCTL ALTER NODE`` so that it targets the
    new ip/port, after which the routing is flushed.
    """

    def _create_tdbctl_user(self, cluster: Cluster, ctl_primary: str, new_ip: str, new_port: int, tdbctl_pass: str):
        """
        Grant the tdbctl primary access to the new instance.

        An existing ``TDBCTL_USER`` account bound to the primary's host is dropped first,
        then the account is created again with ``tdbctl_pass`` via the privilege manager.

        @param cluster: cluster being operated on (supplies bk_cloud_id and bk_biz_id)
        @param ctl_primary: address of the tdbctl primary, ip and port joined by IP_PORT_DIVIDER
        @param new_ip: ip of the new instance
        @param new_port: port of the new instance
        @param tdbctl_pass: password assigned to the created account
        """
        new_address = f"{new_ip}{IP_PORT_DIVIDER}{new_port}"
        ctl_primary_host = ctl_primary.split(IP_PORT_DIVIDER)[0]

        # Drop the account if it already exists on the new instance.
        rpc_params = {
            "addresses": [new_address],
            "cmds": [
                f"drop user '{TDBCTL_USER}'@'{ctl_primary_host}'",
            ],
            "force": False,
            "bk_cloud_id": cluster.bk_cloud_id,
        }
        # Remote request through the DRS service. The result is only logged and not
        # checked here, so a failed drop does not stop the flow.
        res = DRSApi.rpc(rpc_params)
        self.log_info(res)

        # Add the temporary account.
        DBPrivManagerApi.add_priv_without_account_rule(
            params={
                "bk_cloud_id": cluster.bk_cloud_id,
                "bk_biz_id": cluster.bk_biz_id,
                "operator": "",
                "user": TDBCTL_USER,
                "psw": tdbctl_pass,
                "dbname": "%",
                "dml_ddl_priv": "",
                "global_priv": "all privileges",
                "address": new_address,
                "hosts": [ctl_primary_host],
            }
        )
        # The original message embedded a literal "f'...'" around the address
        # (a nested f-string prefix inside the f-string); log the plain address instead.
        self.log_info(f"add tdbctl user in instance [{new_address}] success")

    def _alter_remote_slave_routing(
        self, cluster: Cluster, server_name: str, new_ip: str, new_port: int, tdbctl_pass: str
    ) -> bool:
        """
        Replace the routing information of one shard.

        @param cluster: cluster being operated on
        @param server_name: Server_name of the routing entry to update
        @param new_ip: ip of the new instance
        @param new_port: port of the new instance
        @param tdbctl_pass: password of the tdbctl account on the new instance
        @return: True when the routing entry was altered and flushed; False when the
                 lookup fails, the entry does not exist, or the ALTER NODE call fails
        """
        # Fetch the cluster's current tdbctl primary node.
        ctl_primary = cluster.tendbcluster_ctl_primary_address()
        rpc_params = {
            "addresses": [ctl_primary],
            "cmds": [
                "set tc_admin=1",
                f"select Server_name from mysql.servers where Server_name = '{server_name}'",
            ],
            "force": False,
            "bk_cloud_id": cluster.bk_cloud_id,
        }
        # Remote request through the DRS service.
        res = DRSApi.rpc(rpc_params)

        if res[0]["error_msg"]:
            self.log_error(f"select mysql.servers failed: {res[0]['error_msg']}")
            return False

        # cmd_results[1] is the result of the second command, i.e. the select.
        matched_rows = res[0]["cmd_results"][1]["table_data"]
        if not matched_rows:
            self.log_error(f"Node [{server_name}] no longer has routing information")
            return False

        # Add the tdbctl account on the new instance.
        self._create_tdbctl_user(
            cluster=cluster, ctl_primary=ctl_primary, new_ip=new_ip, new_port=new_port, tdbctl_pass=tdbctl_pass
        )

        # Use the Server_name exactly as stored in mysql.servers to build the alter statement.
        routed_name = matched_rows[0]["Server_name"]

        # Replace the node's routing information, then flush the routing.
        rpc_params["cmds"] = [
            "set tc_admin=1",
            f"TDBCTL ALTER NODE "
            f"{routed_name} OPTIONS(host '{new_ip}', port {new_port}, password '{tdbctl_pass}', user '{TDBCTL_USER}')",
            "TDBCTL FLUSH ROUTING",
        ]
        res = DRSApi.rpc(rpc_params)
        if res[0]["error_msg"]:
            self.log_error(f"exec TDBCTL-ALTER-NODE failed: {res[0]['error_msg']}")
            return False

        self.log_info(f"exec TDBCTL-ALTER-NODE success: [{routed_name}->{new_ip}{IP_PORT_DIVIDER}{new_port}]")
        return True

    def _execute(self, data, parent_data):
        """
        Switch the routing of every shard listed in ``kwargs["switch_remote_shard"]``.

        Stops at the first shard that fails and returns False; returns True when all
        shards were switched (including when there is nothing to switch).
        """
        kwargs = data.get_one_of_inputs("kwargs")

        # The list may be None (presumably built from a kwargs dataclass that declares
        # the field Optional); treat that as "no shard to switch" instead of failing
        # with a TypeError in the loop below.
        switch_remote_shard = kwargs["switch_remote_shard"] or []

        # Load the cluster object (tdbctl instances, spider ports, etc.).
        cluster = Cluster.objects.get(id=kwargs["cluster_id"])
        for pairs in switch_remote_shard:
            if not self._alter_remote_slave_routing(
                cluster=cluster,
                server_name=pairs["server_name"],
                new_ip=pairs["new_ip"],
                new_port=pairs["new_port"],
                tdbctl_pass=pairs["tdbctl_pass"],
            ):
                return False

        return True


class SwitchRemoteShardRoutingComponent(Component):
    """Pipeline component that binds SwitchRemoteShardRoutingService under its component code."""

    name = __name__
    code = "switch_remote_shard_routing"
    bound_service = SwitchRemoteShardRoutingService
@dataclass
class InstanceServerName:
    """
    One shard routing replacement: the routing entry to update and the instance it
    should point to afterwards.
    """

    # Server_name of the routing entry to update
    server_name: str
    # ip of the new instance
    new_ip: str
    # port of the new instance
    new_port: int
    # password of the tdbctl account on the new instance
    tdbctl_pass: str


@dataclass
class SwitchRemoteShardRoutingKwargs:
    """
    Private variables of the activity that switches remote shard routing by server name.
    """

    # id of the cluster being operated on
    cluster_id: int
    # shard replacements to apply; the annotation allows None
    switch_remote_shard: Optional[List[InstanceServerName]]