diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py index c93d5f1597..9a9a21964a 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py @@ -179,18 +179,14 @@ def non_standby_slaves_upgrade_subflow( ) sub_pipeline.add_sub_pipeline(sub_flow=install_sub_pipeline) + # 恢复主从数据 + local_backup = False if backup_source == MySQLBackupSource.LOCAL: - # 使用本地备份来做迁移 - sync_data_sub_pipeline_list = build_data_repl_from_local_backup( - root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, charset - ) - sub_pipeline.add_parallel_sub_pipeline(sync_data_sub_pipeline_list) - elif backup_source == MySQLBackupSource.REMOTE: - # 使用远程备份来做迁移 - sync_data_sub_pipeline_list = build_sync_data_sub_pipelines( - root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip - ) - sub_pipeline.add_parallel_sub_pipeline(sync_data_sub_pipeline_list) + local_backup = True + sync_data_sub_pipeline_list = build_sync_data_sub_pipelines( + root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, local_backup + ) + sub_pipeline.add_parallel_sub_pipeline(sync_data_sub_pipeline_list) # 切换到新从节点 if not add_slave_only: @@ -269,10 +265,12 @@ def build_install_sub_pipeline( kwargs=asdict(exec_act_kwargs), ) - return install_sub_pipeline.build_sub_process() + return install_sub_pipeline.build_sub_process(sub_name=_("{}:安装MySQL实例".format(new_slave_ip))) -def build_sync_data_sub_pipelines(root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip): +def build_sync_data_sub_pipelines( + root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, local_backup: bool +): sync_data_sub_pipeline_list = [] for cluster_id in relation_cluster_ids: cluster_model = Cluster.objects.get(id=cluster_id) @@ -291,83 +289,49 @@ def build_sync_data_sub_pipelines(root_id, parent_global_data, relation_cluster_ "change_master_force": True, } sync_data_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) - sync_data_sub_pipeline.add_sub_pipeline( - sub_flow=slave_recover_sub_flow( - root_id=root_id, ticket_data=copy.deepcopy(parent_global_data), cluster_info=cluster_info - ) - ) - sync_data_sub_pipeline.add_act( - act_name=_("同步完毕,写入主从关系,设置节点为running状态"), - act_component_code=MySQLDBMetaComponent.code, - kwargs=asdict( - DBMetaOPKwargs( - db_meta_class_func=MySQLDBMeta.mysql_add_slave_info.__name__, - cluster=cluster_info, - is_update_trans_data=True, + if local_backup: + # 获取本地备份并恢复 + inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)] + stand_by_slaves = cluster_model.storageinstance_set.filter( + instance_inner_role=InstanceInnerRole.SLAVE.value, + is_stand_by=True, + status=InstanceStatus.RUNNING.value, + ).exclude(machine__ip__in=[new_slave_ip]) + if len(stand_by_slaves) > 0: + inst_list.append( + "{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port) + ) + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=mysql_restore_data_sub_flow( + root_id=root_id, + ticket_data=copy.deepcopy(parent_global_data), + cluster=cluster, + cluster_model=cluster_model, + ins_list=inst_list, ) - ), - ) - sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据"))) - return sync_data_sub_pipeline_list - - -def build_data_repl_from_local_backup( - root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, charset -): - sync_data_sub_pipeline_list = [] - for cluster_id in relation_cluster_ids: - cluster_model = Cluster.objects.get(id=cluster_id) - master = cluster_model.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) - cluster = { - "mysql_port": master.port, - "cluster_id": cluster_model.id, - "cluster_type": cluster.cluster_type, - "master_ip": master.machine.ip, - "master_port": master.port, - "new_slave_ip": new_slave_ip, - "new_slave_port": master.port, - "bk_cloud_id": cluster_model.bk_cloud_id, - "file_target_path": f"/data/dbbak/{root_id}/{master.port}", - "charset": charset, - "change_master_force": True, - "change_master": True, - } - sync_data_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) - # 获取本地备份并恢复 - inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)] - non_stand_by_slaves = cluster_model.storageinstance_set.filter( - instance_inner_role=InstanceInnerRole.SLAVE.value, - is_stand_by=False, - status=InstanceStatus.RUNNING.value, - ).exclude(machine__ip__in=[new_slave_ip]) - if len(non_stand_by_slaves) > 0: - inst_list.append( - "{}{}{}".format(non_stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, non_stand_by_slaves[0].port) ) - sync_data_sub_pipeline.add_sub_pipeline( - sub_flow=mysql_restore_data_sub_flow( - root_id=root_id, - ticket_data=copy.deepcopy(parent_global_data), - cluster=cluster, - cluster_model=cluster_model, - ins_list=inst_list, + else: + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=slave_recover_sub_flow( + root_id=root_id, ticket_data=copy.deepcopy(parent_global_data), cluster_info=cluster + ) ) - ) - # 恢复完毕的时候 slave 状态改为running + sync_data_sub_pipeline.add_act( act_name=_("同步完毕,写入主从关系,设置节点为running状态"), act_component_code=MySQLDBMetaComponent.code, kwargs=asdict( DBMetaOPKwargs( db_meta_class_func=MySQLDBMeta.mysql_add_slave_info.__name__, - cluster=cluster, + cluster=cluster_info, is_update_trans_data=True, ) ), ) - - sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据"))) - return sync_data_sub_pipeline_list + sync_data_sub_pipeline_list.append( + sync_data_sub_pipeline.build_sub_process(sub_name=_("{}:恢复实例数据".format(cluster_model.immute_domain))) + ) + return sync_data_sub_pipeline_list def build_switch_sub_pipelines(root_id, parent_global_data, relation_cluster_ids, old_slave_ip, new_slave_ip): @@ -402,7 +366,9 @@ def build_switch_sub_pipelines(root_id, parent_global_data, relation_cluster_ids ) ), ) - switch_sub_pipeline_list.append(switch_sub_pipeline.build_sub_process(sub_name=_("切换到新从节点"))) + switch_sub_pipeline_list.append( + switch_sub_pipeline.build_sub_process(sub_name=_("切换到新从节点:{}".format(new_slave_ip))) + ) return switch_sub_pipeline_list