Skip to content

Commit

Permalink
style: tendbHa数据恢复流程代码优化 #6973
Browse files Browse the repository at this point in the history
  • Loading branch information
zfrendo committed Sep 19, 2024
1 parent 7fde93e commit 7622434
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from backend.components import DBConfigApi
from backend.components.dbconfig.constants import FormatType, LevelName
from backend.configuration.constants import DBType
from backend.db_meta.enums import ClusterType, InstanceInnerRole
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterType, InstanceInnerRole, InstanceStatus
from backend.db_meta.models import Cluster
from backend.db_package.models import Package
from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler
Expand All @@ -33,26 +34,32 @@
)
from backend.flow.engine.bamboo.scene.mysql.common.get_master_config import get_instance_config
from backend.flow.engine.bamboo.scene.mysql.common.master_and_slave_switch import master_and_slave_switch
from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import (
mysql_restore_master_slave_sub_flow,
)
from backend.flow.engine.bamboo.scene.mysql.common.uninstall_instance import uninstall_instance_sub_flow
from backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBackupInfoFailedException
from backend.flow.engine.bamboo.scene.spider.spider_remote_node_migrate import remote_instance_migrate_sub_flow
from backend.flow.plugins.components.collections.common.download_backup_client import DownloadBackupClientComponent
from backend.flow.plugins.components.collections.common.pause import PauseComponent
from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
from backend.flow.plugins.components.collections.mysql.mysql_crond_control import MysqlCrondMonitorControlComponent
from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent
from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent
from backend.flow.utils.common_act_dataclass import DownloadBackupClientKwargs
from backend.flow.utils.mysql.common.mysql_cluster_info import get_ports, get_version_and_charset
from backend.flow.utils.mysql.mysql_act_dataclass import (
ClearMachineKwargs,
CrondMonitorKwargs,
DBMetaOPKwargs,
DownloadMediaKwargs,
ExecActuatorKwargs,
)
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext
from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta
from backend.ticket.builders.common.constants import MySQLBackupSource

logger = logging.getLogger("flow")

Expand All @@ -74,6 +81,9 @@ def __init__(self, root_id: str, ticket_data: Optional[Dict]):

# 定义备份文件存放到目标机器目录位置
self.backup_target_path = f"/data/dbbak/{self.root_id}"
self.local_backup = False
if self.ticket_data.get("backup_source") == MySQLBackupSource.LOCAL:
self.local_backup = True

def migrate_cluster_flow(self, use_for_upgrade=False):
"""
Expand Down Expand Up @@ -234,18 +244,42 @@ def migrate_cluster_flow(self, use_for_upgrade=False):
cluster["slave_ip"] = self.data["slave_ip"]
cluster["master_port"] = master_model.port
cluster["slave_port"] = master_model.port
cluster["mysql_port"] = master_model.port
cluster["file_target_path"] = f"/data/dbbak/{self.root_id}/{master_model.port}"
cluster["cluster_id"] = cluster_model.id
cluster["bk_cloud_id"] = cluster_model.bk_cloud_id
cluster["change_master_force"] = False
cluster["change_master"] = False
cluster["charset"] = self.data["charset"]

sync_data_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=remote_instance_migrate_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
if self.local_backup:
stand_by_slaves = cluster_model.storageinstance_set.filter(
instance_inner_role=InstanceInnerRole.SLAVE.value,
is_stand_by=True,
status=InstanceStatus.RUNNING.value,
).exclude(machine__ip__in=[self.data["new_slave_ip"], self.data["new_master_ip"]])
# 从standby从库找备份
inst_list = ["{}{}{}".format(master_model.machine.ip, IP_PORT_DIVIDER, master_model.port)]
if len(stand_by_slaves) > 0:
inst_list.append(
"{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port)
)
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=mysql_restore_master_slave_sub_flow(
root_id=self.root_id,
ticket_data=copy.deepcopy(self.data),
cluster=cluster,
cluster_model=cluster_model,
ins_list=inst_list,
)
)
else:
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=remote_instance_migrate_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
)
)
)

sync_data_sub_pipeline.add_act(
act_name=_("数据恢复完毕,写入新主节点和旧主节点的关系链元数据"),
Expand Down Expand Up @@ -405,6 +439,18 @@ def migrate_cluster_flow(self, use_for_upgrade=False):
db_backup_pkg_type=MysqlVersionToDBBackupForMap[self.data["db_version"]],
)
)
tendb_migrate_pipeline.add_act(
act_name=_("屏蔽监控 {} {}").format(self.data["new_master_ip"], self.data["new_slave_ip"]),
act_component_code=MysqlCrondMonitorControlComponent.code,
kwargs=asdict(
CrondMonitorKwargs(
bk_cloud_id=cluster_class.bk_cloud_id,
exec_ips=[self.data["new_master_ip"], self.data["new_slave_ip"]],
port=0,
minutes=240,
)
),
)
# 人工确认切换迁移实例
tendb_migrate_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={})
# 切换迁移实例
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
install_mysql_in_cluster_sub_flow,
)
from backend.flow.engine.bamboo.scene.mysql.common.get_master_config import get_instance_config
from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import mysql_restore_data_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.recover_slave_instance import slave_recover_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.slave_recover_switch import slave_migrate_switch_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.uninstall_instance import uninstall_instance_sub_flow
Expand Down Expand Up @@ -57,6 +58,7 @@
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext
from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta
from backend.ticket.builders.common.constants import MySQLBackupSource

logger = logging.getLogger("flow")

Expand All @@ -76,6 +78,9 @@ def __init__(self, root_id: str, tick_data: Optional[Dict]):
self.data = {}
# 仅添加从库。不切换。不复制账号
self.add_slave_only = self.ticket_data.get("add_slave_only", False)
self.local_backup = False
if self.ticket_data.get("backup_source") == MySQLBackupSource.LOCAL:
self.local_backup = True

def tendb_ha_restore_slave_flow(self):
"""
Expand Down Expand Up @@ -200,13 +205,38 @@ def tendb_ha_restore_slave_flow(self):
"file_target_path": f"/data/dbbak/{self.root_id}/{master.port}",
"charset": self.data["charset"],
"change_master_force": True,
"change_master": True,
}
sync_data_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=slave_recover_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
if self.local_backup:
# 获取本地备份并恢复
inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)]
stand_by_slaves = cluster_model.storageinstance_set.filter(
instance_inner_role=InstanceInnerRole.SLAVE.value,
is_stand_by=True,
status=InstanceStatus.RUNNING.value,
).exclude(machine__ip__in=[self.data["new_slave_ip"]])
if len(stand_by_slaves) > 0:
inst_list.append(
"{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port)
)
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=mysql_restore_data_sub_flow(
root_id=self.root_id,
ticket_data=copy.deepcopy(self.data),
cluster=cluster,
cluster_model=cluster_model,
ins_list=inst_list,
)
)
)

