Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

style(mysql): 重建流程代码抽象优化 #6973 #6983

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def switch_storage(cluster_id: int, target_storage_ip: str, origin_storage_ip: s
)
cluster.storageinstance_set.remove(origin_storage)
target_storage.status = InstanceStatus.RUNNING.value
if role and target_storage == InstanceRole.BACKEND_REPEATER:
if role and target_storage.instance_role == InstanceRole.BACKEND_REPEATER.value:
# 如果是REPEATER角色,改成传入的role变量
target_storage.instance_role = role
target_storage.instance_inner_role = InstanceRoleInstanceInnerRoleMap[role].value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,4 @@ def slave_migrate_switch_sub_flow(
)
),
)
return sub_pipeline.build_sub_process(sub_name=_("[{}]成对切换".format(cluster.name)))
return sub_pipeline.build_sub_process(sub_name=_("{}切换到新从库{}:{}".format(cluster.name, new_slave_ip, master.port)))
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from backend.components import DBConfigApi
from backend.components.dbconfig.constants import FormatType, LevelName
from backend.configuration.constants import DBType
from backend.db_meta.enums import ClusterType, InstanceInnerRole
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterType, InstanceInnerRole, InstanceStatus
from backend.db_meta.models import Cluster
from backend.db_package.models import Package
from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler
Expand All @@ -33,6 +34,9 @@
)
from backend.flow.engine.bamboo.scene.mysql.common.get_master_config import get_instance_config
from backend.flow.engine.bamboo.scene.mysql.common.master_and_slave_switch import master_and_slave_switch
from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import (
mysql_restore_master_slave_sub_flow,
)
from backend.flow.engine.bamboo.scene.mysql.common.uninstall_instance import uninstall_instance_sub_flow
from backend.flow.engine.bamboo.scene.spider.common.exceptions import TendbGetBackupInfoFailedException
from backend.flow.engine.bamboo.scene.spider.spider_remote_node_migrate import remote_instance_migrate_sub_flow
Expand All @@ -53,6 +57,7 @@
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext
from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta
from backend.ticket.builders.common.constants import MySQLBackupSource

logger = logging.getLogger("flow")

Expand All @@ -74,6 +79,9 @@ def __init__(self, root_id: str, ticket_data: Optional[Dict]):

# 定义备份文件存放到目标机器目录位置
self.backup_target_path = f"/data/dbbak/{self.root_id}"
self.local_backup = False
if self.ticket_data.get("backup_source") == MySQLBackupSource.LOCAL:
self.local_backup = True

