diff --git a/dbm-ui/backend/db_meta/api/cluster/tendbcluster/remotedb_node_migrate.py b/dbm-ui/backend/db_meta/api/cluster/tendbcluster/remotedb_node_migrate.py index 3cb06f47d9..d1282ecc7a 100644 --- a/dbm-ui/backend/db_meta/api/cluster/tendbcluster/remotedb_node_migrate.py +++ b/dbm-ui/backend/db_meta/api/cluster/tendbcluster/remotedb_node_migrate.py @@ -29,62 +29,70 @@ class TenDBClusterMigrateRemoteDb: def storage_create( cls, cluster_id: int, - master_ip: str, - slave_ip: str, ports: list, creator: str, mysql_version: str, resource_spec: dict, + slave_ip: str = None, + master_ip: str = None, ): - """主从成对迁移初始化机器写入元数据""" + """主从成对迁移初始化机器写入元数据,兼容单实例安装""" cluster = Cluster.objects.get(id=cluster_id) bk_cloud_id = cluster.bk_cloud_id bk_biz_id = cluster.bk_biz_id time_zone = cluster.time_zone mysql_pkg = Package.get_latest_package(version=mysql_version, pkg_type=MediumEnum.MySQL, db_type=DBType.MySQL) - machines = [ - { - "ip": master_ip, - "bk_biz_id": int(bk_biz_id), - "machine_type": MachineType.REMOTE.value, - "spec_id": resource_spec[MachineType.REMOTE.value]["id"], - "spec_config": resource_spec[MachineType.REMOTE.value], - }, - { - "ip": slave_ip, - "bk_biz_id": int(bk_biz_id), - "machine_type": MachineType.REMOTE.value, - "spec_id": resource_spec[MachineType.REMOTE.value]["id"], - "spec_config": resource_spec[MachineType.REMOTE.value], - }, - ] - api.machine.create(machines=machines, creator=creator, bk_cloud_id=bk_cloud_id) + machines = [] storages = [] - for port in ports: - storages.append( + if master_ip is not None: + machines.append( { "ip": master_ip, - "port": port, - "instance_role": InstanceRole.REMOTE_MASTER.value, - "is_stand_by": True, # 标记实例属于切换组实例 - "db_version": get_mysql_real_version(mysql_pkg.name), # 存储真正的版本号信息 - }, + "bk_biz_id": int(bk_biz_id), + "machine_type": MachineType.REMOTE.value, + "spec_id": resource_spec[MachineType.REMOTE.value]["id"], + "spec_config": resource_spec[MachineType.REMOTE.value], + } ) - storages.append( + for 
port in ports: + storages.append( + { + "ip": master_ip, + "port": port, + "instance_role": InstanceRole.REMOTE_MASTER.value, + "is_stand_by": True, # 标记实例属于切换组实例 + "db_version": get_mysql_real_version(mysql_pkg.name), # 存储真正的版本号信息 + }, + ) + if slave_ip is not None: + machines.append( { "ip": slave_ip, - "port": port, - "instance_role": InstanceRole.REMOTE_SLAVE.value, - "is_stand_by": True, # 标记实例属于切换组实例 - "db_version": get_mysql_real_version(mysql_pkg.name), # 存储真正的版本号信息 - }, + "bk_biz_id": int(bk_biz_id), + "machine_type": MachineType.REMOTE.value, + "spec_id": resource_spec[MachineType.REMOTE.value]["id"], + "spec_config": resource_spec[MachineType.REMOTE.value], + } ) + for port in ports: + storages.append( + { + "ip": slave_ip, + "port": port, + "instance_role": InstanceRole.REMOTE_SLAVE.value, + "is_stand_by": True, # 标记实例属于切换组实例 + "db_version": get_mysql_real_version(mysql_pkg.name), # 存储真正的版本号信息 + }, + ) + + api.machine.create(machines=machines, creator=creator, bk_cloud_id=bk_cloud_id) api.storage_instance.create( instances=storages, creator=creator, time_zone=time_zone, status=InstanceStatus.RESTORING ) # cluster映射关系 storages = request_validator.validated_storage_list(storages, allow_empty=False, allow_null=False) storage_objs = common.filter_out_instance_obj(storages, StorageInstance.objects.all()) + cluster.storageinstance_set.add(*storage_objs) # 转移模块 cc_topo_operator = MysqlCCTopoOperator(cluster) @@ -100,7 +108,6 @@ def switch_remote_node(cls, cluster_id: int, source: dict, target: dict): """ cluster = Cluster.objects.get(id=cluster_id) bk_cloud_id = cluster.bk_cloud_id - # bk_biz_id = cluster.bk_biz_id source_master_obj = StorageInstance.objects.get( machine__ip=source["master"]["ip"], port=source["master"]["port"], machine__bk_cloud_id=bk_cloud_id ) @@ -127,10 +134,6 @@ def switch_remote_node(cls, cluster_id: int, source: dict, target: dict): ) storage_shard.storage_instance_tuple = target_tuple storage_shard.save() - # storage_shard.delete() - # 
TenDBClusterStorageSet.objects.create( - # storage_instance_tuple=target_tuple, shard_id=storage_shard.shard_key, cluster=cluster - # ) @classmethod @transaction.atomic diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_local_slave_recover.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_local_slave_recover.py new file mode 100644 index 0000000000..38bcf5ad8d --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_local_slave_recover.py @@ -0,0 +1,197 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. 
+""" +import copy +import logging +from dataclasses import asdict +from datetime import datetime +from typing import Dict, Optional + +from django.utils.translation import ugettext as _ + +from backend.configuration.constants import DBType +from backend.constants import IP_PORT_DIVIDER +from backend.db_meta.enums import ClusterType, InstanceStatus +from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler +from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder +from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList +from backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBackupInfoFailedException +from backend.flow.engine.bamboo.scene.spider.spider_remote_node_migrate import remote_slave_recover_sub_flow +from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent +from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent +from backend.flow.plugins.components.collections.spider.spider_db_meta import SpiderDBMetaComponent +from backend.flow.utils.mysql.common.mysql_cluster_info import get_version_and_charset +from backend.flow.utils.mysql.mysql_act_dataclass import DBMetaOPKwargs, DownloadMediaKwargs, ExecActuatorKwargs +from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload +from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext +from backend.flow.utils.spider.spider_db_meta import SpiderDBMeta +from backend.flow.utils.spider.tendb_cluster_info import get_slave_local_recover_info + +logger = logging.getLogger("flow") + + +class TenDBRemoteSlaveLocalRecoverFlow(object): + """ + TenDB 后端从节点恢复: 迁移机器恢复,指定实例的本地恢复 + """ + + def __init__(self, root_id: str, ticket_data: Optional[Dict]): + """ + @param root_id : 任务流程定义的root_id + @param ticket_data : 单据传递参数 + """ + self.root_id = root_id + self.ticket_data = ticket_data + self.data = {} + + def 
tendb_remote_slave_local_recover(self): + """ + tendb cluster remote slave recover + """ + tendb_migrate_pipeline_all = Builder(root_id=self.root_id, data=copy.deepcopy(self.ticket_data)) + tendb_migrate_pipeline_all_list = [] + # 阶段1 获取集群所有信息。计算端口,构建数据。 + for info in self.ticket_data["infos"]: + self.data = copy.deepcopy(info) + self.data["bk_cloud_id"] = self.ticket_data["bk_cloud_id"] + self.data["root_id"] = self.root_id + self.data["uid"] = self.ticket_data["uid"] + self.data["ticket_type"] = self.ticket_data["ticket_type"] + self.data["bk_biz_id"] = self.ticket_data["bk_biz_id"] + self.data["created_by"] = self.ticket_data["created_by"] + # self.data["module"] = info["module"] + # 卸载流程时强制卸载 + self.data["force"] = True + # 先判断备份是否存在 + backup_handler = FixPointRollbackHandler(self.data["cluster_id"]) + restore_time = datetime.now() + # restore_time = datetime.strptime("2023-07-31 17:40:00", "%Y-%m-%d %H:%M:%S") + backup_info = backup_handler.query_latest_backup_log(restore_time) + if backup_info is None: + logger.error("cluster {} backup info not exists".format(self.data["cluster_id"])) + raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(self.data["cluster_id"]))) + logger.debug(backup_info) + + cluster_info = get_slave_local_recover_info(self.data["cluster_id"], self.data["storage_id"]) + charset, db_version = get_version_and_charset( + bk_biz_id=cluster_info["bk_biz_id"], + db_module_id=cluster_info["db_module_id"], + cluster_type=cluster_info["cluster_type"], + ) + cluster_info["charset"] = charset + cluster_info["db_version"] = db_version + self.data["target_ip"] = cluster_info["target_ip"] + tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + + cluster_info["ports"] = [] + for shard_id, shard in cluster_info["my_shards"].items(): + slave = { + "ip": self.data["target_ip"], + "port": shard["port"], + "bk_cloud_id": self.data["bk_cloud_id"], + "instance": "{}{}{}".format(self.data["target_ip"], 
IP_PORT_DIVIDER, shard["port"]), + } + cluster_info["my_shards"][shard_id]["new_slave"] = slave + cluster_info["ports"].append(shard["port"]) + + sync_data_sub_pipeline_list = [] + for shard_id, node in cluster_info["my_shards"].items(): + ins_cluster = copy.deepcopy(cluster_info["cluster"]) + ins_cluster["charset"] = cluster_info["charset"] + ins_cluster["new_slave_ip"] = node["new_slave"]["ip"] + ins_cluster["new_slave_port"] = node["new_slave"]["port"] + ins_cluster["master_ip"] = node["master"]["ip"] + ins_cluster["slave_ip"] = node["slave"]["ip"] + ins_cluster["master_port"] = node["master"]["port"] + ins_cluster["slave_port"] = node["slave"]["port"] + # 设置实例状态 + ins_cluster["storage_id"] = node["slave"]["id"] + ins_cluster["storage_status"] = InstanceStatus.RESTORING.value + # todo 正式环境放开file_target_path,需要备份接口支持自动创建目录 + # ins_cluster["file_target_path"] = "/data/dbbak/{}/{}"\ + # .format(self.root_id, ins_cluster["new_master_port"]) + ins_cluster["file_target_path"] = "/home/mysql/install" + ins_cluster["shard_id"] = shard_id + ins_cluster["change_master_force"] = False + + ins_cluster["backupinfo"] = backup_info["remote_node"].get(shard_id, {}) + # 判断 remote_node 下每个分片的备份信息是否正常 + if ( + len(ins_cluster["backupinfo"]) == 0 + or len(ins_cluster["backupinfo"].get("file_list_details", {})) == 0 + ): + logger.error( + "cluster {} shard {} backup info not exists".format(self.data["cluster_id"], shard_id) + ) + raise TendbGetBackupInfoFailedException( + message=_("获取集群分片 {} shard {} 的备份信息失败".format(self.data["cluster_id"], shard_id)) + ) + sync_data_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + sync_data_sub_pipeline.add_act( + act_name=_("写入初始化实例的db_meta元信息"), + act_component_code=SpiderDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=SpiderDBMeta.tendb_modify_storage_status.__name__, + cluster=copy.deepcopy(ins_cluster), + is_update_trans_data=False, + ) + ), + ) + exec_act_kwargs = 
ExecActuatorKwargs( + bk_cloud_id=int(ins_cluster["bk_cloud_id"]), + cluster_type=ClusterType.TenDBCluster, + ) + exec_act_kwargs.exec_ip = ins_cluster["new_slave_ip"] + exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_clean_mysql_payload.__name__ + sync_data_sub_pipeline.add_act( + act_name=_("slave重建之清理从库{}").format(exec_act_kwargs.exec_ip), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(exec_act_kwargs), + ) + + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=remote_slave_recover_sub_flow( + root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=ins_cluster + ) + ) + ins_cluster["storage_status"] = InstanceStatus.RUNNING.value + sync_data_sub_pipeline.add_act( + act_name=_("写入初始化实例的db_meta元信息"), + act_component_code=SpiderDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=SpiderDBMeta.tendb_modify_storage_status.__name__, + cluster=copy.deepcopy(ins_cluster), + is_update_trans_data=False, + ) + ), + ) + sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据"))) + + tendb_migrate_pipeline.add_act( + act_name=_("下发工具"), + act_component_code=TransFileComponent.code, + kwargs=asdict( + DownloadMediaKwargs( + bk_cloud_id=cluster_info["bk_cloud_id"], + exec_ip=self.data["target_ip"], + file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(), + ) + ), + ) + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=sync_data_sub_pipeline_list) + tendb_migrate_pipeline_all_list.append( + tendb_migrate_pipeline.build_sub_process(_("集群迁移{}").format(self.data["cluster_id"])) + ) + + # 运行流程 + tendb_migrate_pipeline_all.add_parallel_sub_pipeline(tendb_migrate_pipeline_all_list) + tendb_migrate_pipeline_all.run_pipeline(init_trans_data_class=ClusterInfoContext()) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_slave_recover.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_slave_recover.py new file mode 
100644 index 0000000000..8b1a3b4aca --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/remote_slave_recover.py @@ -0,0 +1,317 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import copy +import logging +from dataclasses import asdict +from datetime import datetime +from typing import Dict, Optional + +from django.utils.translation import ugettext as _ + +from backend.constants import IP_PORT_DIVIDER +from backend.db_meta.enums import ClusterType +from backend.db_meta.models import Cluster +from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler +from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder +from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import ( + build_surrounding_apps_sub_flow, + install_mysql_in_cluster_sub_flow, +) +from backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBackupInfoFailedException +from backend.flow.engine.bamboo.scene.spider.spider_remote_node_migrate import ( + remote_node_uninstall_sub_flow, + remote_slave_recover_sub_flow, +) +from backend.flow.plugins.components.collections.common.download_backup_client import DownloadBackupClientComponent +from backend.flow.plugins.components.collections.common.pause import PauseComponent +from 
backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent +from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent +from backend.flow.plugins.components.collections.spider.spider_db_meta import SpiderDBMetaComponent +from backend.flow.utils.common_act_dataclass import DownloadBackupClientKwargs +from backend.flow.utils.mysql.common.mysql_cluster_info import get_version_and_charset +from backend.flow.utils.mysql.mysql_act_dataclass import ClearMachineKwargs, DBMetaOPKwargs, ExecActuatorKwargs +from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload +from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext +from backend.flow.utils.spider.spider_db_meta import SpiderDBMeta +from backend.flow.utils.spider.tendb_cluster_info import get_slave_recover_info + +logger = logging.getLogger("flow") + + +class TenDBRemoteSlaveRecoverFlow(object): + """ + TenDB 后端从节点恢复: 迁移机器恢复,指定实例的本地恢复 + """ + + def __init__(self, root_id: str, ticket_data: Optional[Dict]): + """ + @param root_id : 任务流程定义的root_id + @param ticket_data : 单据传递参数 + """ + self.root_id = root_id + self.ticket_data = ticket_data + self.data = {} + + def tendb_remote_slave_recover(self): + """ + tendb cluster remote slave recover + """ + tendb_migrate_pipeline_all = Builder(root_id=self.root_id, data=copy.deepcopy(self.ticket_data)) + tendb_migrate_pipeline_all_list = [] + # 阶段1 获取集群所有信息。计算端口,构建数据。 + for info in self.ticket_data["infos"]: + self.data = copy.deepcopy(info) + self.data["bk_cloud_id"] = self.ticket_data["bk_cloud_id"] + self.data["root_id"] = self.root_id + self.data["start_port"] = 20000 + self.data["uid"] = self.ticket_data["uid"] + self.data["ticket_type"] = self.ticket_data["ticket_type"] + self.data["bk_biz_id"] = self.ticket_data["bk_biz_id"] + self.data["created_by"] = self.ticket_data["created_by"] + # self.data["module"] = info["module"] + 
self.data["source_ip"] = self.data["source_slave"]["ip"] + self.data["target_ip"] = self.data["target_slave"]["ip"] + # 卸载流程时强制卸载 + self.data["force"] = True + # 先判断备份是否存在 + backup_handler = FixPointRollbackHandler(self.data["cluster_id"]) + restore_time = datetime.now() + # restore_time = datetime.strptime("2023-07-31 17:40:00", "%Y-%m-%d %H:%M:%S") + backup_info = backup_handler.query_latest_backup_log(restore_time) + if backup_info is None: + logger.error("cluster {} backup info not exists".format(self.data["cluster_id"])) + raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(self.data["cluster_id"]))) + logger.debug(backup_info) + tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + + cluster_info = get_slave_recover_info(self.data["cluster_id"], self.data["target_ip"]) + charset, db_version = get_version_and_charset( + bk_biz_id=cluster_info["bk_biz_id"], + db_module_id=cluster_info["db_module_id"], + cluster_type=cluster_info["cluster_type"], + ) + cluster_info["charset"] = charset + cluster_info["db_version"] = db_version + cluster_class = Cluster.objects.get(id=self.data["cluster_id"]) + + # 构造从节点恢复 + cluster_info["ports"] = [] + for shard_id, shard in cluster_info["my_shards"].items(): + slave = { + "ip": self.data["target_ip"], + "port": shard["port"], + "bk_cloud_id": self.data["bk_cloud_id"], + "instance": "{}{}{}".format(self.data["target_ip"], IP_PORT_DIVIDER, shard["port"]), + } + cluster_info["my_shards"][shard_id]["new_slave"] = slave + cluster_info["ports"].append(shard["port"]) + + install_sub_pipeline_list = [] + install_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + install_sub_pipeline.add_sub_pipeline( + sub_flow=install_mysql_in_cluster_sub_flow( + uid=self.data["uid"], + root_id=self.root_id, + cluster=cluster_class, + new_mysql_list=[self.data["target_ip"]], + install_ports=cluster_info["ports"], + ) + ) + cluster = { + "new_slave_ip": 
self.data["target_ip"], + "cluster_id": cluster_info["cluster_id"], + "bk_cloud_id": cluster_info["bk_cloud_id"], + "bk_biz_id": cluster_info["bk_biz_id"], + "ports": cluster_info["ports"], + "version": cluster_info["cluster"]["major_version"], + } + install_sub_pipeline.add_act( + act_name=_("写入初始化实例的db_meta元信息"), + act_component_code=SpiderDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=SpiderDBMeta.tendb_slave_recover_add_nodes.__name__, + cluster=copy.deepcopy(cluster), + is_update_trans_data=False, + ) + ), + ) + + install_sub_pipeline.add_act( + act_name=_("安装backup-client工具"), + act_component_code=DownloadBackupClientComponent.code, + kwargs=asdict( + DownloadBackupClientKwargs( + bk_cloud_id=cluster_class.bk_cloud_id, + download_host_list=[cluster["new_slave_ip"]], + ) + ), + ) + + exec_act_kwargs = ExecActuatorKwargs( + cluster=cluster, + bk_cloud_id=cluster_class.bk_cloud_id, + cluster_type=cluster_class.cluster_type, + get_mysql_payload_func=MysqlActPayload.get_install_tmp_db_backup_payload.__name__, + ) + exec_act_kwargs.exec_ip = [cluster["new_slave_ip"]] + install_sub_pipeline.add_act( + act_name=_("安装临时备份程序"), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(exec_act_kwargs), + ) + + install_sub_pipeline_list.append(install_sub_pipeline.build_sub_process(sub_name=_("安装remote从节点"))) + sync_data_sub_pipeline_list = [] + for shard_id, node in cluster_info["my_shards"].items(): + ins_cluster = copy.deepcopy(cluster_info["cluster"]) + ins_cluster["charset"] = cluster_info["charset"] + ins_cluster["new_slave_ip"] = node["new_slave"]["ip"] + ins_cluster["new_slave_port"] = node["new_slave"]["port"] + ins_cluster["master_ip"] = node["master"]["ip"] + ins_cluster["slave_ip"] = node["slave"]["ip"] + ins_cluster["master_port"] = node["master"]["port"] + ins_cluster["slave_port"] = node["slave"]["port"] + # todo 正式环境放开file_target_path,需要备份接口支持自动创建目录 + # 
ins_cluster["file_target_path"] = "/data/dbbak/{}/{}"\ + # .format(self.root_id, ins_cluster["new_master_port"]) + ins_cluster["file_target_path"] = "/home/mysql/install" + ins_cluster["shard_id"] = shard_id + ins_cluster["change_master_force"] = False + + ins_cluster["backupinfo"] = backup_info["remote_node"].get(shard_id, {}) + # 判断 remote_node 下每个分片的备份信息是否正常 + if ( + len(ins_cluster["backupinfo"]) == 0 + or len(ins_cluster["backupinfo"].get("file_list_details", {})) == 0 + ): + logger.error( + "cluster {} shard {} backup info not exists".format(self.data["cluster_id"], shard_id) + ) + raise TendbGetBackupInfoFailedException( + message=_("获取集群分片 {} shard {} 的备份信息失败".format(self.data["cluster_id"], shard_id)) + ) + sync_data_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=remote_slave_recover_sub_flow( + root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=ins_cluster + ) + ) + sync_data_sub_pipeline.add_act( + act_name=_("同步数据完毕,写入数据节点tuple相关元数据"), + act_component_code=SpiderDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=SpiderDBMeta.tendb_slave_recover_add_tuple.__name__, + cluster=ins_cluster, + is_update_trans_data=True, + ) + ), + ) + sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据"))) + # 阶段4 切换 todo 等待从库切换接口 + switch_sub_pipeline_list = [] + # 切换后写入元数据 + switch_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + switch_sub_pipeline.add_act( + act_name=_("SLAVE切换完毕后修改元数据指向"), + act_component_code=SpiderDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=SpiderDBMeta.tendb_slave_recover_switch.__name__, + cluster=cluster_info, + is_update_trans_data=True, + ) + ), + ) + switch_sub_pipeline_list.append(switch_sub_pipeline.build_sub_process(sub_name=_("切换SLAVE节点"))) + + # 阶段5 安装实例周边组件 + surrounding_sub_pipeline_list = [] + 
surrounding_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + surrounding_sub_pipeline.add_sub_pipeline( + sub_flow=build_surrounding_apps_sub_flow( + bk_cloud_id=cluster_class.bk_cloud_id, + master_ip_list=None, + slave_ip_list=[self.data["target_ip"]], + root_id=self.root_id, + parent_global_data=copy.deepcopy(self.data), + is_init=True, + cluster_type=ClusterType.TenDBCluster.value, + ) + ) + surrounding_sub_pipeline_list.append(surrounding_sub_pipeline.build_sub_process(sub_name=_("新机器安装周边组件"))) + + install_sub_pipeline.add_sub_pipeline( + sub_flow=build_surrounding_apps_sub_flow( + bk_cloud_id=cluster["bk_cloud_id"], + master_ip_list=None, + slave_ip_list=[self.data["target_ip"]], + root_id=self.root_id, + parent_global_data=copy.deepcopy(self.data), + is_init=True, + cluster_type=ClusterType.TenDBCluster.value, + ) + ) + + # 阶段6 卸载 + uninstall_svr_sub_pipeline_list = [] + uninstall_svr_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + ins_cluster = {"uninstall_ip": self.data["target_ip"], "cluster_id": cluster_info["cluster_id"]} + uninstall_svr_sub_pipeline.add_sub_pipeline( + sub_flow=remote_node_uninstall_sub_flow( + root_id=self.root_id, ticket_data=copy.deepcopy(self.data), ip=self.data["target_ip"] + ) + ) + uninstall_svr_sub_pipeline.add_act( + act_name=_("整机卸载成功后删除元数据"), + act_component_code=SpiderDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=SpiderDBMeta.remotedb_migrate_remove_storage.__name__, + cluster=ins_cluster, + is_update_trans_data=True, + ) + ), + ) + uninstall_svr_sub_pipeline.add_act( + act_name=_("清理机器配置"), + act_component_code=MySQLClearMachineComponent.code, + kwargs=asdict( + ClearMachineKwargs( + exec_ip=self.data["target_ip"], + bk_cloud_id=self.data["bk_cloud_id"], + ) + ), + ) + uninstall_svr_sub_pipeline_list.append( + uninstall_svr_sub_pipeline.build_sub_process(sub_name=_("卸载remote节点{}".format(self.data["target_ip"]))) + ) + # 安装实例 + 
tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=install_sub_pipeline_list) + # 数据同步 + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=sync_data_sub_pipeline_list) + # 人工确认切换迁移实例 + tendb_migrate_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={}) + # 切换迁移实例 + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=switch_sub_pipeline_list) + # 安装周边组件 + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=surrounding_sub_pipeline_list) + # 卸载流程人工确认 + tendb_migrate_pipeline.add_act(act_name=_("人工确认卸载实例"), act_component_code=PauseComponent.code, kwargs={}) + # 卸载remote节点 + tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=uninstall_svr_sub_pipeline_list) + tendb_migrate_pipeline_all_list.append( + tendb_migrate_pipeline.build_sub_process(_("集群迁移{}").format(self.data["cluster_id"])) + ) + # 运行流程 + tendb_migrate_pipeline_all.add_parallel_sub_pipeline(tendb_migrate_pipeline_all_list) + tendb_migrate_pipeline_all.run_pipeline(init_trans_data_class=ClusterInfoContext()) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_remote_node_migrate.py b/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_remote_node_migrate.py index 396b17978d..8a487eae86 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_remote_node_migrate.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/spider/spider_remote_node_migrate.py @@ -343,3 +343,90 @@ def remote_node_uninstall_sub_flow(root_id: str, ticket_data: dict, ip: str): ) sub_pipeline.add_parallel_acts(sub_pipeline_list) return sub_pipeline.build_sub_process(sub_name=_("Remote node {} 卸载整机实例".format(cluster["uninstall_ip"]))) + + +def remote_slave_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict): + """ + tendb remote slave 节点 恢复。(只做流程,元数据请在主流程控制) + @param root_id: flow流程的root_id + @param ticket_data: 关联单据 ticket对象 + @param cluster_info: 关联的cluster对象 + """ + + sub_pipeline = 
SubBuilder(root_id=root_id, data=ticket_data) + # 下发dbactor》通过master/slave 获取备份的文件》判断备份文件》恢复数据》change master + cluster = { + "cluster_id": cluster_info["cluster_id"], + "master_ip": cluster_info["master_ip"], + "slave_ip": cluster_info["slave_ip"], + "master_port": cluster_info["master_port"], + "new_slave_ip": cluster_info["new_slave_ip"], + "new_slave_port": cluster_info["new_slave_port"], + "bk_cloud_id": cluster_info["bk_cloud_id"], + "file_target_path": cluster_info["file_target_path"], + "change_master_force": cluster_info["change_master_force"], + "backupinfo": cluster_info["backupinfo"], + "charset": cluster_info["charset"], + } + exec_act_kwargs = ExecActuatorKwargs( + bk_cloud_id=int(cluster["bk_cloud_id"]), + cluster_type=ClusterType.TenDBCluster, + ) + backup_info = cluster["backupinfo"] + # 新从库下载备份介质 下载为异步下载,定时调起接口扫描下载结果 + task_ids = [i["task_id"] for i in backup_info["file_list_details"]] + download_kwargs = DownloadBackupFileKwargs( + bk_cloud_id=cluster["bk_cloud_id"], + task_ids=task_ids, + dest_ip=cluster["new_slave_ip"], + desc_dir=cluster["file_target_path"], + reason="spider remote node sync data", + ) + sub_pipeline.add_act( + act_name=_("下载全库备份介质到 {}".format(cluster["new_slave_ip"])), + act_component_code=MySQLDownloadBackupfileComponent.code, + kwargs=asdict(download_kwargs), + ) + + # 阶段4 恢复数据remote主从节点的数据 + cluster["restore_ip"] = cluster["new_slave_ip"] + cluster["restore_port"] = cluster["new_slave_port"] + cluster["source_ip"] = cluster["master_ip"] + cluster["source_port"] = cluster["master_port"] + cluster["change_master"] = False + exec_act_kwargs.cluster = copy.deepcopy(cluster) + exec_act_kwargs.exec_ip = cluster["new_slave_ip"] + exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_restore_remotedb_payload.__name__ + sub_pipeline.add_act( + act_name=_("恢复新从节点数据 {}:{}".format(exec_act_kwargs.exec_ip, cluster["restore_port"])), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(exec_act_kwargs), + 
) + + # 阶段5 change master: 新从库指向旧主库 + cluster["target_ip"] = cluster["master_ip"] + cluster["target_port"] = cluster["master_port"] + cluster["repl_ip"] = cluster["new_slave_ip"] + exec_act_kwargs.cluster = copy.deepcopy(cluster) + exec_act_kwargs.exec_ip = cluster["master_ip"] + exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_grant_remotedb_repl_user.__name__ + sub_pipeline.add_act( + act_name=_("新增repl帐户{}".format(exec_act_kwargs.exec_ip)), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(exec_act_kwargs), + ) + + cluster["repl_ip"] = cluster["new_slave_ip"] + cluster["repl_port"] = cluster["new_slave_port"] + cluster["target_ip"] = cluster["master_ip"] + cluster["target_port"] = cluster["master_port"] + cluster["change_master_type"] = MysqlChangeMasterType.BACKUPFILE.value + exec_act_kwargs.cluster = copy.deepcopy(cluster) + exec_act_kwargs.exec_ip = cluster["new_slave_ip"] + exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_remotedb_change_master.__name__ + sub_pipeline.add_act( + act_name=_("建立主从关系:新主库指向旧主库 {}:{}".format(exec_act_kwargs.exec_ip, cluster["repl_port"])), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(exec_act_kwargs), + ) + return sub_pipeline.build_sub_process(sub_name=_("RemoteDB从节点重建子流程{}".format(exec_act_kwargs.exec_ip))) diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py index 3140f55df1..a6e4b3c39e 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py @@ -1899,9 +1899,6 @@ def get_install_tmp_db_backup_payload(self, **kwargs): """ db_backup_pkg = Package.get_latest_package(version=MediumEnum.Latest, pkg_type=MediumEnum.DbBackup) cfg = self.__get_dbbackup_config() - # cluster = Cluster.objects.get(id=self.cluster["cluster_id"]) - # ins_list = - # StorageInstance.objects.filter(machine__ip=kwargs["ip"], 
machine__bk_cloud_id=self.cluster["bk_cloud_id"]) return { "db_type": DBActuatorTypeEnum.MySQL.value, "action": DBActuatorActionEnum.DeployDbbackup.value, diff --git a/dbm-ui/backend/flow/utils/spider/spider_db_meta.py b/dbm-ui/backend/flow/utils/spider/spider_db_meta.py index 5fd4f9d0c0..5cfdca1398 100644 --- a/dbm-ui/backend/flow/utils/spider/spider_db_meta.py +++ b/dbm-ui/backend/flow/utils/spider/spider_db_meta.py @@ -15,7 +15,7 @@ from backend.db_meta.api.cluster.tendbcluster.handler import TenDBClusterClusterHandler from backend.db_meta.api.cluster.tendbcluster.remotedb_node_migrate import TenDBClusterMigrateRemoteDb from backend.db_meta.enums import ClusterEntryRole, MachineType, TenDBClusterSpiderRole -from backend.db_meta.models import Cluster +from backend.db_meta.models import Cluster, StorageInstance from backend.flow.utils.dict_to_dataclass import dict_to_dataclass from backend.flow.utils.spider.spider_act_dataclass import ShardInfo @@ -236,3 +236,46 @@ def tendb_cluster_slave_destroy(self): } TenDBClusterClusterHandler.clear_clusterentry(**kwargs) return True + + def tendb_slave_recover_add_nodes(self): + """ + remotedb 成对迁移添加初始化节点元数据 + """ + TenDBClusterMigrateRemoteDb.storage_create( + cluster_id=self.cluster["cluster_id"], + slave_ip=self.cluster["new_slave_ip"], + ports=self.cluster["ports"], + creator=self.global_data["created_by"], + mysql_version=self.cluster["version"], + resource_spec=self.global_data["resource_spec"], + ) + return True + + def tendb_slave_recover_add_tuple(self): + new_slave_to_old_master = { + "master": {"ip": self.cluster["master_ip"], "port": self.cluster["master_port"]}, + "slave": {"ip": self.cluster["new_slave_ip"], "port": self.cluster["new_slave_port"]}, + } + TenDBClusterMigrateRemoteDb.add_storage_tuple( + cluster_id=self.cluster["cluster_id"], storage=new_slave_to_old_master + ) + # todo 是否修改new_master角色为中继状态 + + def tendb_modify_storage_status(self): + storage = 
StorageInstance.objects.get(id=self.cluster["storage_id"]) + storage.status = self.cluster["storage_status"] + storage.save() + + def tendb_slave_recover_switch(self): + for node in self.cluster["my_shards"].values(): + source = { + "master": {"ip": node["master"]["ip"], "port": node["master"]["port"]}, + "slave": {"ip": node["slave"]["ip"], "port": node["slave"]["port"]}, + } + target = { + "master": {"ip": node["master"]["ip"], "port": node["master"]["port"]}, + "slave": {"ip": node["new_slave"]["ip"], "port": node["new_slave"]["port"]}, + } + TenDBClusterMigrateRemoteDb.switch_remote_node( + cluster_id=self.cluster["cluster_id"], source=source, target=target + ) diff --git a/dbm-ui/backend/flow/utils/spider/tendb_cluster_info.py b/dbm-ui/backend/flow/utils/spider/tendb_cluster_info.py index 5e23c7476e..668237e6f8 100644 --- a/dbm-ui/backend/flow/utils/spider/tendb_cluster_info.py +++ b/dbm-ui/backend/flow/utils/spider/tendb_cluster_info.py @@ -106,6 +106,8 @@ def get_cluster_info(cluster_id: int): master_obj = StorageInstance.objects.get(id=shard.storage_instance_tuple.ejector_id) slave_obj = StorageInstance.objects.get(id=shard.storage_instance_tuple.receiver_id) shards_info = {"master": master_obj.simple_desc, "slave": slave_obj.simple_desc} + shards_info["master"]["id"] = master_obj.id + shards_info["slave"]["id"] = slave_obj.id cluster_info["shards"][shard.shard_id] = shards_info cluster_info["shard_ids"].append(shard.shard_id) cluster_info["masters"].append(master_obj.machine.ip) @@ -117,3 +119,25 @@ def get_cluster_info(cluster_id: int): cluster_info["masters"].sort() cluster_info["slaves"].sort() return cluster_info + + +def get_slave_recover_info(cluster_id: int, ip: str): + cluster_info = get_cluster_info(cluster_id) + cluster_info["my_shards"] = {} + if ip in cluster_info["slaves"]: + for key, val in cluster_info["shards"].items(): + if val["slave"]["ip"] == ip: + cluster_info["my_shards"][key] = val + return cluster_info + + +def 
get_slave_local_recover_info(cluster_id: int, storage_id: int): + cluster_info = get_cluster_info(cluster_id) + cluster_info["my_shards"] = {} + storage = StorageInstance.objects.get(id=storage_id) + cluster_info["target_ip"] = storage.machine.ip + for key, val in cluster_info["shards"].items(): + if val["slave"]["id"] == storage.id: + cluster_info["my_shards"][key] = val + break + return cluster_info