else:
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=slave_recover_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
)
)

sync_data_sub_pipeline.add_act(
act_name=_("同步完毕,写入主从关系,设置节点为running状态"),
act_component_code=MySQLDBMetaComponent.code,
Expand All @@ -218,6 +248,7 @@ def tendb_ha_restore_slave_flow(self):
)
),
)

sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据")))

switch_sub_pipeline_list = []
Expand Down Expand Up @@ -319,6 +350,19 @@ def tendb_ha_restore_slave_flow(self):
cluster_type=ClusterType.TenDBHA.value,
)
)
tendb_migrate_pipeline.add_act(
act_name=_("屏蔽监控 {}").format(self.data["new_slave_ip"]),
act_component_code=MysqlCrondMonitorControlComponent.code,
kwargs=asdict(
CrondMonitorKwargs(
bk_cloud_id=cluster_class.bk_cloud_id,
exec_ips=[self.data["new_slave_ip"]],
port=0,
minutes=240,
)
),
)

if not self.add_slave_only:
# 人工确认切换迁移实例
tendb_migrate_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={})
Expand Down Expand Up @@ -473,12 +517,27 @@ def restore_local_slave_flow(self):
"charset": self.data["charset"],
"change_master_force": True,
"cluster_type": cluster_model.cluster_type,
"change_master": True,
}
tendb_migrate_pipeline.add_sub_pipeline(
sub_flow=slave_recover_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster

if self.local_backup:
inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)]
tendb_migrate_pipeline.add_sub_pipeline(
sub_flow=mysql_restore_data_sub_flow(
root_id=self.root_id,
ticket_data=copy.deepcopy(self.data),
cluster=cluster,
cluster_model=cluster_model,
ins_list=inst_list,
)
)
)
else:
tendb_migrate_pipeline.add_sub_pipeline(
sub_flow=slave_recover_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
)
)

# 卸载流程人工确认
tendb_migrate_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={})
# 克隆权限
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,20 @@ def _execute(self, data, parent_data) -> bool:
self.log_info("{} exec {}".format(target_ip_info, node_name))

FlowNode.objects.filter(root_id=root_id, node_id=node_id).update(hosts=exec_ips)
cmd_str = "cd /home/mysql/mysql-crond && ./mysql-crond "
if kwargs["enable"]:
monitor_command = (
"cd /home/mysql/mysql-crond && ./mysql-crond enable-job --name-match mysql-monitor-{}-.*".format(
kwargs["port"]
)
)
cmd_str += " enable-job"
else:
monitor_command = (
"cd /home/mysql/mysql-crond && ./mysql-crond pause-job --name-match mysql-monitor-{}-.* -r {}m".format(
kwargs["port"], kwargs["minutes"]
)
)
cmd_str += " pause-job -r {}m".format(kwargs["minutes"])
if kwargs["port"] != 0:
cmd_str += " --name-match mysql-monitor-{}-.*".format(kwargs["port"])
if kwargs["name"] != "":
cmd_str += " --name {}".format(kwargs["name"])
self.log_info(cmd_str)
body = {
"bk_biz_id": env.JOB_BLUEKING_BIZ_ID,
"task_name": f"DBM_{node_name}_{node_id}",
"script_content": base64_encode(monitor_command),
"script_content": base64_encode(cmd_str),
"script_language": 1,
"target_server": {"ip_list": target_ip_info},
}
Expand Down
3 changes: 2 additions & 1 deletion dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ class CrondMonitorKwargs:

bk_cloud_id: int
exec_ips: list
port: int
name: str = ""
port: int = 0
minutes: int = 1440
enable: bool = False

Expand Down

0 comments on commit 7622434

Please sign in to comment.