def migrate_cluster_flow(self, use_for_upgrade=False):
"""
Expand Down Expand Up @@ -234,18 +242,42 @@ def migrate_cluster_flow(self, use_for_upgrade=False):
cluster["slave_ip"] = self.data["slave_ip"]
cluster["master_port"] = master_model.port
cluster["slave_port"] = master_model.port
cluster["mysql_port"] = master_model.port
cluster["file_target_path"] = f"/data/dbbak/{self.root_id}/{master_model.port}"
cluster["cluster_id"] = cluster_model.id
cluster["bk_cloud_id"] = cluster_model.bk_cloud_id
cluster["change_master_force"] = False
cluster["change_master"] = False
cluster["charset"] = self.data["charset"]

sync_data_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=remote_instance_migrate_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
if self.local_backup:
stand_by_slaves = cluster_model.storageinstance_set.filter(
instance_inner_role=InstanceInnerRole.SLAVE.value,
is_stand_by=True,
status=InstanceStatus.RUNNING.value,
).exclude(machine__ip__in=[self.data["new_slave_ip"], self.data["new_master_ip"]])
# 从standby从库找备份
inst_list = ["{}{}{}".format(master_model.machine.ip, IP_PORT_DIVIDER, master_model.port)]
if len(stand_by_slaves) > 0:
inst_list.append(
"{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port)
)
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=mysql_restore_master_slave_sub_flow(
root_id=self.root_id,
ticket_data=copy.deepcopy(self.data),
cluster=cluster,
cluster_model=cluster_model,
ins_list=inst_list,
)
)
else:
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=remote_instance_migrate_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
)
)
)

sync_data_sub_pipeline.add_act(
act_name=_("数据恢复完毕,写入新主节点和旧主节点的关系链元数据"),
Expand Down Expand Up @@ -297,7 +329,7 @@ def migrate_cluster_flow(self, use_for_upgrade=False):
)
)
switch_sub_pipeline.add_act(
act_name=_("集群切换完成,写入 {} 的元信息".format(cluster_model.id)),
act_name=_("集群切换完成,写入 {} 的元信息".format(cluster_model.name)),
act_component_code=MySQLDBMetaComponent.code,
kwargs=asdict(
DBMetaOPKwargs(
Expand All @@ -308,7 +340,7 @@ def migrate_cluster_flow(self, use_for_upgrade=False):
),
)
switch_sub_pipeline_list.append(
switch_sub_pipeline.build_sub_process(sub_name=_("集群 {} 切换".format(cluster_model.id)))
switch_sub_pipeline.build_sub_process(sub_name=_("集群 {} 切换".format(cluster_model.name)))
)
# 第四步 卸载实例
uninstall_svr_sub_pipeline_list = []
Expand Down Expand Up @@ -405,6 +437,18 @@ def migrate_cluster_flow(self, use_for_upgrade=False):
db_backup_pkg_type=MysqlVersionToDBBackupForMap[self.data["db_version"]],
)
)
# tendb_migrate_pipeline.add_act(
# act_name=_("屏蔽监控 {} {}").format(self.data["new_master_ip"], self.data["new_slave_ip"]),
# act_component_code=MysqlCrondMonitorControlComponent.code,
# kwargs=asdict(
# CrondMonitorKwargs(
# bk_cloud_id=cluster_class.bk_cloud_id,
# exec_ips=[self.data["new_master_ip"], self.data["new_slave_ip"]],
# port=0,
# minutes=240,
# )
# ),
# )
# 人工确认切换迁移实例
tendb_migrate_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={})
# 切换迁移实例
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
install_mysql_in_cluster_sub_flow,
)
from backend.flow.engine.bamboo.scene.mysql.common.get_master_config import get_instance_config
from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import mysql_restore_data_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.recover_slave_instance import slave_recover_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.slave_recover_switch import slave_migrate_switch_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.uninstall_instance import uninstall_instance_sub_flow
Expand Down Expand Up @@ -57,6 +58,7 @@
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext
from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta
from backend.ticket.builders.common.constants import MySQLBackupSource

logger = logging.getLogger("flow")

Expand All @@ -76,6 +78,9 @@ def __init__(self, root_id: str, tick_data: Optional[Dict]):
self.data = {}
# 仅添加从库。不切换。不复制账号
self.add_slave_only = self.ticket_data.get("add_slave_only", False)
self.local_backup = False
if self.ticket_data.get("backup_source") == MySQLBackupSource.LOCAL:
self.local_backup = True

def tendb_ha_restore_slave_flow(self):
"""
Expand Down Expand Up @@ -200,13 +205,38 @@ def tendb_ha_restore_slave_flow(self):
"file_target_path": f"/data/dbbak/{self.root_id}/{master.port}",
"charset": self.data["charset"],
"change_master_force": True,
"change_master": True,
}
sync_data_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=slave_recover_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
if self.local_backup:
# 获取本地备份并恢复
inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)]
stand_by_slaves = cluster_model.storageinstance_set.filter(
instance_inner_role=InstanceInnerRole.SLAVE.value,
is_stand_by=True,
status=InstanceStatus.RUNNING.value,
).exclude(machine__ip__in=[self.data["new_slave_ip"]])
if len(stand_by_slaves) > 0:
inst_list.append(
"{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port)
)
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=mysql_restore_data_sub_flow(
root_id=self.root_id,
ticket_data=copy.deepcopy(self.data),
cluster=cluster,
cluster_model=cluster_model,
ins_list=inst_list,
)
)
)

else:
sync_data_sub_pipeline.add_sub_pipeline(
sub_flow=slave_recover_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
)
)

sync_data_sub_pipeline.add_act(
act_name=_("同步完毕,写入主从关系,设置节点为running状态"),
act_component_code=MySQLDBMetaComponent.code,
Expand All @@ -218,6 +248,7 @@ def tendb_ha_restore_slave_flow(self):
)
),
)

sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据")))

switch_sub_pipeline_list = []
Expand Down Expand Up @@ -319,6 +350,19 @@ def tendb_ha_restore_slave_flow(self):
cluster_type=ClusterType.TenDBHA.value,
)
)
# tendb_migrate_pipeline.add_act(
# act_name=_("屏蔽监控 {}").format(self.data["new_slave_ip"]),
# act_component_code=MysqlCrondMonitorControlComponent.code,
# kwargs=asdict(
# CrondMonitorKwargs(
# bk_cloud_id=cluster_class.bk_cloud_id,
# exec_ips=[self.data["new_slave_ip"]],
# port=0,
# minutes=240,
# )
# ),
# )

if not self.add_slave_only:
# 人工确认切换迁移实例
tendb_migrate_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={})
Expand Down Expand Up @@ -473,12 +517,27 @@ def restore_local_slave_flow(self):
"charset": self.data["charset"],
"change_master_force": True,
"cluster_type": cluster_model.cluster_type,
"change_master": True,
}
tendb_migrate_pipeline.add_sub_pipeline(
sub_flow=slave_recover_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster

if self.local_backup:
inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)]
tendb_migrate_pipeline.add_sub_pipeline(
sub_flow=mysql_restore_data_sub_flow(
root_id=self.root_id,
ticket_data=copy.deepcopy(self.data),
cluster=cluster,
cluster_model=cluster_model,
ins_list=inst_list,
)
)
)
else:
tendb_migrate_pipeline.add_sub_pipeline(
sub_flow=slave_recover_sub_flow(
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
)
)

