diff --git a/dbm-services/sqlserver/db-tools/dbactuator/internal/subcmd/checkcmd/check_mssql_service.go b/dbm-services/sqlserver/db-tools/dbactuator/internal/subcmd/checkcmd/check_mssql_service.go
new file mode 100644
index 0000000000..1eaf0056a2
--- /dev/null
+++ b/dbm-services/sqlserver/db-tools/dbactuator/internal/subcmd/checkcmd/check_mssql_service.go
@@ -0,0 +1,91 @@
+/*
+ * TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
+ * Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
+ * Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at https://opensource.org/licenses/MIT
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package checkcmd
+
+import (
+	"fmt"
+
+	"dbm-services/common/go-pubpkg/logger"
+	"dbm-services/sqlserver/db-tools/dbactuator/internal/subcmd"
+	"dbm-services/sqlserver/db-tools/dbactuator/pkg/components/check"
+	"dbm-services/sqlserver/db-tools/dbactuator/pkg/util"
+
+	"github.com/spf13/cobra"
+)
+
+// MssqlServiceAct wires the shared sub-command options to the component that checks
+// whether a SQL Server service is registered on the local machine.
+type MssqlServiceAct struct {
+	*subcmd.BaseOptions
+	BaseService check.MssqlServiceComp
+}
+
+// MssqlServiceCommand godoc
+//
+// MssqlServiceCommand builds the "MssqlServiceCheck" cobra command. Its Run callback calls
+// Validate, stops early in rollback mode, and otherwise runs Init followed by Run.
+//
+// @Summary sqlserver 检查实例连接情况
+// @Description -
+// @Tags sqlserver
+// @Accept json
+// @Param body body MssqlServiceComp true "short description"
+func MssqlServiceCommand() *cobra.Command {
+	act := MssqlServiceAct{
+		BaseOptions: subcmd.GBaseOptions,
+	}
+	cmd := &cobra.Command{
+		Use:     "MssqlServiceCheck",
+		Short:   "检查机器注册Sqlserver进程情况",
+		Example: fmt.Sprintf(`dbactuator check MssqlServiceCheck %s `, subcmd.CmdBaseExampleStr),
+		Run: func(cmd *cobra.Command, args []string) {
+			util.CheckErr(act.Validate())
+			// Rollback mode: nothing is executed for this command.
+			if act.RollBack {
+				return
+			}
+			util.CheckErr(act.Init())
+			util.CheckErr(act.Run())
+		},
+	}
+	return cmd
+}
+
+// Init deserializes the command payload into BaseService.Params and attaches the general
+// runtime parameters to the component. It returns the deserialization error, if any.
+func (u *MssqlServiceAct) Init() (err error) {
+	logger.Info("MssqlServiceAct Init")
+	if err = u.Deserialize(&u.BaseService.Params); err != nil {
+		logger.Error("DeserializeAndValidate failed, %v", err)
+		return err
+	}
+	u.BaseService.GeneralParam = subcmd.GeneralRuntimeParam
+
+	return nil
+}
+
+// Run executes the registered-service check as a single step and returns any error
+// reported by steps.Run.
+func (u *MssqlServiceAct) Run() (err error) {
+	steps := subcmd.Steps{
+		{
+			FunName: "检查机器注册Sqlserver进程情况",
+			Func:    u.BaseService.CheckMssqlService,
+		},
+	}
+
+	// Assign to the named result rather than shadowing it with ":=".
+	if err = steps.Run(); err != nil {
+		return err
+	}
+	logger.Info("check-mssql-service successfully")
+	return nil
+}
diff --git a/dbm-services/sqlserver/db-tools/dbactuator/internal/subcmd/checkcmd/checkcmd.go b/dbm-services/sqlserver/db-tools/dbactuator/internal/subcmd/checkcmd/checkcmd.go
index e90c1320b2..a58be6219d 100644
--- 
a/dbm-services/sqlserver/db-tools/dbactuator/internal/subcmd/checkcmd/checkcmd.go
+++ b/dbm-services/sqlserver/db-tools/dbactuator/internal/subcmd/checkcmd/checkcmd.go
@@ -31,6 +31,7 @@ func CheckCommand() *cobra.Command {
 			Commands: []*cobra.Command{
 				CheckAbnormalDBCommand(),
 				CheckInstProcessCommand(),
+				MssqlServiceCommand(),
 			},
 		},
 	}
diff --git a/dbm-services/sqlserver/db-tools/dbactuator/pkg/components/check/check_mssql_service.go b/dbm-services/sqlserver/db-tools/dbactuator/pkg/components/check/check_mssql_service.go
new file mode 100644
index 0000000000..c96cd9294e
--- /dev/null
+++ b/dbm-services/sqlserver/db-tools/dbactuator/pkg/components/check/check_mssql_service.go
@@ -0,0 +1,55 @@
+/*
+ * TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
+ * Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
+ * Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at https://opensource.org/licenses/MIT
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package check
+
+import (
+	"fmt"
+
+	"dbm-services/common/go-pubpkg/logger"
+	"dbm-services/sqlserver/db-tools/dbactuator/pkg/components"
+	"dbm-services/sqlserver/db-tools/dbactuator/pkg/util/osutil"
+	"dbm-services/sqlserver/db-tools/dbactuator/pkg/util/sqlserver"
+)
+
+// MssqlServiceComp is the component that checks whether a SQL Server service is registered
+// on the local machine.
+type MssqlServiceComp struct {
+	GeneralParam *components.GeneralParam
+	Params       *MssqlServiceParam
+	DB           *sqlserver.DbWorker
+}
+
+// MssqlServiceParam holds the payload parameters of the check.
+type MssqlServiceParam struct {
+	Host string `json:"host" validate:"ip" ` // local host IP
+}
+
+// CheckMssqlService lists the Windows services whose name matches MSSQL* and does not
+// contain '#', then emits {"checkresult": "1"} when the listing is non-empty and
+// {"checkresult": "0"} otherwise. It returns the error of the PowerShell invocation, if any.
+func (c *MssqlServiceComp) CheckMssqlService() error {
+	ret, err := osutil.StandardPowerShellCommand(
+		"GET-SERVICE -NAME MSSQL* | WHERE-OBJECT {$_.NAME -NOTLIKE \"*#*\"}",
+	)
+	if err != nil {
+		return err
+	}
+	// Default to "0" (nothing registered); only a non-empty listing switches it to "1".
+	checkResult := "0"
+	if ret != "" {
+		logger.Info("there is a mssql process has been registered [%s]", osutil.CleanExecOutput(ret))
+		checkResult = "1"
+	} else {
+		logger.Info("no mssql service registered")
+	}
+	components.WrapperOutputString(fmt.Sprintf("{\"checkresult\": \"%s\"}", checkResult))
+	return nil
+}
diff --git a/dbm-ui/backend/env/__init__.py b/dbm-ui/backend/env/__init__.py
index 870968c197..07cd33f1f0 100644
--- a/dbm-ui/backend/env/__init__.py
+++ b/dbm-ui/backend/env/__init__.py
@@ -120,6 +120,8 @@ SA_L5_AGENT_TEMPLATE_ID = get_type_env(key="SA_L5_AGENT_TEMPLATE_ID", _type=int)
 
 # 标准运维项目 ID
 BK_SOPS_PROJECT_ID = get_type_env(key="BK_SOPS_PROJECT_ID", _type=int, default=1)
+# BK-SOPS template id of the flow that updates the GSE config on Windows machines
+UPDATE_WINDOW_GSE_CONFIG = get_type_env(key="UPDATE_WINDOW_GSE_CONFIG", _type=int, default=1)
 
 # Bamboo
 ENABLE_CLEAN_EXPIRED_BAMBOO_TASK = get_type_env(key="ENABLE_CLEAN_EXPIRED_BAMBOO_TASK", _type=bool, default=False)
diff --git a/dbm-ui/backend/flow/consts.py b/dbm-ui/backend/flow/consts.py
index d07b40b755..4e198b3a8d 100644
--- a/dbm-ui/backend/flow/consts.py
+++ b/dbm-ui/backend/flow/consts.py
@@ -657,6 +657,7 @@ class 
SqlserverActuatorActionEnum(str, StructuredEnum): ClearConfig = EnumField("ClearConfig", _("清理实例周边配置。目前支持清理job、linkserver")) RemoteDr = EnumField("RemoteDr", _("将一些dr移除可用组")) Init = EnumField("init", _("部署后需要初始化实例的步骤")) + MssqlServiceCheck = EnumField("MssqlServiceCheck", _("检测进程是否注册")) class DorisActuatorActionEnum(str, StructuredEnum): diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/common/builder.py b/dbm-ui/backend/flow/engine/bamboo/scene/common/builder.py index 959a6fd421..514b3d8e22 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/common/builder.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/common/builder.py @@ -10,14 +10,17 @@ """ import copy import logging -from typing import Any, Dict, Optional +from dataclasses import dataclass +from typing import Any, Dict, List, Optional from bamboo_engine import api, builder from bamboo_engine.builder import ( + ConditionalParallelGateway, ConvergeGateway, Data, EmptyEndEvent, EmptyStartEvent, + NodeOutput, ParallelGateway, Params, RewritableNodeOutput, @@ -33,11 +36,18 @@ from backend.flow.models import FlowNode, FlowTree, StateType from backend.flow.plugins.components.collections.common.create_random_job_user import AddTempUserForClusterComponent from backend.flow.plugins.components.collections.common.drop_random_job_user import DropTempUserForClusterComponent +from backend.flow.plugins.components.collections.common.empty_node import EmptyNodeComponent from backend.ticket.constants import TicketType logger = logging.getLogger("json") +@dataclass +class Conditions: + act_object: Any + express: str + + class Builder(object): """ 构建bamboo流程的抽象类,解决开发人员在编排流程的学习成本,减少代码重复率 @@ -77,6 +87,9 @@ def __init__(self, root_id: str, data: Optional[Dict] = None, need_random_pass_c # 定义流程数据上下文参数trans_data self.rewritable_node_source_keys = [] + # 定义条件网关的上下文参数 + self.node_output_list = [] + # 判断是否添加临时账号的流程逻辑 if self.need_random_pass_cluster_ids: self.create_random_pass_act() @@ -203,6 +216,35 @@ def 
add_parallel_sub_pipeline(self, sub_flow_list: list): cg = ConvergeGateway() self.pipe = self.pipe.extend(pg).connect(*sub_flow_list).to(pg).converge(cg) + def add_conditional_subs(self, source_act, conditions: List[Conditions], name: str, conditions_param: str): + """ + add_conditional_subs:给流程添加条件分支节点,控制执行节点或者子流程 + @param source_act: 控制添加源节点 + @param conditions: 表达式 + @param name: 表达式名称 + @param conditions_param: 表达式变量名称 + """ + real_conditions = {} + connect_list = [] + for index, info in enumerate(conditions): + real_conditions[index] = f"${{{conditions_param}}} {info.express}" + connect_list.append(info.act_object) + + # 添你默认节点 + connect_list.append( + self.add_act(act_name="default_node", act_component_code=EmptyNodeComponent.code, kwargs={}, extend=False) + ) + real_conditions[len(connect_list) - 1] = "1==1" + + cpg = ConditionalParallelGateway( + conditions=real_conditions, name=name, default_condition_outgoing=len(connect_list) - 1 + ) + cg = ConvergeGateway() + self.pipe = self.pipe.extend(source_act).extend(cpg).connect(*connect_list).to(cpg).converge(cg) + + # 拼接有可能条件网关需要的上下文变量 + self.node_output_list.append({"conditions_param": conditions_param, "source_act_id": source_act.id}) + def run_pipeline(self, init_trans_data_class: Optional[Any] = None, is_drop_random_user: bool = True) -> bool: """ 开始运行 pipeline @@ -218,6 +260,11 @@ def run_pipeline(self, init_trans_data_class: Optional[Any] = None, is_drop_rand self.global_data.inputs["${trans_data}"] = RewritableNode( source_act=self.rewritable_node_source_keys, type=Var.SPLICE, value=init_trans_data_class ) + # 声明NodeOutput变量 + for i in self.node_output_list: + self.global_data.inputs[f"${{{i['conditions_param']}}}"] = NodeOutput( + type=Var.SPLICE, source_act=i["source_act_id"], source_key=f"{i['conditions_param']}" + ) self.pipe.extend(self.end_act) pipeline = builder.build_tree(self.start_act, id=self.root_id, data=self.global_data) pipeline_copy = copy.deepcopy(pipeline) @@ -272,7 +319,11 @@ def 
build_sub_process(self, sub_name) -> Optional[SubProcess]: sub_data.inputs["${trans_data}"] = RewritableNode( source_act=self.rewritable_node_source_keys, type=Var.SPLICE, value=None ) - # sub_data.inputs['${trans_data}'] = DataInput(type=Var.SPLICE, value='${trans_data}') + # 声明NodeOutput变量 + for i in self.node_output_list: + sub_data.inputs[f"${{{i['conditions_param']}}}"] = NodeOutput( + type=Var.SPLICE, source_act=i["source_act_id"], source_key=f"{i['conditions_param']}" + ) sub_params = Params({"${trans_data}": Var(type=Var.SPLICE, value="${trans_data}")}) self.pipe.extend(self.end_act) return SubProcess(start=self.start_act, data=sub_data, params=sub_params, name=sub_name) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/common_sub_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/common_sub_flow.py index c48eef0fab..bd4cd99163 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/common_sub_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/common_sub_flow.py @@ -54,6 +54,9 @@ SqlserverDownloadBackupFileComponent, ) from backend.flow.plugins.components.collections.sqlserver.trans_files import TransFileInWindowsComponent +from backend.flow.plugins.components.collections.sqlserver.update_window_gse_config import ( + UpdateWindowGseConfigComponent, +) from backend.flow.utils.common_act_dataclass import DownloadBackupClientKwargs, InstallNodemanPluginKwargs from backend.flow.utils.mysql.mysql_act_dataclass import InitCheckKwargs, UpdateDnsRecordKwargs from backend.flow.utils.sqlserver.sqlserver_act_dataclass import ( @@ -65,6 +68,7 @@ P2PFileForWindowKwargs, RestoreForDoDrKwargs, SqlserverBackupIDContext, + UpdateWindowGseConfigKwargs, ) from backend.flow.utils.sqlserver.sqlserver_act_payload import SqlserverActPayload from backend.flow.utils.sqlserver.sqlserver_db_function import get_backup_path @@ -137,6 +141,19 @@ def install_sqlserver_sub_flow( ) sub_pipeline.add_parallel_acts(acts_list=acts_list) + # 
更新window机器的gse配置信息 + if env.UPDATE_WINDOW_GSE_CONFIG: + acts_list = [] + for host in target_hosts: + acts_list.append( + { + "act_name": _("更新gse配置信息[{}]".format(host.ip)), + "act_component_code": UpdateWindowGseConfigComponent.code, + "kwargs": asdict(UpdateWindowGseConfigKwargs(ips=[host.ip], bk_cloud_id=bk_cloud_id)), + } + ) + sub_pipeline.add_parallel_acts(acts_list=acts_list) + # 安装蓝鲸插件 acts_list = [] for plugin_name in DEPENDENCIES_PLUGINS: @@ -435,6 +452,8 @@ def sync_dbs_for_cluster_sub_flow( sync_dbs: list, clean_dbs: list = None, sub_flow_name: str = _("建立数据库同步子流程"), + is_recalc_sync_dbs: bool = False, + is_recalc_clean_dbs: bool = False, ): """ 数据库建立同步的子流程 @@ -445,6 +464,8 @@ def sync_dbs_for_cluster_sub_flow( @param sync_dbs: 待同步的db列表 @param clean_dbs: 这次清理的db列表,默认为空,则用sync_dbs列表作为清理db @param sub_flow_name: 子流程名称 + @param is_recalc_sync_dbs: 控制在流程运行是否在传输上下文获取sync_dbs,适配于原地重建slave场景 + @param is_recalc_clean_dbs: 控制在流程运行是否在传输上下文获取clean_dbs,适配于原地重建slave场景 """ # 获取当前master实例信息 master_instance = cluster.storageinstance_set.get(instance_role=InstanceRole.BACKEND_MASTER) @@ -463,7 +484,7 @@ def sync_dbs_for_cluster_sub_flow( SqlserverSyncMode.ALWAYS_ON: SqlserverActPayload.get_build_add_dbs_in_always_on.__name__, } # 判断必要参数 - if len(sync_slaves) == 0 or len(sync_dbs) == 0: + if len(sync_slaves) == 0 or (len(sync_dbs) == 0 and is_recalc_sync_dbs is False): raise Exception("sync_slaves or sync_dbs is null, check") # 做判断, cluster_sync_mode 如果是mirror,原则上不允许一主多从的架构, 所以判断传入的slave是否有多个 @@ -483,6 +504,8 @@ def sync_dbs_for_cluster_sub_flow( "ignore_clean_tables": [], "sync_mode": SqlserverSyncModeMaps[cluster_sync_mode], "slaves": [], + "is_recalc_sync_dbs": is_recalc_sync_dbs, # 判断标志位待入到全局上下文,获取payload进行判断 + "is_recalc_clean_dbs": is_recalc_clean_dbs, # 判断标志位待入到全局上下文,获取payload进行判断 } # 声明子流程 diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_slave_rebuild.py b/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_slave_rebuild.py index 
7a8493f257..3b4163c4a2 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_slave_rebuild.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_slave_rebuild.py @@ -20,7 +20,7 @@ from backend.db_meta.models import Cluster, StorageInstance from backend.db_meta.models.storage_set_dtl import SqlserverClusterSyncMode from backend.flow.consts import SqlserverCleanMode, SqlserverLoginExecMode, SqlserverSyncMode, SqlserverSyncModeMaps -from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder +from backend.flow.engine.bamboo.scene.common.builder import Builder, Conditions, SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList from backend.flow.engine.bamboo.scene.sqlserver.base_flow import BaseFlow from backend.flow.engine.bamboo.scene.sqlserver.common_sub_flow import ( @@ -34,6 +34,7 @@ from backend.flow.plugins.components.collections.common.delete_cc_service_instance import DelCCServiceInstComponent from backend.flow.plugins.components.collections.common.pause import PauseComponent from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent +from backend.flow.plugins.components.collections.sqlserver.check_slave_sync_status import CheckSlaveSyncStatusComponent from backend.flow.plugins.components.collections.sqlserver.create_random_job_user import SqlserverAddJobUserComponent from backend.flow.plugins.components.collections.sqlserver.drop_random_job_user import SqlserverDropJobUserComponent from backend.flow.plugins.components.collections.sqlserver.exec_actuator_script import SqlserverActuatorScriptComponent @@ -47,6 +48,7 @@ IpDnsRecordRecycleKwargs, ) from backend.flow.utils.sqlserver.sqlserver_act_dataclass import ( + CheckSlaveSyncStatusKwargs, CreateRandomJobUserKwargs, DBMetaOPKwargs, DownloadMediaKwargs, @@ -54,15 +56,13 @@ ExecActuatorKwargs, ExecLoginKwargs, SqlserverBackupIDContext, + SqlserverRebuildSlaveContext, ) from 
backend.flow.utils.sqlserver.sqlserver_act_payload import SqlserverActPayload from backend.flow.utils.sqlserver.sqlserver_db_function import ( - check_always_on_status, create_sqlserver_login_sid, get_dbs_for_drs, get_group_name, - get_no_sync_dbs, - get_restoring_dbs, get_sync_filter_dbs, ) from backend.flow.utils.sqlserver.sqlserver_db_meta import SqlserverDBMeta @@ -85,8 +85,13 @@ def slave_rebuild_in_local_flow(self): """ 原地重建子流程 流程逻辑: - 1: 清理slave实例的所有库 - 2: 建立数据库级别主从关系 + 1: 先判断slave处于什么级别的异常状态,这里分成4个等级,不同等级对应不同的流程过程 + 2: 根据CheckSlaveSyncStatusComponent类执行返回结果,通过条件分支网关处理不同流程 + 2.1: 如果集群尚未不部署Alwayson配置,且元数据记录是Alwayson同步类型,则走修复1:添加Alwayson可用组修复+数据同步修复 + 2.2: 如果待修复的slave可用组异常,则走修复2:重建slave可用组修复+数据同步修复 + 2.3: 如果集群中部分数据库尚未建立同步,则走修复3:部分数据库同步修复 + 2.4: 如果集群数据库同步正常,且待修复slave正常,则走修复4,默认不处理。 + """ # 定义主流程 @@ -96,9 +101,7 @@ def slave_rebuild_in_local_flow(self): for info in self.data["infos"]: cluster = Cluster.objects.get(id=info["cluster_id"]) master = cluster.storageinstance_set.get(instance_role=InstanceRole.BACKEND_MASTER) - cluster_sync_mode = SqlserverClusterSyncMode.objects.get(cluster_id=cluster.id).sync_mode rebuild_slave = cluster.storageinstance_set.get(machine__ip=info["slave_host"]["ip"]) - sync_dbs = get_no_sync_dbs(cluster_id=cluster.id) # 拼接子流程全局上下文 sub_flow_context = copy.deepcopy(self.data) @@ -108,9 +111,6 @@ def slave_rebuild_in_local_flow(self): sub_flow_context["sync_mode"] = SqlserverSyncModeMaps[ SqlserverClusterSyncMode.objects.get(cluster_id=cluster.id).sync_mode ] - sub_flow_context["clean_dbs"] = list( - set(sync_dbs) | set(get_restoring_dbs(rebuild_slave, cluster.bk_cloud_id)) - ) sub_flow_context["clean_mode"] = SqlserverCleanMode.DROP_DBS.value sub_flow_context["clean_tables"] = ["*"] sub_flow_context["ignore_clean_tables"] = [] @@ -129,45 +129,52 @@ def slave_rebuild_in_local_flow(self): ), ), ) - - if cluster_sync_mode == SqlserverSyncMode.ALWAYS_ON and not check_always_on_status(cluster, rebuild_slave): - # 表示这个从实例和可用组端口连接,需要重建可用组关系 - 
sub_pipeline.add_act( - act_name=_("[{}]重建可用组".format(info["slave_host"]["ip"])), - act_component_code=SqlserverActuatorScriptComponent.code, - kwargs=asdict( - ExecActuatorKwargs( - exec_ips=[Host(ip=master.machine.ip, bk_cloud_id=cluster.bk_cloud_id)], - get_payload_func=SqlserverActPayload.get_build_always_on.__name__, - custom_params={ - "port": master.port, - "add_slaves": [{"host": rebuild_slave.machine.ip, "port": rebuild_slave.port}], - "group_name": get_group_name(master_instance=master, bk_cloud_id=cluster.bk_cloud_id), - "is_first": False, - "is_use_sa": False, - }, - ) + source_act = sub_pipeline.add_act( + act_name=_("检测带重建slave状态[{}]".format(rebuild_slave.ip_port)), + act_component_code=CheckSlaveSyncStatusComponent.code, + kwargs=asdict( + CheckSlaveSyncStatusKwargs(cluster_id=cluster.id, fix_slave_host=info["slave_host"]["ip"]), + ), + extend=False, + ) + conditions = [ + # 如果集群尚未不部署Alwayson配置,且元数据记录是Alwayson同步类型,则走修复1:添加Alwayson可用组修复+数据同步修复 + Conditions( + act_object=self._create_always_on_fix_sub_flow( + sub_flow_context=sub_flow_context, + master=master, + rebuild_slave=rebuild_slave, + cluster=cluster, ), - ) - # 更新清理数据库列表 - sub_flow_context["clean_dbs"] = list( - set(get_dbs_for_drs(cluster_id=cluster.id, db_list=["*"], ignore_db_list=[])) - | set(get_restoring_dbs(rebuild_slave, cluster.bk_cloud_id)) - ) - sync_dbs = get_dbs_for_drs(cluster_id=cluster.id, db_list=["*"], ignore_db_list=[]) - - if len(sync_dbs) > 0: - # 在slave重新建立数据库级别主从关系 - sub_pipeline.add_sub_pipeline( - sub_flow=sync_dbs_for_cluster_sub_flow( - uid=self.data["uid"], - root_id=self.root_id, + express="==1", + ), + # 如果待修复的slave可用组异常,则走修复2:重建slave可用组修复 + 数据同步修复 + Conditions( + act_object=self._fix_always_on_status_sub_flow( + sub_flow_context=sub_flow_context, + master=master, + rebuild_slave=rebuild_slave, cluster=cluster, - sync_slaves=[Host(**info["slave_host"])], - sync_dbs=sync_dbs, - clean_dbs=sub_flow_context["clean_dbs"], - ) - ) + ), + express="==2", + ), + # 
如果集群中部分数据库尚未建立同步,则走修复3:部分数据库同步修复 + Conditions( + act_object=self._fix_database_sync_sub_flow( + sub_flow_context=sub_flow_context, + rebuild_slave=rebuild_slave, + cluster=cluster, + ), + express="==3", + ), + ] + + sub_pipeline.add_conditional_subs( + source_act=source_act, + conditions=conditions, + name=_("判断待修复slave[{}]的状态".format(rebuild_slave.ip_port)), + conditions_param=SqlserverRebuildSlaveContext.conditions_var_name(), + ) # 先做克隆周边配置 sub_pipeline.add_sub_pipeline( @@ -242,7 +249,7 @@ def slave_rebuild_in_local_flow(self): ) main_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines) - main_pipeline.run_pipeline(init_trans_data_class=SqlserverBackupIDContext()) + main_pipeline.run_pipeline(init_trans_data_class=SqlserverRebuildSlaveContext()) def slave_rebuild_in_new_slave_flow(self): """ @@ -667,3 +674,133 @@ def remote_slave_in_cluster( ), ) return sub_pipeline.build_sub_process(sub_name=_("移除可用组[{}]".format(cluster.immute_domain))) + + def _create_always_on_fix_sub_flow( + self, sub_flow_context: dict, master: StorageInstance, rebuild_slave: StorageInstance, cluster: Cluster + ): + """ + 创建整个集群可用组,修复slave的子流程 + """ + # 声明子流程 + sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_flow_context)) + sub_pipeline.add_sub_pipeline( + sub_flow=build_always_on_sub_flow( + uid=self.data["uid"], + root_id=self.root_id, + master_instance=SqlserverInstance( + host=master.machine.ip, + port=master.port, + bk_cloud_id=cluster.bk_cloud_id, + is_new=True, + ), + slave_instances=[ + SqlserverInstance( + host=rebuild_slave.machine.ip, + port=rebuild_slave.port, + bk_cloud_id=cluster.bk_cloud_id, + is_new=True, + ) + ], + cluster_name=cluster.name, + group_name=cluster.immute_domain, + is_use_sa=True, + ) + ) + + sub_pipeline.add_sub_pipeline( + sub_flow=sync_dbs_for_cluster_sub_flow( + uid=self.data["uid"], + root_id=self.root_id, + cluster=cluster, + sync_slaves=[ + Host( + ip=rebuild_slave.machine.ip, + 
bk_cloud_id=rebuild_slave.machine.bk_cloud_id, + bk_host_id=rebuild_slave.machine.bk_host_id, + ) + ], + sync_dbs=[], + clean_dbs=[], + is_recalc_sync_dbs=True, + is_recalc_clean_dbs=True, + ) + ) + + return sub_pipeline.build_sub_process(sub_name=_("集群[{}]添加可用组修复流程".format(cluster.name))) + + def _fix_always_on_status_sub_flow( + self, sub_flow_context: dict, master: StorageInstance, rebuild_slave: StorageInstance, cluster: Cluster + ): + """ + 重建slave的可用组场景,修复slave的子流程 + """ + # 声明子流程 + + sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_flow_context)) + sub_pipeline.add_act( + act_name=_("[{}]重建可用组".format(rebuild_slave.ip_port)), + act_component_code=SqlserverActuatorScriptComponent.code, + kwargs=asdict( + ExecActuatorKwargs( + exec_ips=[Host(ip=master.machine.ip, bk_cloud_id=cluster.bk_cloud_id)], + get_payload_func=SqlserverActPayload.get_build_always_on.__name__, + custom_params={ + "port": master.port, + "add_slaves": [{"host": rebuild_slave.machine.ip, "port": rebuild_slave.port}], + "group_name": get_group_name( + master_instance=master, bk_cloud_id=cluster.bk_cloud_id, is_check_group=True + ), + "is_first": False, + "is_use_sa": False, + }, + ) + ), + ) + + sub_pipeline.add_sub_pipeline( + sub_flow=sync_dbs_for_cluster_sub_flow( + uid=self.data["uid"], + root_id=self.root_id, + cluster=cluster, + sync_slaves=[ + Host( + ip=rebuild_slave.machine.ip, + bk_cloud_id=rebuild_slave.machine.bk_cloud_id, + bk_host_id=rebuild_slave.machine.bk_host_id, + ) + ], + sync_dbs=[], + clean_dbs=[], + is_recalc_sync_dbs=True, + is_recalc_clean_dbs=True, + ) + ) + + return sub_pipeline.build_sub_process(sub_name=_("slave[{}]重建可用组修复流程".format(rebuild_slave.ip_port))) + + def _fix_database_sync_sub_flow(self, sub_flow_context: dict, rebuild_slave: StorageInstance, cluster: Cluster): + """ + 部分数据库未建立同步场景,修复slave的子流程 + """ + # 声明子流程 + sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_flow_context)) + sub_pipeline.add_sub_pipeline( + 
sub_flow=sync_dbs_for_cluster_sub_flow( + uid=self.data["uid"], + root_id=self.root_id, + cluster=cluster, + sync_slaves=[ + Host( + ip=rebuild_slave.machine.ip, + bk_cloud_id=rebuild_slave.machine.bk_cloud_id, + bk_host_id=rebuild_slave.machine.bk_host_id, + ) + ], + sync_dbs=[], + clean_dbs=[], + is_recalc_sync_dbs=True, + is_recalc_clean_dbs=True, + ) + ) + + return sub_pipeline.build_sub_process(sub_name=_("slave[{}]同步数据修复流程".format(rebuild_slave.ip_port))) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_sql_execute.py b/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_sql_execute.py index a9abfa79e8..3ea0b3ab79 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_sql_execute.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/sqlserver/sqlserver_sql_execute.py @@ -8,6 +8,7 @@ specific language governing permissions and limitations under the License. """ + import copy import logging.config from dataclasses import asdict diff --git a/dbm-ui/backend/flow/plugins/components/collections/common/empty_node.py b/dbm-ui/backend/flow/plugins/components/collections/common/empty_node.py new file mode 100644 index 0000000000..95011ce6d8 --- /dev/null +++ b/dbm-ui/backend/flow/plugins/components/collections/common/empty_node.py @@ -0,0 +1,28 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the
+specific language governing permissions and limitations under the License.
+"""
+
+import logging
+
+from pipeline.component_framework.component import Component
+
+from backend.flow.plugins.components.collections.common.base_service import BaseService
+
+logger = logging.getLogger("json")
+
+
+class EmptyNodeService(BaseService):
+    def _execute(self, data, parent_data) -> bool:
+        return True
+
+
+class EmptyNodeComponent(Component):
+    name = __name__
+    code = "empty_node"
+    bound_service = EmptyNodeService
diff --git a/dbm-ui/backend/flow/plugins/components/collections/sqlserver/check_mssql_service.py b/dbm-ui/backend/flow/plugins/components/collections/sqlserver/check_mssql_service.py
new file mode 100644
index 0000000000..037fafb65b
--- /dev/null
+++ b/dbm-ui/backend/flow/plugins/components/collections/sqlserver/check_mssql_service.py
@@ -0,0 +1,42 @@
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
+Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+
+import logging
+
+from pipeline.component_framework.component import Component
+
+from backend.flow.plugins.components.collections.sqlserver.exec_actuator_script import SqlserverActuatorScriptService
+
+logger = logging.getLogger("json")
+
+
+class CheckSQLServerServiceService(SqlserverActuatorScriptService):
+    """
+    Delegates scheduling to the parent actuator-script service and, when the parent reports
+    success, publishes the "is_registered" value found in trans_data as a node output.
+    """
+
+    def _schedule(self, data, parent_data, callback_data=None) -> bool:
+        # Forward callback_data so the parent receives the same arguments as this override.
+        result = super()._schedule(data, parent_data, callback_data)
+        if not result:
+            return False
+        # Expose the check result as a node output, read from trans_data.<write_payload_var>.
+        # NOTE(review): assumes a truthy parent result means the payload is already written - confirm.
+        trans_data = data.get_one_of_inputs("trans_data")
+        write_payload_var = data.get_one_of_inputs("write_payload_var")
+        data.outputs.is_registered = int(getattr(trans_data, write_payload_var)["is_registered"])
+        return True
+
+
+class CheckSQLServerServiceComponent(Component):
+    name = __name__
+    code = "check_sqlserver_service"
+    bound_service = CheckSQLServerServiceService
diff --git a/dbm-ui/backend/flow/plugins/components/collections/sqlserver/check_slave_sync_status.py b/dbm-ui/backend/flow/plugins/components/collections/sqlserver/check_slave_sync_status.py
new file mode 100644
index 0000000000..a8cfee7d96
--- /dev/null
+++ b/dbm-ui/backend/flow/plugins/components/collections/sqlserver/check_slave_sync_status.py
@@ -0,0 +1,104 @@
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
+Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+
+from pipeline.component_framework.component import Component
+
+from backend.db_meta.enums import InstanceRole
+from backend.db_meta.models import Cluster
+from backend.db_meta.models.storage_set_dtl import SqlserverClusterSyncMode
+from backend.flow.consts import SqlserverSyncMode
+from backend.flow.plugins.components.collections.common.base_service import BaseService
+from backend.flow.utils.sqlserver.sqlserver_db_function import (
+    check_always_on_status,
+    exec_resume_sp,
+    get_dbs_for_drs,
+    get_group_name,
+    get_no_sync_dbs,
+    get_restoring_dbs,
+)
+
+
+class CheckSlaveSyncStatusService(BaseService):
+    """
+    Determine the state of the slave to be rebuilt; it is reported via fix_number, one value per repair flow:
+    1: the availability group is missing
+    2: the slave's availability group status is abnormal
+    3: some databases on the master have not established sync yet
+    4: all databases on the master are synced and the sync is healthy
+    """
+
+    def _execute(self, data, parent_data) -> bool:
+        kwargs = data.get_one_of_inputs("kwargs")
+        trans_data = data.get_one_of_inputs("trans_data")
+
+        # Load the cluster and the instances involved
+        cluster = Cluster.objects.get(id=kwargs["cluster_id"])
+        fix_slave = cluster.storageinstance_set.get(machine__ip=kwargs["fix_slave_host"])
+        master = cluster.storageinstance_set.get(instance_role=InstanceRole.BACKEND_MASTER)
+        cluster_sync_mode = SqlserverClusterSyncMode.objects.get(cluster_id=cluster.id).sync_mode
+
+        # First check the cluster sync mode
+        if cluster_sync_mode == SqlserverSyncMode.ALWAYS_ON:
+            # Check whether the availability group is configured
+            sync_dbs = get_dbs_for_drs(cluster_id=cluster.id, db_list=["*"], ignore_db_list=[])
+            clean_dbs = list(set(sync_dbs) | set(get_restoring_dbs(fix_slave, cluster.bk_cloud_id)))
+            if not get_group_name(master_instance=master, bk_cloud_id=cluster.bk_cloud_id, is_check_group=True):
+                # Availability group config is missing: take the build-availability-group flow
+                self.log_info("group_name if null")
+                data.outputs.fix_number = 1
+                trans_data.sync_dbs = sync_dbs
+                trans_data.clean_dbs = clean_dbs
+                data.outputs["trans_data"] = trans_data
+                return True
+
+            elif not check_always_on_status(cluster, fix_slave):
+                # Availability group status is abnormal: take the rebuild-availability-group flow
+                self.log_info("always_on_status is abnormal")
+                data.outputs.fix_number = 2
+                trans_data.sync_dbs = sync_dbs
+                trans_data.clean_dbs = clean_dbs
+                data.outputs["trans_data"] = trans_data
+                return True
+
+        # Check the database sync status
+        if get_no_sync_dbs(cluster_id=cluster.id):
+            # Some databases are not synced yet: try to resume sync first, then re-check
+            exec_resume_sp(
+                slave_instances=[fix_slave],
+                master_host=master.machine.ip,
+                master_port=master.port,
+                bk_cloud_id=cluster.bk_cloud_id,
+            )
+            self.log_info("exec exec_resume_sp finish, check the result...")
+            # Re-check the data sync status
+            sync_dbs = get_no_sync_dbs(cluster_id=kwargs["cluster_id"])
+            if sync_dbs:
+                # The resume attempt did not fix it
+                self.log_warning("exec exec_resume_sp unsuccessfully")
+                trans_data.sync_dbs = sync_dbs
+                trans_data.clean_dbs = list(set(sync_dbs) | set(get_restoring_dbs(fix_slave, cluster.bk_cloud_id)))
+                data.outputs.fix_number = 3
+                data.outputs["trans_data"] = trans_data
+                return True
+            else:
+                # Sync was re-established successfully
+                self.log_info("exec exec_resume_sp successfully")
+                data.outputs.fix_number = 4
+                return True
+
+        self.log_info("no dbs fix sync")
+        data.outputs.fix_number = 4
+        return True
+
+
+class CheckSlaveSyncStatusComponent(Component):
+    name = __name__
+    code = "sqlserver_check_rebuild_slave"
+    bound_service = CheckSlaveSyncStatusService
diff --git a/dbm-ui/backend/flow/plugins/components/collections/sqlserver/restore_for_do_dr.py b/dbm-ui/backend/flow/plugins/components/collections/sqlserver/restore_for_do_dr.py
index 3c36b6b5ff..2310b0c1e6 100644
--- a/dbm-ui/backend/flow/plugins/components/collections/sqlserver/restore_for_do_dr.py
+++ b/dbm-ui/backend/flow/plugins/components/collections/sqlserver/restore_for_do_dr.py
@@ -42,15 +42,21 @@ class RestoreForDoDrService(SqlserverActuatorScriptService):
     def _execute(self, data, parent_data) -> bool:
         kwargs = data.get_one_of_inputs("kwargs")
+        global_data = data.get_one_of_inputs("global_data")
         trans_data = data.get_one_of_inputs("trans_data")
 
         restore_dbs = []
         restore_infos = []
 
+        if global_data.get("is_recalc_sync_dbs", False):
+            check_restore_dbs = trans_data.sync_dbs
+        else:
+            check_restore_dbs = 
kwargs["restore_dbs"] + backup_id = getattr(trans_data, get_backup_id_map[kwargs["restore_mode"]])["id"] if not backup_id: raise Exception(f"backup id is null: backup_id:{backup_id}") - for db_name in kwargs["restore_dbs"]: + for db_name in check_restore_dbs: self.log_info(f"checking db:[{db_name}]") backup_info = get_backup_path_files( diff --git a/dbm-ui/backend/flow/plugins/components/collections/sqlserver/update_window_gse_config.py b/dbm-ui/backend/flow/plugins/components/collections/sqlserver/update_window_gse_config.py new file mode 100644 index 0000000000..c9fa93249b --- /dev/null +++ b/dbm-ui/backend/flow/plugins/components/collections/sqlserver/update_window_gse_config.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. 
+""" + +from django.utils.translation import ugettext as _ +from pipeline.component_framework.component import Component + +from backend import env +from backend.components import CCApi +from backend.components.sops.client import BkSopsApi +from backend.flow.consts import WINDOW_SYSTEM_JOB_USER +from backend.flow.plugins.components.collections.common.base_service import BkSopsService + + +class UpdateWindowGseConfig(BkSopsService): + def __get_bk_biz_id_for_ip(self, ip: str, bk_cloud_id: int): + """Return the bk_biz_id of the host identified by ip and bk_cloud_id.""" + # First resolve the bk_host_id from the ip + res = CCApi.list_hosts_without_biz( + { + "fields": ["bk_host_id"], + "host_property_filter": { + "condition": "AND", + "rules": [ + {"field": "bk_host_innerip", "operator": "in", "value": [ip]}, + {"field": "bk_cloud_id", "operator": "equal", "value": bk_cloud_id}, + ], + }, + }, + use_admin=True, + ) + bk_host_id = res["info"][0]["bk_host_id"] + self.log_info(f"the bk_host_id of machine [{ip}] is {bk_host_id}") + + # Then resolve the bk_biz_id from the bk_host_id + res = CCApi.find_host_biz_relations({"bk_host_id": [bk_host_id]}) + return res[0]["bk_biz_id"] + + def _execute(self, data, parent_data) -> bool: + kwargs = data.get_one_of_inputs("kwargs") + ips = kwargs["ips"] + # Determine the business id + if kwargs.get("bk_biz_id"): + bk_biz_id = kwargs["bk_biz_id"] + else: + # Otherwise assume every machine in this batch belongs to one business and use the id resolved from the first ip + bk_biz_id = self.__get_bk_biz_id_for_ip(ip=ips[0], bk_cloud_id=int(kwargs["bk_cloud_id"])) + + param = { + "template_id": env.UPDATE_WINDOW_GSE_CONFIG, + "bk_biz_id": bk_biz_id, + "template_source": "common", + "name": _("更新window的gse配置信息"), + "flow_type": "common", + "constants": { + "${biz_cc_id}": bk_biz_id, + "${bk_biz_id}": bk_biz_id, + "${job_ip_list}": "\n".join(ips), + "${job_account}": WINDOW_SYSTEM_JOB_USER, + }, + } + rpdata = BkSopsApi.create_task(param) + task_id = rpdata["task_id"] + # start task + self.log_info(f"job url:{env.BK_SOPS_URL}/taskflow/execute/{env.BK_SOPS_PROJECT_ID}/?instance_id={task_id}") + param = {"bk_biz_id": bk_biz_id, "task_id": task_id} +
BkSopsApi.start_task(param) + data.outputs.task_id = task_id + data.inputs.kwargs["bk_biz_id"] = bk_biz_id + return True + + +class UpdateWindowGseConfigComponent(Component): + name = _("更新window的gse配置信息") + code = "update_window_gse_config_for_sop" + bound_service = UpdateWindowGseConfig diff --git a/dbm-ui/backend/flow/utils/sqlserver/sqlserver_act_dataclass.py b/dbm-ui/backend/flow/utils/sqlserver/sqlserver_act_dataclass.py index c6349aa3d3..851aaecca4 100644 --- a/dbm-ui/backend/flow/utils/sqlserver/sqlserver_act_dataclass.py +++ b/dbm-ui/backend/flow/utils/sqlserver/sqlserver_act_dataclass.py @@ -266,6 +266,38 @@ def log_backup_id_var_name() -> str: return "log_backup_id" +@dataclass() +class SqlserverRebuildSlaveContext: + """ + Interactive (trans_data) context dataclass for the rebuild-slave flow + """ + + sync_dbs: list = field(default_factory=list) + clean_dbs: list = field(default_factory=list) + full_backup_id: dict = field(default_factory=dict) + log_backup_id: dict = field(default_factory=dict) + + @staticmethod + def sync_dbs_var_name() -> str: + return "sync_dbs" + + @staticmethod + def clean_dbs_var_name() -> str: + return "clean_dbs" + + @staticmethod + def full_backup_id_var_name() -> str: + return "full_backup_id" + + @staticmethod + def log_backup_id_var_name() -> str: + return "log_backup_id" + + @staticmethod + def conditions_var_name() -> str: + return "fix_number" + + @dataclass() class CheckDBExistKwargs: """ @@ -277,3 +309,23 @@ class CheckDBExistKwargs: cluster_id: str check_dbs: list = field(default_factory=list) + + +@dataclass +class UpdateWindowGseConfigKwargs: + """ + Private variables of the act that updates the gse config + """ + + bk_cloud_id: int + ips: list + + +@dataclass +class CheckSlaveSyncStatusKwargs: + """ + Private variables of the sqlserver_check_rebuild_slave act + """ + + cluster_id: int + fix_slave_host: str diff --git a/dbm-ui/backend/flow/utils/sqlserver/sqlserver_act_payload.py b/dbm-ui/backend/flow/utils/sqlserver/sqlserver_act_payload.py index 6be84d4a86..d2e62aa0cb 100644 ---
a/dbm-ui/backend/flow/utils/sqlserver/sqlserver_act_payload.py +++ b/dbm-ui/backend/flow/utils/sqlserver/sqlserver_act_payload.py @@ -38,6 +38,17 @@ def system_init_payload(self, **kwargs) -> dict: "payload": {**self.get_init_system_account(), **payload}, } + @staticmethod + def check_mssql_service_payload(**kwargs) -> dict: + """ + Payload for checking whether the SQL Server service is registered; static, so it must not declare self + """ + return { + "db_type": DBActuatorTypeEnum.Sqlserver_check.value, + "action": SqlserverActuatorActionEnum.MssqlServiceCheck.value, + "payload": {}, + } + def get_install_sqlserver_payload(self, **kwargs) -> dict: """ 拼接安装sqlserver的payload参数, 分别兼容集群申请、集群实例重建、集群实例添加单据的获取方式 @@ -159,6 +170,11 @@ def get_backup_dbs_payload(self, **kwargs) -> dict: """ 执行数据库备份的payload """ + if self.global_data.get("is_recalc_sync_dbs", False): + backup_dbs = kwargs["trans_data"]["sync_dbs"] + else: + backup_dbs = self.global_data["backup_dbs"] + return { "db_type": DBActuatorTypeEnum.Sqlserver.value, "action": SqlserverActuatorActionEnum.BackupDBS.value, @@ -167,7 +183,7 @@ def get_backup_dbs_payload(self, **kwargs) -> dict: "extend": { "host": kwargs["ips"][0]["ip"], "port": kwargs["custom_params"]["port"], - "backup_dbs": self.global_data["backup_dbs"], + "backup_dbs": backup_dbs, "backup_type": kwargs["custom_params"]["backup_type"], "job_id": self.global_data["job_id"], "file_tag": kwargs["custom_params"]["file_tag"], @@ -198,8 +214,12 @@ def get_rename_dbs_payload(self, **kwargs) -> dict: def get_clean_dbs_payload(self, **kwargs) -> dict: """ - 执行数据库重命名的payload + Payload for the clean-databases (CleanDBS) action """ + if self.global_data.get("is_recalc_clean_dbs", False): + clean_dbs = kwargs["trans_data"]["clean_dbs"] + else: + clean_dbs = self.global_data["clean_dbs"] return { "db_type": DBActuatorTypeEnum.Sqlserver.value, "action": SqlserverActuatorActionEnum.CleanDBS.value, @@ -208,7 +228,7 @@ def get_clean_dbs_payload(self, **kwargs) -> dict: "extend": { "host": kwargs["ips"][0]["ip"], "port": self.global_data["port"], - "clean_dbs":
self.global_data["clean_dbs"], + "clean_dbs": clean_dbs, "sync_mode": self.global_data["sync_mode"], "clean_mode": self.global_data["clean_mode"], "slaves": self.global_data["slaves"], @@ -370,6 +390,11 @@ def get_build_database_mirroring(self, **kwargs) -> dict: """ 建立数据库级别镜像关系的payload """ + if self.global_data.get("is_recalc_sync_dbs", False): + sync_dbs = kwargs["trans_data"]["sync_dbs"] + else: + sync_dbs = kwargs["custom_params"]["dbs"] + return { "db_type": DBActuatorTypeEnum.Sqlserver.value, "action": SqlserverActuatorActionEnum.BuildDBMirroring.value, @@ -380,7 +405,7 @@ def get_build_database_mirroring(self, **kwargs) -> dict: "port": self.global_data["port"], "dr_host": kwargs["custom_params"]["dr_host"], "dr_port": kwargs["custom_params"]["dr_port"], - "dbs": kwargs["custom_params"]["dbs"], + "dbs": sync_dbs, }, }, } @@ -389,6 +414,11 @@ def get_build_add_dbs_in_always_on(self, **kwargs) -> dict: """ 建立数据库加入always_on可用组的payload """ + if self.global_data.get("is_recalc_sync_dbs", False): + sync_dbs = kwargs["trans_data"]["sync_dbs"] + else: + sync_dbs = kwargs["custom_params"]["dbs"] + return { "db_type": DBActuatorTypeEnum.Sqlserver.value, "action": SqlserverActuatorActionEnum.AddDBSInAlwaysOn.value, @@ -398,7 +428,7 @@ def get_build_add_dbs_in_always_on(self, **kwargs) -> dict: "host": kwargs["ips"][0]["ip"], "port": self.global_data["port"], "add_slaves": kwargs["custom_params"]["add_slaves"], - "dbs": kwargs["custom_params"]["dbs"], + "dbs": sync_dbs, }, }, } diff --git a/dbm-ui/backend/flow/utils/sqlserver/sqlserver_db_function.py b/dbm-ui/backend/flow/utils/sqlserver/sqlserver_db_function.py index 501a6db329..b56e3ff79e 100644 --- a/dbm-ui/backend/flow/utils/sqlserver/sqlserver_db_function.py +++ b/dbm-ui/backend/flow/utils/sqlserver/sqlserver_db_function.py @@ -8,7 +8,7 @@ specific language governing permissions and limitations under the License.
""" import copy -import logging.config +import logging import re import secrets from collections import defaultdict @@ -360,11 +360,12 @@ def exec_instance_app_login(cluster: Cluster, exec_type: SqlserverLoginExecMode, return True -def get_group_name(master_instance: StorageInstance, bk_cloud_id: int): +def get_group_name(master_instance: StorageInstance, bk_cloud_id: int, is_check_group: bool = False): """ 获取集群group_name名称 @param master_instance: master实例 @param bk_cloud_id: 云区域id + @param is_check_group: defaults to False, which raises when no group_name is found; True returns an empty string instead """ ret = DRSApi.sqlserver_rpc( { @@ -378,7 +379,11 @@ def get_group_name(master_instance: StorageInstance, bk_cloud_id: int): raise Exception(f"[{master_instance.ip_port}] get_group_name failed: {ret[0]['error_msg']}") if len(ret[0]["cmd_results"][0]["table_data"]) == 0: + if is_check_group: + # is_check_group is True: return an empty string instead of raising + return "" raise Exception(f"[{master_instance.ip_port}] get_group_name is null") + return ret[0]["cmd_results"][0]["table_data"][0]["name"] @@ -880,3 +885,27 @@ def check_ha_config( f"[{slave_instance.ip_port}]-{check_tag} configuration is not equal to master[{master_instance.ip_port}]", ) return True, "" + + +def exec_resume_sp(slave_instances: List[StorageInstance], master_host: str, master_port: int, bk_cloud_id: int): + """ + Try to repair the data sync status by executing Sys_AutoSwitch_Resume on the given slaves + @param slave_instances: list of slave instances to repair + @param master_host: host of the master to connect to + @param master_port: port of the master to connect to + @param bk_cloud_id: cloud area id + """ + cmd = f"use {SQLSERVER_CUSTOM_SYS_DB}; exec DBO.Sys_AutoSwitch_Resume '{master_host}','{master_port}', null" + logger.info(cmd) + ret = DRSApi.sqlserver_rpc( + { + "bk_cloud_id": bk_cloud_id, + "addresses": [storage.ip_port for storage in slave_instances], + "cmds": [cmd], + "force": False, + } + ) + for info in ret:  # NOTE(review): presumably one result entry per address - confirm against DRSApi + if info["error_msg"]: + raise Exception(f"Sys_AutoSwitch_Resume exec failed: {info['error_msg']}") + return True