From b5652a8f6357536015f5bdb08a180ad85fc57612 Mon Sep 17 00:00:00 2001 From: zfrendo <842557664@qq.com> Date: Thu, 19 Sep 2024 09:48:17 +0800 Subject: [PATCH] =?UTF-8?q?style:=20tendbHa=E6=95=B0=E6=8D=AE=E6=81=A2?= =?UTF-8?q?=E5=A4=8D=E6=B5=81=E7=A8=8B=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=20#6973?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/cluster/tendbha/switch_storage.py | 2 +- .../mysql/common/slave_recover_switch.py | 2 +- .../mysql_migrate_cluster_remote_flow.py | 58 ++++++++++++-- .../mysql/mysql_restore_slave_remote_flow.py | 75 +++++++++++++++++-- .../collections/mysql/mysql_crond_control.py | 22 +++--- .../flow/utils/mysql/mysql_act_dataclass.py | 3 +- .../ticket/builders/mysql/mysql_add_slave.py | 8 +- .../builders/mysql/mysql_migrate_cluster.py | 8 +- .../mysql/mysql_restore_local_slave.py | 8 +- .../builders/mysql/mysql_restore_slave.py | 8 +- 10 files changed, 137 insertions(+), 57 deletions(-) diff --git a/dbm-ui/backend/db_meta/api/cluster/tendbha/switch_storage.py b/dbm-ui/backend/db_meta/api/cluster/tendbha/switch_storage.py index 65e61f05e1..f18a701f91 100644 --- a/dbm-ui/backend/db_meta/api/cluster/tendbha/switch_storage.py +++ b/dbm-ui/backend/db_meta/api/cluster/tendbha/switch_storage.py @@ -34,7 +34,7 @@ def switch_storage(cluster_id: int, target_storage_ip: str, origin_storage_ip: s ) cluster.storageinstance_set.remove(origin_storage) target_storage.status = InstanceStatus.RUNNING.value - if role and target_storage == InstanceRole.BACKEND_REPEATER: + if role and target_storage.instance_role == InstanceRole.BACKEND_REPEATER.value: # 如果是REPEATER角色,改成传入的role变量 target_storage.instance_role = role target_storage.instance_inner_role = InstanceRoleInstanceInnerRoleMap[role].value diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/slave_recover_switch.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/slave_recover_switch.py index 3c082d7920..eadb3ef073 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/slave_recover_switch.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/slave_recover_switch.py @@ -151,4 +151,4 @@ def slave_migrate_switch_sub_flow( ) ), ) - return sub_pipeline.build_sub_process(sub_name=_("[{}]成对切换".format(cluster.name))) + return sub_pipeline.build_sub_process(sub_name=_("{}切换到新从库{}:{}".format(cluster.name, new_slave_ip, master.port))) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_remote_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_remote_flow.py index 8f094441ab..329da94339 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_remote_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_remote_flow.py @@ -20,7 +20,8 @@ from backend.components import DBConfigApi from backend.components.dbconfig.constants import FormatType, LevelName from backend.configuration.constants import DBType -from backend.db_meta.enums import ClusterType, InstanceInnerRole +from backend.constants import IP_PORT_DIVIDER +from backend.db_meta.enums import ClusterType, InstanceInnerRole, InstanceStatus from backend.db_meta.models import Cluster from backend.db_package.models import Package from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler @@ -33,6 +34,9 @@ ) from backend.flow.engine.bamboo.scene.mysql.common.get_master_config import get_instance_config from backend.flow.engine.bamboo.scene.mysql.common.master_and_slave_switch import master_and_slave_switch +from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import ( + mysql_restore_master_slave_sub_flow, +) from backend.flow.engine.bamboo.scene.mysql.common.uninstall_instance import uninstall_instance_sub_flow from backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBackupInfoFailedException from backend.flow.engine.bamboo.scene.spider.spider_remote_node_migrate import remote_instance_migrate_sub_flow @@ -53,6 +57,7 @@ from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta +from backend.ticket.builders.common.constants import MySQLBackupSource logger = logging.getLogger("flow") @@ -74,6 +79,9 @@ def __init__(self, root_id: str, ticket_data: Optional[Dict]): # 定义备份文件存放到目标机器目录位置 self.backup_target_path = f"/data/dbbak/{self.root_id}" + self.local_backup = False + if self.ticket_data.get("backup_source") == MySQLBackupSource.LOCAL: + self.local_backup = True def migrate_cluster_flow(self, use_for_upgrade=False): """ @@ -234,18 +242,42 @@ def migrate_cluster_flow(self, use_for_upgrade=False): cluster["slave_ip"] = self.data["slave_ip"] cluster["master_port"] = master_model.port cluster["slave_port"] = master_model.port + cluster["mysql_port"] = master_model.port cluster["file_target_path"] = f"/data/dbbak/{self.root_id}/{master_model.port}" cluster["cluster_id"] = cluster_model.id cluster["bk_cloud_id"] = cluster_model.bk_cloud_id cluster["change_master_force"] = False + cluster["change_master"] = False cluster["charset"] = self.data["charset"] sync_data_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) - sync_data_sub_pipeline.add_sub_pipeline( - sub_flow=remote_instance_migrate_sub_flow( - root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster + if self.local_backup: + stand_by_slaves = cluster_model.storageinstance_set.filter( + instance_inner_role=InstanceInnerRole.SLAVE.value, + is_stand_by=True, + status=InstanceStatus.RUNNING.value, + ).exclude(machine__ip__in=[self.data["new_slave_ip"], self.data["new_master_ip"]]) + # 从standby从库找备份 + inst_list = ["{}{}{}".format(master_model.machine.ip, IP_PORT_DIVIDER, master_model.port)] + if len(stand_by_slaves) > 0: + inst_list.append( + "{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port) + ) + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=mysql_restore_master_slave_sub_flow( + root_id=self.root_id, + ticket_data=copy.deepcopy(self.data), + cluster=cluster, + cluster_model=cluster_model, + ins_list=inst_list, + ) + ) + else: + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=remote_instance_migrate_sub_flow( + root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster + ) ) - ) sync_data_sub_pipeline.add_act( act_name=_("数据恢复完毕,写入新主节点和旧主节点的关系链元数据"), @@ -297,7 +329,7 @@ def migrate_cluster_flow(self, use_for_upgrade=False): ) ) switch_sub_pipeline.add_act( - act_name=_("集群切换完成,写入 {} 的元信息".format(cluster_model.id)), + act_name=_("集群切换完成,写入 {} 的元信息".format(cluster_model.name)), act_component_code=MySQLDBMetaComponent.code, kwargs=asdict( DBMetaOPKwargs( @@ -308,7 +340,7 @@ def migrate_cluster_flow(self, use_for_upgrade=False): ), ) switch_sub_pipeline_list.append( - switch_sub_pipeline.build_sub_process(sub_name=_("集群 {} 切换".format(cluster_model.id))) + switch_sub_pipeline.build_sub_process(sub_name=_("集群 {} 切换".format(cluster_model.name))) ) # 第四步 卸载实例 uninstall_svr_sub_pipeline_list = [] @@ -405,6 +437,18 @@ def migrate_cluster_flow(self, use_for_upgrade=False): db_backup_pkg_type=MysqlVersionToDBBackupForMap[self.data["db_version"]], ) ) + # tendb_migrate_pipeline.add_act( + # act_name=_("屏蔽监控 {} {}").format(self.data["new_master_ip"], self.data["new_slave_ip"]), + # act_component_code=MysqlCrondMonitorControlComponent.code, + # kwargs=asdict( + # CrondMonitorKwargs( + # bk_cloud_id=cluster_class.bk_cloud_id, + # exec_ips=[self.data["new_master_ip"], self.data["new_slave_ip"]], + # port=0, + # minutes=240, + # ) + # ), + # ) # 人工确认切换迁移实例 tendb_migrate_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={}) # 切换迁移实例 diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_restore_slave_remote_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_restore_slave_remote_flow.py index 92d2fa4c1f..569900870f 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_restore_slave_remote_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_restore_slave_remote_flow.py @@ -29,6 +29,7 @@ install_mysql_in_cluster_sub_flow, ) from backend.flow.engine.bamboo.scene.mysql.common.get_master_config import get_instance_config +from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import mysql_restore_data_sub_flow from backend.flow.engine.bamboo.scene.mysql.common.recover_slave_instance import slave_recover_sub_flow from backend.flow.engine.bamboo.scene.mysql.common.slave_recover_switch import slave_migrate_switch_sub_flow from backend.flow.engine.bamboo.scene.mysql.common.uninstall_instance import uninstall_instance_sub_flow @@ -57,6 +58,7 @@ from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta +from backend.ticket.builders.common.constants import MySQLBackupSource logger = logging.getLogger("flow") @@ -76,6 +78,9 @@ def __init__(self, root_id: str, tick_data: Optional[Dict]): self.data = {} # 仅添加从库。不切换。不复制账号 self.add_slave_only = self.ticket_data.get("add_slave_only", False) + self.local_backup = False + if self.ticket_data.get("backup_source") == MySQLBackupSource.LOCAL: + self.local_backup = True def tendb_ha_restore_slave_flow(self): """ @@ -200,13 +205,38 @@ def tendb_ha_restore_slave_flow(self): "file_target_path": f"/data/dbbak/{self.root_id}/{master.port}", "charset": self.data["charset"], "change_master_force": True, + "change_master": True, } sync_data_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) - sync_data_sub_pipeline.add_sub_pipeline( - sub_flow=slave_recover_sub_flow( - root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster + if self.local_backup: + # 获取本地备份并恢复 + inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)] + stand_by_slaves = cluster_model.storageinstance_set.filter( + instance_inner_role=InstanceInnerRole.SLAVE.value, + is_stand_by=True, + status=InstanceStatus.RUNNING.value, + ).exclude(machine__ip__in=[self.data["new_slave_ip"]]) + if len(stand_by_slaves) > 0: + inst_list.append( + "{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port) + ) + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=mysql_restore_data_sub_flow( + root_id=self.root_id, + ticket_data=copy.deepcopy(self.data), + cluster=cluster, + cluster_model=cluster_model, + ins_list=inst_list, + ) ) - ) + + else: + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=slave_recover_sub_flow( + root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster + ) + ) + sync_data_sub_pipeline.add_act( act_name=_("同步完毕,写入主从关系,设置节点为running状态"), act_component_code=MySQLDBMetaComponent.code, @@ -218,6 +248,7 @@ def tendb_ha_restore_slave_flow(self): ) ), ) + sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据"))) switch_sub_pipeline_list = [] @@ -319,6 +350,19 @@ def tendb_ha_restore_slave_flow(self): cluster_type=ClusterType.TenDBHA.value, ) ) + # tendb_migrate_pipeline.add_act( + # act_name=_("屏蔽监控 {}").format(self.data["new_slave_ip"]), + # act_component_code=MysqlCrondMonitorControlComponent.code, + # kwargs=asdict( + # CrondMonitorKwargs( + # bk_cloud_id=cluster_class.bk_cloud_id, + # exec_ips=[self.data["new_slave_ip"]], + # port=0, + # minutes=240, + # ) + # ), + # ) + if not self.add_slave_only: # 人工确认切换迁移实例 tendb_migrate_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={}) @@ -473,12 +517,27 @@ def restore_local_slave_flow(self): "charset": self.data["charset"], "change_master_force": True, "cluster_type": cluster_model.cluster_type, + "change_master": True, } - tendb_migrate_pipeline.add_sub_pipeline( - sub_flow=slave_recover_sub_flow( - root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster + + if self.local_backup: + inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)] + tendb_migrate_pipeline.add_sub_pipeline( + sub_flow=mysql_restore_data_sub_flow( + root_id=self.root_id, + ticket_data=copy.deepcopy(self.data), + cluster=cluster, + cluster_model=cluster_model, + ins_list=inst_list, + ) ) - ) + else: + tendb_migrate_pipeline.add_sub_pipeline( + sub_flow=slave_recover_sub_flow( + root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster + ) + ) + # 卸载流程人工确认 tendb_migrate_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={}) # 克隆权限 diff --git a/dbm-ui/backend/flow/plugins/components/collections/mysql/mysql_crond_control.py b/dbm-ui/backend/flow/plugins/components/collections/mysql/mysql_crond_control.py index 138f53daa8..4b54c9e426 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/mysql/mysql_crond_control.py +++ b/dbm-ui/backend/flow/plugins/components/collections/mysql/mysql_crond_control.py @@ -44,22 +44,22 @@ def _execute(self, data, parent_data) -> bool: self.log_info("{} exec {}".format(target_ip_info, node_name)) FlowNode.objects.filter(root_id=root_id, node_id=node_id).update(hosts=exec_ips) + cmd_str = "cd /home/mysql/mysql-crond && ./mysql-crond " if kwargs["enable"]: - monitor_command = ( - "cd /home/mysql/mysql-crond && ./mysql-crond enable-job --name-match mysql-monitor-{}-.*".format( - kwargs["port"] - ) - ) + cmd_str += " enable-job" else: - monitor_command = ( - "cd /home/mysql/mysql-crond && ./mysql-crond pause-job --name-match mysql-monitor-{}-.* -r {}m".format( - kwargs["port"], kwargs["minutes"] - ) - ) + cmd_str += " pause-job -r {}m".format(kwargs["minutes"]) + if kwargs["port"] == 0: + cmd_str += " --name-match mysql-monitor-.*" + else: + cmd_str += " --name-match mysql-monitor-{}-.*".format(kwargs["port"]) + if kwargs["name"] != "": + cmd_str += " --name {}".format(kwargs["name"]) + self.log_info(cmd_str) body = { "bk_biz_id": env.JOB_BLUEKING_BIZ_ID, "task_name": f"DBM_{node_name}_{node_id}", - "script_content": base64_encode(monitor_command), + "script_content": base64_encode(cmd_str), "script_language": 1, "target_server": {"ip_list": target_ip_info}, } diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py index 1ab3a483cd..ab15a3df10 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py @@ -402,7 +402,8 @@ class CrondMonitorKwargs: bk_cloud_id: int exec_ips: list - port: int + name: str = "" + port: int = 0 minutes: int = 1440 enable: bool = False diff --git a/dbm-ui/backend/ticket/builders/mysql/mysql_add_slave.py b/dbm-ui/backend/ticket/builders/mysql/mysql_add_slave.py index aee33a3b05..373fed42d7 100644 --- a/dbm-ui/backend/ticket/builders/mysql/mysql_add_slave.py +++ b/dbm-ui/backend/ticket/builders/mysql/mysql_add_slave.py @@ -52,13 +52,7 @@ def validate(self, attrs): class MysqlAddSlaveParamBuilder(builders.FlowParamBuilder): # 复用重建 slave 的场景 - controller_remote = MySQLController.mysql_add_slave_remote_scene - controller_local = MySQLController.mysql_add_slave_scene - - def build_controller_info(self) -> dict: - backup_source = self.ticket_data.get("backup_source", MySQLBackupSource.LOCAL) - self.controller = getattr(self, f"controller_{backup_source}") - return super().build_controller_info() + controller = MySQLController.mysql_add_slave_remote_scene def format_ticket_data(self): self.ticket_data["add_slave_only"] = True diff --git a/dbm-ui/backend/ticket/builders/mysql/mysql_migrate_cluster.py b/dbm-ui/backend/ticket/builders/mysql/mysql_migrate_cluster.py index fc18d390eb..e37d7d16b4 100644 --- a/dbm-ui/backend/ticket/builders/mysql/mysql_migrate_cluster.py +++ b/dbm-ui/backend/ticket/builders/mysql/mysql_migrate_cluster.py @@ -55,13 +55,7 @@ def validate(self, attrs): class MysqlMigrateClusterParamBuilder(builders.FlowParamBuilder): - controller_remote = MySQLController.mysql_migrate_remote_scene - controller_local = MySQLController.mysql_migrate_cluster_scene - - def build_controller_info(self) -> dict: - backup_source = self.ticket_data.get("backup_source", MySQLBackupSource.LOCAL) - self.controller = getattr(self, f"controller_{backup_source}") - return super().build_controller_info() + controller = MySQLController.mysql_migrate_remote_scene def format_ticket_data(self): if self.ticket_data["ip_source"] == IpSource.RESOURCE_POOL: diff --git a/dbm-ui/backend/ticket/builders/mysql/mysql_restore_local_slave.py b/dbm-ui/backend/ticket/builders/mysql/mysql_restore_local_slave.py index d41db9a208..2ea4b6b40e 100644 --- a/dbm-ui/backend/ticket/builders/mysql/mysql_restore_local_slave.py +++ b/dbm-ui/backend/ticket/builders/mysql/mysql_restore_local_slave.py @@ -49,13 +49,7 @@ def validate(self, attrs): class MysqlRestoreLocalSlaveParamBuilder(builders.FlowParamBuilder): - controller_remote = MySQLController.mysql_restore_local_remote_scene - controller_local = MySQLController.mysql_restore_local_slave_scene - - def build_controller_info(self) -> dict: - backup_source = self.ticket_data.get("backup_source", MySQLBackupSource.LOCAL) - self.controller = getattr(self, f"controller_{backup_source}") - return super().build_controller_info() + controller = MySQLController.mysql_restore_local_remote_scene def format_ticket_data(self): for index, info in enumerate(self.ticket_data["infos"]): diff --git a/dbm-ui/backend/ticket/builders/mysql/mysql_restore_slave.py b/dbm-ui/backend/ticket/builders/mysql/mysql_restore_slave.py index e61113ae61..568d483a43 100644 --- a/dbm-ui/backend/ticket/builders/mysql/mysql_restore_slave.py +++ b/dbm-ui/backend/ticket/builders/mysql/mysql_restore_slave.py @@ -54,13 +54,7 @@ def validate(self, attrs): class MysqlRestoreSlaveParamBuilder(builders.FlowParamBuilder): - controller_remote = MySQLController.mysql_restore_slave_remote_scene - controller_local = MySQLController.mysql_restore_slave_scene - - def build_controller_info(self) -> dict: - backup_source = self.ticket_data.get("backup_source", MySQLBackupSource.LOCAL) - self.controller = getattr(self, f"controller_{backup_source}") - return super().build_controller_info() + controller = MySQLController.mysql_restore_slave_remote_scene def format_ticket_data(self): self.ticket_data["add_slave_only"] = False