# 卸载流程人工确认
tendb_migrate_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={})
# 克隆权限
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,22 @@ def _execute(self, data, parent_data) -> bool:
self.log_info("{} exec {}".format(target_ip_info, node_name))

FlowNode.objects.filter(root_id=root_id, node_id=node_id).update(hosts=exec_ips)
cmd_str = "cd /home/mysql/mysql-crond && ./mysql-crond "
if kwargs["enable"]:
monitor_command = (
"cd /home/mysql/mysql-crond && ./mysql-crond enable-job --name-match mysql-monitor-{}-.*".format(
kwargs["port"]
)
)
cmd_str += " enable-job"
else:
monitor_command = (
"cd /home/mysql/mysql-crond && ./mysql-crond pause-job --name-match mysql-monitor-{}-.* -r {}m".format(
kwargs["port"], kwargs["minutes"]
)
)
cmd_str += " pause-job -r {}m".format(kwargs["minutes"])
if kwargs["port"] == 0:
cmd_str += " --name-match mysql-monitor-.*"
else:
cmd_str += " --name-match mysql-monitor-{}-.*".format(kwargs["port"])
if kwargs["name"] != "":
cmd_str += " --name {}".format(kwargs["name"])
self.log_info(cmd_str)
body = {
"bk_biz_id": env.JOB_BLUEKING_BIZ_ID,
"task_name": f"DBM_{node_name}_{node_id}",
"script_content": base64_encode(monitor_command),
"script_content": base64_encode(cmd_str),
"script_language": 1,
"target_server": {"ip_list": target_ip_info},
}
Expand Down
3 changes: 2 additions & 1 deletion dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ class CrondMonitorKwargs:

bk_cloud_id: int
exec_ips: list
port: int
name: str = ""
port: int = 0
minutes: int = 1440
enable: bool = False

Expand Down
8 changes: 1 addition & 7 deletions dbm-ui/backend/ticket/builders/mysql/mysql_add_slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,7 @@ def validate(self, attrs):

class MysqlAddSlaveParamBuilder(builders.FlowParamBuilder):
# 复用重建 slave 的场景
controller_remote = MySQLController.mysql_add_slave_remote_scene
controller_local = MySQLController.mysql_add_slave_scene

def build_controller_info(self) -> dict:
backup_source = self.ticket_data.get("backup_source", MySQLBackupSource.LOCAL)
self.controller = getattr(self, f"controller_{backup_source}")
return super().build_controller_info()
controller = MySQLController.mysql_add_slave_remote_scene

def format_ticket_data(self):
self.ticket_data["add_slave_only"] = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,7 @@ def validate(self, attrs):


class MysqlMigrateClusterParamBuilder(builders.FlowParamBuilder):
controller_remote = MySQLController.mysql_migrate_remote_scene
controller_local = MySQLController.mysql_migrate_cluster_scene

def build_controller_info(self) -> dict:
backup_source = self.ticket_data.get("backup_source", MySQLBackupSource.LOCAL)
self.controller = getattr(self, f"controller_{backup_source}")
return super().build_controller_info()
controller = MySQLController.mysql_migrate_remote_scene

def format_ticket_data(self):
if self.ticket_data["ip_source"] == IpSource.RESOURCE_POOL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,7 @@ def validate(self, attrs):


class MysqlRestoreLocalSlaveParamBuilder(builders.FlowParamBuilder):
controller_remote = MySQLController.mysql_restore_local_remote_scene
controller_local = MySQLController.mysql_restore_local_slave_scene

def build_controller_info(self) -> dict:
backup_source = self.ticket_data.get("backup_source", MySQLBackupSource.LOCAL)
self.controller = getattr(self, f"controller_{backup_source}")
return super().build_controller_info()
controller = MySQLController.mysql_restore_local_remote_scene

def format_ticket_data(self):
for index, info in enumerate(self.ticket_data["infos"]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,7 @@ def validate(self, attrs):


class MysqlRestoreSlaveParamBuilder(builders.FlowParamBuilder):
controller_remote = MySQLController.mysql_restore_slave_remote_scene
controller_local = MySQLController.mysql_restore_slave_scene

def build_controller_info(self) -> dict:
backup_source = self.ticket_data.get("backup_source", MySQLBackupSource.LOCAL)
self.controller = getattr(self, f"controller_{backup_source}")
return super().build_controller_info()
controller = MySQLController.mysql_restore_slave_remote_scene

def format_ticket_data(self):
self.ticket_data["add_slave_only"] = False
Expand Down
Loading