Skip to content

Commit

Permalink
fix: 定点回档实例安装接入子流程 从库重建主从迁移bug修复 #3747
Browse files Browse the repository at this point in the history
  • Loading branch information
zfrendo authored and zhangzhw8 committed Apr 8, 2024
1 parent 7570540 commit c30649a
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ def deploy_migrate_cluster_flow(self):
db_module_id=self.data["db_module_id"],
cluster_type=self.data["cluster_type"],
)
bk_host_ids = []
if "bk_new_slave" in self.data.keys():
bk_host_ids.append(self.data["bk_new_slave"]["bk_host_id"])
if "bk_new_master" in self.data.keys():
bk_host_ids.append(self.data["bk_new_master"]["bk_host_id"])

tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
# 整机安装数据库
install_sub_pipeline_list = []
Expand All @@ -125,6 +131,7 @@ def deploy_migrate_cluster_flow(self):
cluster=cluster_class,
new_mysql_list=[self.data["new_slave_ip"], self.data["new_master_ip"]],
install_ports=self.data["ports"],
bk_host_ids=bk_host_ids,
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ def migrate_cluster_flow(self):
db_module_id=self.data["db_module_id"],
cluster_type=self.data["cluster_type"],
)
bk_host_ids = []
if "bk_new_slave" in self.data.keys():
bk_host_ids.append(self.data["bk_new_slave"]["bk_host_id"])
if "bk_new_master" in self.data.keys():
bk_host_ids.append(self.data["bk_new_master"]["bk_host_id"])

tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
# 整机安装数据库
install_sub_pipeline_list = []
Expand All @@ -128,6 +134,7 @@ def migrate_cluster_flow(self):
cluster=cluster_class,
new_mysql_list=[self.data["new_slave_ip"], self.data["new_master_ip"]],
install_ports=self.data["ports"],
bk_host_ids=bk_host_ids,
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ def deploy_restore_slave_flow(self):
db_module_id=self.data["db_module_id"],
cluster_type=self.data["cluster_type"],
)
bk_host_ids = []
if "bk_new_slave" in self.data.keys():
bk_host_ids.append(self.data["bk_new_slave"]["bk_host_id"])

tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
# 获取信息
# 整机安装数据库
Expand All @@ -124,6 +128,7 @@ def deploy_restore_slave_flow(self):
cluster=cluster_class,
new_mysql_list=[self.data["new_slave_ip"]],
install_ports=self.data["ports"],
bk_host_ids=bk_host_ids,
)
)

Expand Down Expand Up @@ -211,6 +216,18 @@ def deploy_restore_slave_flow(self):
ins_list=inst_list,
)
)
# 恢复完毕的时候 slave 状态改为running
sync_data_sub_pipeline.add_act(
act_name=_("同步数据完毕,写入数据节点的主从关系相关元数据,设置新节点为running状态"),
act_component_code=MySQLDBMetaComponent.code,
kwargs=asdict(
DBMetaOPKwargs(
db_meta_class_func=MySQLDBMeta.mysql_add_slave_info.__name__,
cluster=cluster,
is_update_trans_data=True,
)
),
)
sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据")))

switch_sub_pipeline_list = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ def tendb_ha_restore_slave_flow(self):
db_module_id=self.data["db_module_id"],
cluster_type=self.data["cluster_type"],
)
bk_host_ids = []
if "bk_new_slave" in self.data.keys():
bk_host_ids.append(self.data["bk_new_slave"]["bk_host_id"])
tendb_migrate_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
# 获取信息
# 整机安装数据库
Expand All @@ -122,6 +125,7 @@ def tendb_ha_restore_slave_flow(self):
cluster=cluster_class,
new_mysql_list=[self.data["new_slave_ip"]],
install_ports=self.data["ports"],
bk_host_ids=bk_host_ids,
)
)

