From 7a3e320e6770955cb5c053b21b9a842ca6ff115c Mon Sep 17 00:00:00 2001 From: yuanruji Date: Thu, 10 Oct 2024 17:56:16 +0800 Subject: [PATCH] =?UTF-8?q?feat(dbm-services):=20=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E4=B8=80=E4=B8=BB=E5=A4=9A=E4=BB=8E=E6=B5=81=E7=A8=8B=E5=8F=82?= =?UTF-8?q?=E6=95=B0=E9=80=82=E9=85=8Dsaas=E3=80=81=E5=89=8D=E7=AB=AF=20#7?= =?UTF-8?q?234?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mysql/common/master_and_slave_switch.py | 13 +- ..._slaves_upgrade.py => mysql_ha_upgrade.py} | 539 +++++++++++++++++- .../backend/flow/engine/controller/mysql.py | 19 +- dbm-ui/backend/flow/urls.py | 2 + .../backend/flow/utils/mysql/mysql_db_meta.py | 43 +- dbm-ui/backend/flow/views/mysql_upgrade.py | 15 + 6 files changed, 585 insertions(+), 46 deletions(-) rename dbm-ui/backend/flow/engine/bamboo/scene/mysql/{mysql_non_standby_slaves_upgrade.py => mysql_ha_upgrade.py} (51%) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/master_and_slave_switch.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/master_and_slave_switch.py index 474a0e577e..5a1373e463 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/master_and_slave_switch.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/master_and_slave_switch.py @@ -148,9 +148,14 @@ def master_and_slave_switch(root_id: str, ticket_data: dict, cluster: Cluster, c mysql_storage_slave = cluster.storageinstance_set.filter( instance_inner_role=InstanceInnerRole.SLAVE.value, status=InstanceStatus.RUNNING.value ) - cluster_info["other_slave_info"] = [ - y.machine.ip for y in mysql_storage_slave.exclude(machine__ip=cluster_info["old_slave_ip"]) - ] + exclude_ips = [cluster_info["old_slave_ip"]] + if cluster_info.get("old_ro_slave_ips"): + exclude_ips.extend(cluster_info["old_ro_slave_ips"]) + logger.info(_("exclude_ips ip list {}").format(exclude_ips)) + cluster_info["other_slave_info"] = [y.machine.ip for y in mysql_storage_slave.exclude(machine__ip__in=exclude_ips)] + logger.info(_("other_slave_info:{}").format(cluster_info["other_slave_info"])) + if cluster_info.get("new_ro_slave_ips"): + cluster_info["other_slave_info"].extend(cluster_info["new_ro_slave_ips"]) domain_map = get_tendb_ha_entry(cluster.id) cluster_info["master_domain"] = domain_map["master_domain"] cluster_info["slave_domain"] = domain_map["slave_domain"] @@ -171,7 +176,7 @@ def master_and_slave_switch(root_id: str, ticket_data: dict, cluster: Cluster, c if cluster_info["other_slave_info"]: # 如果集群存在其他slave节点,则建立新的你主从关系 acts_list = [] - for exec_ip in cluster_info["other_slave_info"]: + for exec_ip in list(set(cluster_info["other_slave_info"])): cluster_sw_kwargs.exec_ip = exec_ip cluster_sw_kwargs.get_mysql_payload_func = MysqlActPayload.get_change_master_payload.__name__ acts_list.append( diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_ha_upgrade.py similarity index 51% rename from dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py rename to dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_ha_upgrade.py index 7257e5d971..bc2f84dfc8 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_ha_upgrade.py @@ -11,8 +11,10 @@ import copy import logging.config from dataclasses import asdict +from datetime import datetime from typing import Dict, Optional +from django.utils import timezone from django.utils.translation import gettext as _ from backend.configuration.constants import DBType @@ -21,17 +23,24 @@ from backend.db_meta.exceptions import DBMetaException from backend.db_meta.models import Cluster, StorageInstance from backend.db_package.models import Package +from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler from backend.flow.consts import MediumEnum from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList from backend.flow.engine.bamboo.scene.mysql.common.cluster_entrys import get_tendb_ha_entry from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import install_mysql_in_cluster_sub_flow from backend.flow.engine.bamboo.scene.mysql.common.get_master_config import get_instance_config -from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import mysql_restore_data_sub_flow +from backend.flow.engine.bamboo.scene.mysql.common.master_and_slave_switch import master_and_slave_switch +from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import ( + mysql_restore_data_sub_flow, + mysql_restore_master_slave_sub_flow, +) from backend.flow.engine.bamboo.scene.mysql.common.recover_slave_instance import slave_recover_sub_flow from backend.flow.engine.bamboo.scene.mysql.common.slave_recover_switch import slave_migrate_switch_sub_flow from backend.flow.engine.bamboo.scene.mysql.common.uninstall_instance import uninstall_instance_sub_flow from backend.flow.engine.bamboo.scene.mysql.mysql_upgrade import upgrade_version_check +from backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBackupInfoFailedException +from backend.flow.engine.bamboo.scene.spider.spider_remote_node_migrate import remote_instance_migrate_sub_flow from backend.flow.plugins.components.collections.common.download_backup_client import DownloadBackupClientComponent from backend.flow.plugins.components.collections.common.pause import PauseComponent from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent @@ -143,7 +152,7 @@ def destroy(self): p.run_pipeline(is_drop_random_user=False) -class MySQLNonStandbySlavesUpgradeFlow(object): +class TendbClusterUpgradeFlow(object): """ 一直多从非stanby slaves升级 """ @@ -183,7 +192,40 @@ def __precheck(self): ) upgrade_version_check(origin_mysql_ver, new_mysql_ver) - def upgrade(self): + def upgrade_ro_slaves(self): + self.__precheck() + cluster_ids = [] + for info in self.ticket_data["infos"]: + cluster_ids.extend(info["cluster_ids"]) + + p = Builder( + root_id=self.root_id, + data=copy.deepcopy(self.ticket_data), + need_random_pass_cluster_ids=list(set(cluster_ids)), + ) + subflows = [] + created_by = self.ticket_data["ticket_type"] + for info in self.ticket_data["infos"]: + subflow = non_standby_slaves_upgrade_subflow( + uid=str(self.ticket_data["uid"]), + root_id=self.root_id, + new_slave=info["new_slave"], + old_slave=info["old_slave"], + add_slave_only=self.add_slave_only, + relation_cluster_ids=info["cluster_ids"], + pkg_id=info["pkg_id"], + new_db_module_id=info["new_db_module_id"], + backup_source=self.ticket_data["backup_source"], + created_by=created_by, + force_uninstall=False, + ) + subflows.append(subflow) + + p.add_parallel_sub_pipeline(subflows) + + p.run_pipeline(init_trans_data_class=ClusterInfoContext(), is_drop_random_user=True) + + def upgrade_tendbha_cluster(self): """ { "uid": "2022051612120001", @@ -196,19 +238,35 @@ def upgrade(self): "cluster_ids": [1001,1002], "pkg_id": 123, "new_db_module_id: "578", - "old_slave": { + "new_slave": { + "ip": "1.1.2.1", "bk_biz_id": 200005000, - "bk_cloud_id": 0, "bk_host_id": 1, - "ip": "1.1.1.1", - + "bk_cloud_id": 0 }, - "new_slave": { + "new_master": { + "ip": "1.1.3.1", "bk_biz_id": 200005000, - "bk_cloud_id": 0, "bk_host_id": 1, - "ip": "1.1.1.2" - } + "bk_cloud_id": 0 + }, + ro_slaves:[ + { + "old_slave": { + "bk_biz_id": 200005000, + "bk_cloud_id": 0, + "bk_host_id": 1, + "ip": "1.1.1.1", + + }, + "new_slave": { + "bk_biz_id": 200005000, + "bk_cloud_id": 0, + "bk_host_id": 1, + "ip": "1.1.1.2" + } + } + ] } ] } @@ -224,20 +282,21 @@ def upgrade(self): need_random_pass_cluster_ids=list(set(cluster_ids)), ) subflows = [] - created_by = self.ticket_data["ticket_type"] + created_by = self.ticket_data["created_by"] for info in self.ticket_data["infos"]: - subflow = non_standby_slaves_upgrade_subflow( + subflow = tendbha_cluster_upgrade_subflow( uid=str(self.ticket_data["uid"]), root_id=self.root_id, + new_master=info["new_master"], new_slave=info["new_slave"], - old_slave=info["old_slave"], - add_slave_only=self.add_slave_only, - relation_cluster_ids=info["cluster_ids"], + ro_slaves=info["ro_slaves"], + cluster_ids=info["cluster_ids"], pkg_id=info["pkg_id"], new_db_module_id=info["new_db_module_id"], backup_source=self.ticket_data["backup_source"], created_by=created_by, force_uninstall=False, + ticket_type=self.ticket_data["ticket_type"], ) subflows.append(subflow) @@ -246,6 +305,215 @@ def upgrade(self): p.run_pipeline(init_trans_data_class=ClusterInfoContext(), is_drop_random_user=True) +def tendbha_cluster_upgrade_subflow( + uid: str, + root_id: str, + new_slave: dict, + new_master: dict, + ro_slaves: list, + cluster_ids: list, + pkg_id: int, + new_db_module_id: int, + backup_source: str, + created_by: str, + force_uninstall: bool, + ticket_type: str, +): + """ + 一主多从,整个集群升级 + """ + cluster_cls = Cluster.objects.get(id=cluster_ids[0]) + ports = get_ports(cluster_ids) + pkg = Package.objects.get(id=pkg_id, pkg_type=MediumEnum.MySQL, db_type=DBType.MySQL) + charset, db_version = get_version_and_charset( + cluster_cls.bk_biz_id, db_module_id=new_db_module_id, cluster_type=cluster_cls.cluster_type + ) + # 确定要迁移的主节点,从节点. + master_model = cluster_cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) + slave = cluster_cls.storageinstance_set.filter( + instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=True + ).first() + old_master_ip = master_model.machine.ip + old_slave_ip = slave.machine.ip + parent_global_data = { + "uid": uid, + "root_id": root_id, + "bk_biz_id": cluster_cls.bk_biz_id, + "bk_cloud_id": cluster_cls.bk_cloud_id, + "db_module_id": new_db_module_id, + "time_zone": cluster_cls.time_zone, + "cluster_type": cluster_cls.cluster_type, + "created_by": created_by, + "cluster_ids": cluster_ids, + "package": pkg.name, + "master_ip": old_master_ip, + "old_slave_ip": old_slave_ip, + "ports": ports, + "charset": charset, + "db_version": db_version, + "force": force_uninstall, + "ticket_type": ticket_type, + } + sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data) + old_ro_slave_ips = [] + new_ro_slave_ips = [] + if len(ro_slaves) > 0: + ro_sub_piplelines = [] + ro_switch_ro_sub_piplelines = [] + for ro_slave in ro_slaves: + ro_sub_pipleline = SubBuilder(root_id=root_id, data=parent_global_data) + old_ro_slave = ro_slave["old_ro_slave"] + new_ro_slave = ro_slave["new_ro_slave"] + new_ro_slave_ip = new_ro_slave["ip"] + new_ro_slave_ips.append(new_ro_slave_ip) + bk_host_ids = [new_slave["bk_host_id"]] + old_ro_slave_ip = old_ro_slave["ip"] + old_ro_slave_ips.append(old_ro_slave_ip) + db_config = get_instance_config(cluster_cls.bk_cloud_id, old_ro_slave_ip, ports=ports) + install_ro_slave_sub_pipeline = build_install_slave_sub_pipeline( + uid, + root_id, + parent_global_data, + cluster_cls, + new_ro_slave_ip, + ports, + bk_host_ids, + db_config, + pkg_id, + pkg.name, + cluster_ids, + new_db_module_id, + ) + ro_sub_pipleline.add_sub_pipeline(sub_flow=install_ro_slave_sub_pipeline) + # 恢复主从数据 + local_backup = False + if backup_source == MySQLBackupSource.LOCAL: + local_backup = True + sync_data_sub_pipeline_list = build_sync_data_sub_pipelines( + root_id, parent_global_data, cluster_ids, new_ro_slave_ip, local_backup, charset + ) + ro_sub_pipleline.add_parallel_sub_pipeline(sync_data_sub_pipeline_list) + ro_sub_piplelines.append(ro_sub_pipleline.build_sub_process(sub_name=_("安装非stanbySlave节点并数据同步"))) + # 切换换subpipeline + ro_switch_ro_sub_pipleline = SubBuilder(root_id=root_id, data=parent_global_data) + switch_sub_pipeline_list = build_switch_sub_pipelines( + root_id, parent_global_data, cluster_ids, old_ro_slave_ip, new_ro_slave_ip + ) + ro_switch_ro_sub_pipleline.add_parallel_sub_pipeline(switch_sub_pipeline_list) + ro_switch_ro_sub_pipleline.add_act( + act_name=_("更新[NewSlave]{} db module id".format(new_ro_slave_ip)), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.update_upgrade_slaves_dbmodule.__name__, + is_update_trans_data=True, + cluster={ + "db_module_id": new_db_module_id, + "new_slave_ip": new_ro_slave_ip, + }, + ) + ), + ) + + # 解除old从节点和集群的元数据的关系 + ro_switch_ro_sub_pipleline.add_act( + act_name=_("解除[OldSlave]{}相关从实例和集群的元数据的关系".format(old_ro_slave_ip)), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.dissolve_master_slave_relationship.__name__, + is_update_trans_data=True, + cluster={ + "cluster_ids": cluster_ids, + "old_slave_ip": old_ro_slave_ip, + }, + ) + ), + ) + ro_switch_ro_sub_piplelines.append(ro_switch_ro_sub_pipleline.build_sub_process(sub_name=_("切换RO从节点"))) + # 安装mysql + ms_sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data) + bk_host_ids = [new_master["bk_host_id"], new_slave["bk_host_id"]] + master = cluster_cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) + db_config = get_instance_config(cluster_cls.bk_cloud_id, master.machine.ip, ports) + install_ms_pair_subflow = build_install_ms_pair_sub_pipeline( + uid=uid, + root_id=root_id, + parent_global_data=parent_global_data, + cluster=cluster_cls, + new_master_ip=new_master["ip"], + new_slave_ip=new_slave["ip"], + ports=ports, + bk_host_ids=bk_host_ids, + pkg_id=pkg.id, + db_config=db_config, + db_module_id=new_db_module_id, + ) + ms_sub_pipeline.add_sub_pipeline(sub_flow=install_ms_pair_subflow) + new_master_ip = new_master["ip"] + new_slave_ip = new_slave["ip"] + sync_data_sub_pipeline_list = build_ms_pair_sync_data_sub_pipelines( + root_id, parent_global_data, cluster_ids, new_master_ip, new_slave_ip, local_backup, charset + ) + ms_sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=sync_data_sub_pipeline_list) + ms_process = ms_sub_pipeline.build_sub_process(sub_name=_("安装主从节点,并同步数据")) + if len(ro_slaves) > 0: + sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=[ms_process] + ro_sub_piplelines) + else: + sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=[ms_process]) + # 切换主从 + sub_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={}) + # 先切ro slaves + if len(ro_slaves) > 0: + sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=ro_switch_ro_sub_piplelines) + logger.info(_("old_ro_slave ip list {}").format(old_ro_slave_ips)) + ms_switch_subflows = build_ms_pair_switch_sub_pipelines( + uid=uid, + root_id=root_id, + parent_global_data=parent_global_data, + relation_cluster_ids=cluster_ids, + old_master_ip=old_master_ip, + old_slave_ip=old_slave_ip, + new_master_ip=new_master_ip, + new_slave_ip=new_slave_ip, + old_ro_slave_ips=old_ro_slave_ips, + new_ro_slave_ips=new_ro_slave_ips, + ro_slaves=ro_slaves, + ) + sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=ms_switch_subflows) + # 更新集群模块信息 + sub_pipeline.add_act( + act_name=_("更新集群db模块信息"), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.update_cluster_module.__name__, + cluster={ + "cluster_ids": cluster_ids, + "new_module_id": new_db_module_id, + "major_version": db_version, + }, + ) + ), + ) + sub_pipeline.add_act(act_name=_("人工确认下架旧节点"), act_component_code=PauseComponent.code, kwargs={}) + uninstall_flows = [] + uninstall_ip_list = [old_master_ip, old_slave_ip] + old_ro_slave_ips + for uninstall_ip in uninstall_ip_list: + uninstall_flows.append( + build_uninstall_old_machine_sub_pipeline( + root_id=root_id, + parent_global_data=parent_global_data, + uninstall_ip=uninstall_ip, + relation_cluster_ids=cluster_ids, + bk_cloud_id=cluster_cls.bk_cloud_id, + ports=ports, + ) + ) + sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=uninstall_flows) + return sub_pipeline.build_sub_process(sub_name=_("{}:整体迁移升级").format(cluster_cls.immute_domain)) + + def non_standby_slaves_upgrade_subflow( uid: str, root_id: str, @@ -291,7 +559,7 @@ def non_standby_slaves_upgrade_subflow( db_config = get_instance_config(cluster_cls.bk_cloud_id, old_slave_ip, ports=ports) # 安装mysql - install_sub_pipeline = build_install_sub_pipeline( + install_sub_pipeline = build_install_slave_sub_pipeline( uid, root_id, parent_global_data, @@ -357,7 +625,7 @@ def non_standby_slaves_upgrade_subflow( # 切换完成后,确认卸载旧的从节点 sub_pipeline.add_act(act_name=_("确认卸载旧实例"), act_component_code=PauseComponent.code, kwargs={}) # 卸载旧从节点 - uninstall_svr_sub_pipeline = build_uninstall_sub_pipeline( + uninstall_svr_sub_pipeline = build_uninstall_old_machine_sub_pipeline( root_id, parent_global_data, old_slave_ip, relation_cluster_ids, cluster_cls.bk_cloud_id, ports ) sub_pipeline.add_sub_pipeline(sub_flow=uninstall_svr_sub_pipeline) @@ -365,7 +633,7 @@ def non_standby_slaves_upgrade_subflow( return sub_pipeline.build_sub_process(sub_name=_("{}:slave迁移升级到:{}").format(old_slave_ip, new_slave_ip)) -def build_install_sub_pipeline( +def build_install_slave_sub_pipeline( uid, root_id, parent_global_data, @@ -546,16 +814,18 @@ def build_switch_sub_pipelines(root_id, parent_global_data, relation_cluster_ids return switch_sub_pipeline_list -def build_uninstall_sub_pipeline(root_id, parent_global_data, old_slave_ip, relation_cluster_ids, bk_cloud_id, ports): +def build_uninstall_old_machine_sub_pipeline( + root_id, parent_global_data, uninstall_ip, relation_cluster_ids, bk_cloud_id, ports +): uninstall_svr_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) - cluster_info = {"uninstall_ip": old_slave_ip, "cluster_ids": relation_cluster_ids} + cluster_info = {"uninstall_ip": uninstall_ip, "cluster_ids": relation_cluster_ids} uninstall_svr_sub_pipeline.add_act( act_name=_("卸载实例前先删除元数据"), act_component_code=MySQLDBMetaComponent.code, kwargs=asdict( DBMetaOPKwargs( - db_meta_class_func=MySQLDBMeta.del_old_slave_meta.__name__, + db_meta_class_func=MySQLDBMeta.del_cluster_old_machine_meta.__name__, is_update_trans_data=True, cluster=cluster_info, ) @@ -563,12 +833,12 @@ def build_uninstall_sub_pipeline(root_id, parent_global_data, old_slave_ip, rela ) uninstall_svr_sub_pipeline.add_act( - act_name=_("下发db-actor到节点{}").format(old_slave_ip), + act_name=_("下发db-actor到节点{}").format(uninstall_ip), act_component_code=TransFileComponent.code, kwargs=asdict( DownloadMediaKwargs( bk_cloud_id=bk_cloud_id, - exec_ip=old_slave_ip, + exec_ip=uninstall_ip, file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(), ) ), @@ -579,7 +849,7 @@ def build_uninstall_sub_pipeline(root_id, parent_global_data, old_slave_ip, rela act_component_code=MySQLClearMachineComponent.code, kwargs=asdict( ClearMachineKwargs( - exec_ip=old_slave_ip, + exec_ip=uninstall_ip, bk_cloud_id=bk_cloud_id, ) ), @@ -589,9 +859,224 @@ def build_uninstall_sub_pipeline(root_id, parent_global_data, old_slave_ip, rela sub_flow=uninstall_instance_sub_flow( root_id=root_id, ticket_data=copy.deepcopy(parent_global_data), - ip=old_slave_ip, + ip=uninstall_ip, ports=ports, ) ) - return uninstall_svr_sub_pipeline.build_sub_process(sub_name=_("卸载remote节点{}").format(old_slave_ip)) + return uninstall_svr_sub_pipeline.build_sub_process(sub_name=_("卸载remote节点{}").format(uninstall_ip)) + + +def build_install_ms_pair_sub_pipeline( + uid, + root_id, + parent_global_data, + cluster, + new_master_ip, + new_slave_ip, + ports, + bk_host_ids, + db_config, + pkg_id, + db_module_id, +): + install_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) + + install_sub_pipeline.add_sub_pipeline( + sub_flow=install_mysql_in_cluster_sub_flow( + uid=uid, + root_id=root_id, + cluster=cluster, + new_mysql_list=[new_master_ip, new_slave_ip], + install_ports=ports, + bk_host_ids=bk_host_ids, + pkg_id=pkg_id, + db_config=db_config, + db_module_id=str(db_module_id), + ) + ) + + cluster_info = { + "cluster_ports": ports, + "new_master_ip": new_master_ip, + "new_slave_ip": new_slave_ip, + "bk_cloud_id": cluster.bk_cloud_id, + } + + install_sub_pipeline.add_act( + act_name=_("写入初始化实例的db_meta元信息"), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.migrate_cluster_add_instance.__name__, + cluster=copy.deepcopy(cluster_info), + is_update_trans_data=True, + ) + ), + ) + + install_sub_pipeline.add_act( + act_name=_("安装backup-client工具"), + act_component_code=DownloadBackupClientComponent.code, + kwargs=asdict( + DownloadBackupClientKwargs( + bk_cloud_id=cluster.bk_cloud_id, + bk_biz_id=int(cluster.bk_biz_id), + download_host_list=[new_master_ip, new_slave_ip], + ) + ), + ) + + exec_act_kwargs = ExecActuatorKwargs( + cluster=cluster_info, + bk_cloud_id=cluster.bk_cloud_id, + cluster_type=cluster.cluster_type, + get_mysql_payload_func=MysqlActPayload.get_install_tmp_db_backup_payload.__name__, + exec_ip=[new_master_ip, new_slave_ip], + ) + install_sub_pipeline.add_act( + act_name=_("安装临时备份程序"), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(exec_act_kwargs), + ) + + return install_sub_pipeline.build_sub_process(sub_name=_("安装MySQL主从实例")) + + +def build_ms_pair_sync_data_sub_pipelines( + root_id, parent_global_data, relation_cluster_ids, new_master_ip, new_slave_ip, local_backup: bool, charset: str +): + sync_data_sub_pipeline_list = [] + for cluster_id in relation_cluster_ids: + cluster_model = Cluster.objects.get(id=cluster_id) + master_model = cluster_model.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) + rollback_time = datetime.now(timezone.utc) + rollback_handler = FixPointRollbackHandler(cluster_id=cluster_model.id) + backup_info = rollback_handler.query_latest_backup_log(rollback_time) + if backup_info is None: + logger.error("cluster {} backup info not exists".format(cluster_model.id)) + raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_id))) + cluster = { + "backupinfo": backup_info, + "new_master_ip": new_master_ip, + "new_slave_ip": new_slave_ip, + "new_master_port": master_model.port, + "new_slave_port": master_model.port, + "cluster_type": cluster_model.cluster_type, + "master_ip": master_model.machine.ip, + "slave_ip": "", + "master_port": master_model.port, + "slave_port": "", + "mysql_port": "", + "file_target_path": f"/data/dbbak/{root_id}/{master_model.port}", + "cluster_id": cluster_model.id, + "bk_cloud_id": cluster_model.bk_cloud_id, + "charset": charset, + "change_master_force": True, + "change_master": True, + } + sync_data_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) + if local_backup: + stand_by_slaves = cluster_model.storageinstance_set.filter( + instance_inner_role=InstanceInnerRole.SLAVE.value, + is_stand_by=True, + status=InstanceStatus.RUNNING.value, + ).exclude(machine__ip__in=[new_master_ip, new_slave_ip]) + # 从standby从库找备份 + inst_list = ["{}{}{}".format(master_model.machine.ip, IP_PORT_DIVIDER, master_model.port)] + if len(stand_by_slaves) > 0: + inst_list.append( + "{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port) + ) + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=mysql_restore_master_slave_sub_flow( + root_id=root_id, + ticket_data=copy.deepcopy(parent_global_data), + cluster=cluster, + cluster_model=cluster_model, + ins_list=inst_list, + ) + ) + else: + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=remote_instance_migrate_sub_flow( + root_id=root_id, ticket_data=copy.deepcopy(parent_global_data), cluster_info=cluster + ) + ) + sync_data_sub_pipeline.add_act( + act_name=_("同步完毕,写入主从关系,设置节点为running状态"), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.mysql_add_slave_info.__name__, + cluster=cluster, + is_update_trans_data=True, + ) + ), + ) + sync_data_sub_pipeline_list.append( + sync_data_sub_pipeline.build_sub_process(sub_name=_("{}:恢复实例数据").format(cluster_model.immute_domain)) + ) + return sync_data_sub_pipeline_list + + +def build_ms_pair_switch_sub_pipelines( + uid: str, + root_id: str, + parent_global_data: dict, + relation_cluster_ids: list, + old_master_ip: str, + old_slave_ip: str, + new_master_ip: str, + new_slave_ip: str, + old_ro_slave_ips: list, + new_ro_slave_ips: list, + ro_slaves: dict, +): + switch_sub_pipeline_list = [] + for cluster_id in relation_cluster_ids: + switch_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) + cluster_model = Cluster.objects.get(id=cluster_id) + master_model = cluster_model.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) + cluster_info = { + "cluster_id": cluster_model.id, + "bk_cloud_id": cluster_model.bk_cloud_id, + "old_master_ip": old_master_ip, + "old_master_port": master_model.port, + "old_slave_ip": old_slave_ip, + "old_slave_port": master_model.port, + "new_master_ip": new_master_ip, + "new_master_port": master_model.port, + "new_slave_ip": new_slave_ip, + "new_slave_port": master_model.port, + "mysql_port": master_model.port, + "master_port": master_model.port, + "old_ro_slave_ips": old_ro_slave_ips, + "new_ro_slave_ips": new_ro_slave_ips, + "ro_slaves": ro_slaves, + } + switch_sub_pipeline.add_sub_pipeline( + sub_flow=master_and_slave_switch( + root_id=root_id, + ticket_data=copy.deepcopy(parent_global_data), + cluster=cluster_model, + cluster_info=copy.deepcopy(cluster_info), + ) + ) + switch_sub_pipeline.add_act( + act_name=_("集群切换完成,写入 {} 的元信息".format(cluster_model.name)), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.mysql_migrate_cluster_switch_storage.__name__, + cluster=cluster_info, + is_update_trans_data=True, + ) + ), + ) + switch_sub_pipeline_list.append( + switch_sub_pipeline.build_sub_process( + sub_name=_("集群切换到新主从节点:new-master:{},new-slave:{}").format(new_master_ip, new_slave_ip) + ) + ) + return switch_sub_pipeline_list diff --git a/dbm-ui/backend/flow/engine/controller/mysql.py b/dbm-ui/backend/flow/engine/controller/mysql.py index 9dd3af8b0b..631e6da4e2 100644 --- a/dbm-ui/backend/flow/engine/controller/mysql.py +++ b/dbm-ui/backend/flow/engine/controller/mysql.py @@ -30,14 +30,14 @@ from backend.flow.engine.bamboo.scene.mysql.mysql_ha_full_backup_flow import MySQLHAFullBackupFlow from backend.flow.engine.bamboo.scene.mysql.mysql_ha_metadata_import import TenDBHAMetadataImportFlow from backend.flow.engine.bamboo.scene.mysql.mysql_ha_standardize_flow import MySQLHAStandardizeFlow +from backend.flow.engine.bamboo.scene.mysql.mysql_ha_upgrade import ( + DestroyNonStanbySlaveMySQLFlow, + TendbClusterUpgradeFlow, +) from backend.flow.engine.bamboo.scene.mysql.mysql_master_fail_over import MySQLMasterFailOverFlow from backend.flow.engine.bamboo.scene.mysql.mysql_master_slave_switch import MySQLMasterSlaveSwitchFlow from backend.flow.engine.bamboo.scene.mysql.mysql_migrate_cluster_flow import MySQLMigrateClusterFlow from backend.flow.engine.bamboo.scene.mysql.mysql_migrate_cluster_remote_flow import MySQLMigrateClusterRemoteFlow -from backend.flow.engine.bamboo.scene.mysql.mysql_non_standby_slaves_upgrade import ( - DestroyNonStanbySlaveMySQLFlow, - MySQLNonStandbySlavesUpgradeFlow, -) from backend.flow.engine.bamboo.scene.mysql.mysql_open_area_flow import MysqlOpenAreaFlow from backend.flow.engine.bamboo.scene.mysql.mysql_partition import MysqlPartitionFlow from backend.flow.engine.bamboo.scene.mysql.mysql_partition_cron import MysqlPartitionCronFlow @@ -683,8 +683,15 @@ def non_standby_slaves_upgrade_scene(self): """ 非Standby从库升级 """ - flow = MySQLNonStandbySlavesUpgradeFlow(root_id=self.root_id, ticket_data=self.ticket_data) - flow.upgrade() + flow = TendbClusterUpgradeFlow(root_id=self.root_id, ticket_data=self.ticket_data) + flow.upgrade_ro_slaves() + + def tendbha_upgrade_scene(self): + """ + tendbha 迁移升级,兼容一主多从的场景 + """ + flow = TendbClusterUpgradeFlow(root_id=self.root_id, ticket_data=self.ticket_data) + flow.upgrade_tendbha_cluster() def non_standby_slaves_destory_scene(self): """ diff --git a/dbm-ui/backend/flow/urls.py b/dbm-ui/backend/flow/urls.py index 6a76f53220..94186c2f4e 100644 --- a/dbm-ui/backend/flow/urls.py +++ b/dbm-ui/backend/flow/urls.py @@ -143,6 +143,7 @@ MigrateUpgradeMySQLSceneApiView, NonStanbySlavesDestorySceneApiView, NonStanbySlavesUpgradeMySQLSceneApiView, + TendbHaMigrateUpgradeSceneApiView, UpgradeMySQLSceneApiView, ) from backend.flow.views.name_service import ( @@ -345,6 +346,7 @@ url(r"^scene/upgrade_mysql_proxy$", UpgradeMySQLProxySceneApiView.as_view()), url(r"^scene/upgrade_mysql$", UpgradeMySQLSceneApiView.as_view()), url(r"^scene/migrate_upgrade_mysql$", MigrateUpgradeMySQLSceneApiView.as_view()), + url(r"^scene/migrate_upgrade_tendbha_cluster$", TendbHaMigrateUpgradeSceneApiView.as_view()), url(r"^scene/non_stanby_slave_upgrade_mysql$", NonStanbySlavesUpgradeMySQLSceneApiView.as_view()), url(r"^scene/uninstall_non_standby_slave$", NonStanbySlavesDestorySceneApiView.as_view()), # mysql diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_db_meta.py b/dbm-ui/backend/flow/utils/mysql/mysql_db_meta.py index e8ae300f37..1c5046c079 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_db_meta.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_db_meta.py @@ -519,6 +519,30 @@ def mysql_migrate_cluster_switch_storage(self): bk_cloud_id=self.cluster["bk_cloud_id"], port_list=[self.cluster["mysql_port"]], ) + if self.cluster.get("ro_slaves"): + ro_slaves = self.cluster["ro_slaves"] + for ro_slave in ro_slaves: + old_ro_slave = ro_slave["old_ro_slave"] + old_ro_slave_ip = old_ro_slave["ip"] + new_ro_slave = ro_slave["new_ro_slave"] + new_ro_slave_ip = new_ro_slave["ip"] + api.cluster.tendbha.change_storage_cluster_entry( + cluster_id=self.cluster["cluster_id"], + slave_ip=old_ro_slave_ip, + new_slave_ip=new_ro_slave_ip, + ) + api.cluster.tendbha.storage_tuple.add_storage_tuple( + master_ip=self.cluster["new_master_ip"], + slave_ip=self.cluster["new_ro_slave_ip"], + bk_cloud_id=self.cluster["bk_cloud_id"], + port_list=[self.cluster["mysql_port"]], + ) + api.cluster.tendbha.storage_tuple.remove_storage_tuple( + master_ip=self.cluster["old_master_ip"], + slave_ip=self.cluster["new_ro_slave_ip"], + bk_cloud_id=self.cluster["bk_cloud_id"], + port_list=[self.cluster["mysql_port"]], + ) def mysql_migrate_cluster_add_tuple(self): """ @@ -789,7 +813,8 @@ def ro_slave_recover_del_instance(self): for cluster_id in self.cluster["cluster_ids"]: cluster = Cluster.objects.get(id=cluster_id) master = cluster.main_storage_instances()[0] - old_slave = cluster.storageinstance_set.get(machine__ip=self.cluster["uninstall_ip"], port=master.port) + ro_slave_ip = self.cluster["uninstall_ip"] + old_slave = cluster.storageinstance_set.get(machine__ip=ro_slave_ip, port=master.port) api.cluster.tendbha.remove_storage_tuple( master_ip=master.machine.ip, slave_ip=old_slave.machine.ip, @@ -811,11 +836,11 @@ def ro_slave_recover_del_instance(self): ] ) if not StorageInstance.objects.filter( - machine__ip=self.cluster["uninstall_ip"], machine__bk_cloud_id=cluster.bk_cloud_id + machine__ip=ro_slave_ip, machine__bk_cloud_id=cluster.bk_cloud_id ).exists(): - api.machine.delete(machines=[self.cluster["uninstall_ip"]], bk_cloud_id=cluster.bk_cloud_id) + api.machine.delete(machines=[ro_slave_ip], bk_cloud_id=cluster.bk_cloud_id) # 删除cluster entry - for ce in ClusterEntry.objects.filter(cluster=cluster).all(): + for ce in ClusterEntry.objects.filter(storageinstance__machine__ip=ro_slave_ip).all(): ce.delete(keep_parents=True) def update_upgrade_slaves_dbmodule(self): @@ -845,7 +870,7 @@ def dissolve_master_slave_relationship(self): ) api.cluster.tendbha.remove_slave(cluster_id=cluster.id, target_slave_ip=old_slave.machine.ip) - def del_old_slave_meta(self): + def del_cluster_old_machine_meta(self): """ 删除旧从节点的元数据 """ @@ -853,17 +878,17 @@ def del_old_slave_meta(self): for cluster_id in self.cluster["cluster_ids"]: cluster = Cluster.objects.get(id=cluster_id) master = cluster.main_storage_instances()[0] - old_slave = StorageInstance.objects.get(machine__ip=self.cluster["uninstall_ip"], port=master.port) + instance = StorageInstance.objects.get(machine__ip=self.cluster["uninstall_ip"], port=master.port) # 删除服务实例 CcManage(cluster.bk_biz_id, cluster_type=cluster.cluster_type).delete_service_instance( - bk_instance_ids=[old_slave.bk_instance_id] + bk_instance_ids=[instance.bk_instance_id] ) # 删除实例元数据信息 api.storage_instance.delete( [ { - "ip": old_slave.machine.ip, - "port": old_slave.port, + "ip": instance.machine.ip, + "port": instance.port, "bk_cloud_id": cluster.bk_cloud_id, } ] diff --git a/dbm-ui/backend/flow/views/mysql_upgrade.py b/dbm-ui/backend/flow/views/mysql_upgrade.py index d66f097365..c9d428ca89 100644 --- a/dbm-ui/backend/flow/views/mysql_upgrade.py +++ b/dbm-ui/backend/flow/views/mysql_upgrade.py @@ -66,6 +66,21 @@ def post(request): return Response({"root_id": root_id}) +class TendbHaMigrateUpgradeSceneApiView(FlowTestView): + """ + api: /apis/v1/flow/scene/migrate_upgrade_tendbha_cluster + """ + + @staticmethod + def post(request): + logger.info(_("开始测试迁移升级tendbha场景")) + root_id = generate_root_id() + logger.info("define root_id: {}".format(root_id)) + test = MySQLController(root_id=root_id, ticket_data=request.data) + test.tendbha_upgrade_scene() + return Response({"root_id": root_id}) + + class NonStanbySlavesDestorySceneApiView(FlowTestView): """ api: /apis/v1/flow/scene/uninstall_non_standby_slave