diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py index 0a10b6973d..8a556cab13 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py @@ -18,12 +18,12 @@ from django.utils.crypto import get_random_string from django.utils.translation import ugettext as _ -from backend.configuration.constants import DBType +from backend.configuration.constants import MYSQL_USUAL_JOB_TIME, DBType from backend.constants import IP_PORT_DIVIDER -from backend.db_meta.enums import ClusterType, InstanceInnerRole -from backend.db_meta.models import Cluster +from backend.db_meta.enums import ClusterType, InstanceInnerRole, InstanceRole +from backend.db_meta.models import Cluster, StorageInstanceTuple from backend.db_package.models import Package -from backend.flow.consts import InstanceStatus, MediumEnum, RollbackType +from backend.flow.consts import InstanceStatus, MediumEnum, MysqlChangeMasterType, RollbackType from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList from backend.flow.engine.bamboo.scene.mysql.common.exceptions import NormalTenDBFlowException @@ -37,9 +37,10 @@ from backend.flow.engine.bamboo.scene.mysql.mysql_single_apply_flow import MySQLSingleApplyFlow from backend.flow.engine.bamboo.scene.spider.common.exceptions import NormalSpiderFlowException from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent +from backend.flow.plugins.components.collections.mysql.mysql_rds_execute import MySQLExecuteRdsComponent from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent from backend.flow.utils.mysql.common.mysql_cluster_info import get_version_and_charset -from backend.flow.utils.mysql.mysql_act_dataclass import DownloadMediaKwargs, ExecActuatorKwargs +from backend.flow.utils.mysql.mysql_act_dataclass import DownloadMediaKwargs, ExecActuatorKwargs, ExecuteRdsKwargs from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext @@ -250,6 +251,7 @@ def rollback_to_cluster_flow(self): rollback_class = Cluster.objects.get(id=self.data["rollback_cluster_id"]) storages = rollback_class.storageinstance_set.all() rollback_pipeline_list = [] + repl_pipeline_list = [] for rollback_storage in storages: if not check_storage_database( rollback_class.bk_cloud_id, rollback_storage.machine.ip, rollback_storage.port @@ -362,7 +364,55 @@ def rollback_to_cluster_flow(self): sub_name=_("定点回档到{}:{}".format(rollback_storage.machine.ip, rollback_storage.port)) ) ) + + # 针对slave repeater角色的从库。建立复制链路。重置slave>添加复制账号和获取位点>建立主从关系 + if rollback_storage.instance_role in (InstanceRole.BACKEND_SLAVE, InstanceRole.BACKEND_REPEATER): + repl_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data)) + repl_pipeline.add_act( + act_name=_("从库reset slave {}").format(rollback_storage.ip_port), + act_component_code=MySQLExecuteRdsComponent.code, + kwargs=asdict( + ExecuteRdsKwargs( + bk_cloud_id=cluster_class.bk_cloud_id, + instance_ip=rollback_storage.machine.ip, + instance_port=rollback_storage.port, + sqls=["stop slave", "reset slave all"], + ) + ), + ) + repl_master = StorageInstanceTuple.objects.get(receiver=rollback_storage) + repl_cluster = { + "target_ip": repl_master.ejector.machine.ip, + "target_port": repl_master.ejector.port, + "repl_ip": rollback_storage.machine.ip, + "repl_port": rollback_storage.port, + "change_master_type": MysqlChangeMasterType.MASTERSTATUS.value, + } + exec_act_kwargs.cluster = copy.deepcopy(repl_cluster) + exec_act_kwargs.exec_ip = repl_master.ejector.machine.ip + exec_act_kwargs.job_timeout = MYSQL_USUAL_JOB_TIME + exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_grant_remotedb_repl_user.__name__ + repl_pipeline.add_act( + act_name=_("新增repl帐户{}".format(exec_act_kwargs.exec_ip)), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(exec_act_kwargs), + write_payload_var="show_master_status_info", + ) + exec_act_kwargs.exec_ip = rollback_storage.machine.ip + exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_remotedb_change_master.__name__ + repl_pipeline.add_act( + act_name=_("建立原主从关系{}".format(rollback_storage.ip_port)), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(exec_act_kwargs), + ) + repl_pipeline_list.append( + repl_pipeline.build_sub_process( + sub_name=_("建立主从关系: {}->{}".format(repl_master.ejector.ip_port, rollback_storage.ip_port)) + ) + ) + sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=rollback_pipeline_list) + sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=repl_pipeline_list) sub_pipeline_list.append( sub_pipeline.build_sub_process(sub_name=_("定点回档到{}".format(rollback_class.immute_domain))) )