Expand Down Expand Up @@ -193,7 +197,7 @@ def tendb_ha_restore_slave_flow(self):
)
)
sync_data_sub_pipeline.add_act(
act_name=_("同步数据完毕,写入数据节点的主从关系相关元数据"),
act_name=_("同步数据完毕,写入数据节点的主从关系相关元数据,设置新节点为running状态"),
act_component_code=MySQLDBMetaComponent.code,
kwargs=asdict(
DBMetaOPKwargs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import copy
import logging.config
from dataclasses import asdict
from datetime import datetime
from typing import Dict, Optional

from django.utils.crypto import get_random_string
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
Expand All @@ -22,17 +24,14 @@
from backend.db_package.models import Package
from backend.flow.consts import InstanceStatus, MediumEnum, RollbackType
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import (
build_surrounding_apps_sub_flow,
install_mysql_in_cluster_sub_flow,
)
from backend.flow.engine.bamboo.scene.mysql.common.exceptions import NormalTenDBFlowException
from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import mysql_rollback_data_sub_flow
from backend.flow.engine.bamboo.scene.mysql.mysql_rollback_data_sub_flow import (
rollback_local_and_backupid,
rollback_remote_and_backupid,
rollback_remote_and_time,
)
from backend.flow.engine.bamboo.scene.mysql.mysql_single_apply_flow import MySQLSingleApplyFlow
from backend.flow.plugins.components.collections.common.pause import PauseComponent
from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
Expand All @@ -42,6 +41,7 @@
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext
from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta
from backend.ticket.constants import TicketType

logger = logging.getLogger("flow")

Expand Down Expand Up @@ -85,6 +85,7 @@ def rollback_data_flow(self):
self.data["ticket_type"] = self.ticket_data["ticket_type"]
self.data["cluster_type"] = cluster_class.cluster_type
self.data["uid"] = self.ticket_data["uid"]
self.data["city"] = info["city"]
self.data["package"] = Package.get_latest_package(
version=cluster_class.major_version, pkg_type=MediumEnum.MySQL, db_type=DBType.MySQL
).name
Expand All @@ -95,59 +96,26 @@ def rollback_data_flow(self):
db_module_id=self.data["db_module_id"],
cluster_type=self.data["cluster_type"],
)

# todo 之前安装流程整改为调用统一的单节点安装流程
sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
install_ticket = copy.deepcopy(self.data)
datetime_str = datetime.strftime(datetime.now(), "%Y%m%d%H%M%S%f")
cluster_name = "{}-{}".format(cluster_class.name, datetime_str)
if len(cluster_name) > 48:
cluster_name = get_random_string(24)
master_domain = "tmpdb.{}.dba.db".format(cluster_name)
install_ticket["start_mysql_port"] = master.port
install_ticket["inst_num"] = 1
install_ticket["ticket_type"] = TicketType.MYSQL_SINGLE_APPLY.value
install_ticket["resource_spec"] = {}
# install_ticket["resource_spec"] = {"single": {id: 2, "xxx": "xxx"}}
install_ticket["apply_infos"] = [
{"new_ip": self.data["bk_rollback"], "clusters": [{"name": cluster_name, "master": master_domain}]}
]
sub_pipeline.add_sub_pipeline(
sub_flow=install_mysql_in_cluster_sub_flow(
uid=self.ticket_data["uid"],
root_id=self.root_id,
cluster=cluster_class,
new_mysql_list=[self.data["rollback_ip"]],
install_ports=[master.port],
)
)
cluster = {
"install_ip": self.data["rollback_ip"],
"cluster_ids": [cluster_class.id],
"package": self.data["package"],
}
sub_pipeline.add_act(
act_name=_("写入初始化实例的db_meta元信息"),
act_component_code=MySQLDBMetaComponent.code,
kwargs=asdict(
DBMetaOPKwargs(
db_meta_class_func=MySQLDBMeta.slave_recover_add_instance.__name__,
cluster=copy.deepcopy(cluster),
is_update_trans_data=False,
)
),
)
sub_pipeline.add_sub_pipeline(
sub_flow=build_surrounding_apps_sub_flow(
bk_cloud_id=cluster_class.bk_cloud_id,
master_ip_list=None,
slave_ip_list=[self.data["rollback_ip"]],
root_id=self.root_id,
parent_global_data=copy.deepcopy(self.data),
is_init=True,
cluster_type=ClusterType.TenDBHA.value,
is_install_backup=False,
is_install_monitor=False,
)
MySQLSingleApplyFlow(root_id=self.root_id, data=install_ticket).deploy_mysql_single_flow()
)

exec_act_kwargs = ExecActuatorKwargs(
cluster=cluster,
bk_cloud_id=cluster_class.bk_cloud_id,
cluster_type=cluster_class.cluster_type,
get_mysql_payload_func=MysqlActPayload.get_install_tmp_db_backup_payload.__name__,
)
exec_act_kwargs.exec_ip = [self.data["rollback_ip"]]
sub_pipeline.add_act(
act_name=_("安装临时备份程序"),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)
mycluster = {
"name": cluster_class.name,
"cluster_id": cluster_class.id,
Expand Down
18 changes: 11 additions & 7 deletions dbm-ui/backend/flow/utils/mysql/mysql_db_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,13 +876,17 @@ def migrate_cluster_add_tuple(self):
bk_cloud_id=self.cluster["bk_cloud_id"],
port_list=[self.cluster["master_port"]],
)
# 安装时候已经写入
# api.cluster.tendbha.storage_tuple.add_storage_tuple(
# master_ip=self.cluster["new_master_ip"],
# slave_ip=self.cluster["new_slave_ip"],
# bk_cloud_id=self.cluster["bk_cloud_id"],
# port_list=[self.cluster["master_port"]],
# )
# 数据恢复完毕,修改新实例为running状态
new_master_storage = StorageInstance.objects.get(
machine__ip=self.cluster["new_master_ip"], machine__bk_cloud_id=self.cluster["bk_cloud_id"]
)
new_master_storage.status = InstanceStatus.RUNNING.value
new_master_storage.save()
new_slave_storage = StorageInstance.objects.get(
machine__ip=self.cluster["new_slave_ip"], machine__bk_cloud_id=self.cluster["bk_cloud_id"]
)
new_slave_storage.status = InstanceStatus.RUNNING.value
new_slave_storage.save()

def uninstall_instance(self):
"""
Expand Down

0 comments on commit c30649a

Please sign in to comment.