fix(sqlserver): optimize the slave rebuild logic TencentBlueKing#8306
yksitu authored and iSecloud committed Dec 17, 2024
1 parent 639e93b commit 9760896
Showing 17 changed files with 779 additions and 60 deletions.
@@ -0,0 +1,83 @@
/*
* TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
* Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at https://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package checkcmd

import (
"fmt"

"dbm-services/common/go-pubpkg/logger"
"dbm-services/sqlserver/db-tools/dbactuator/internal/subcmd"
"dbm-services/sqlserver/db-tools/dbactuator/pkg/components/check"
"dbm-services/sqlserver/db-tools/dbactuator/pkg/util"

"github.com/spf13/cobra"
)

// MssqlServiceAct checks whether a SQL Server service is registered on the machine
type MssqlServiceAct struct {
*subcmd.BaseOptions
BaseService check.MssqlServiceComp
}

// MssqlServiceCommand godoc
//
// @Summary sqlserver check whether a SQL Server service is registered on the machine
// @Description -
// @Tags sqlserver
// @Accept json
// @Param body body MssqlServiceComp true "short description"
func MssqlServiceCommand() *cobra.Command {
act := MssqlServiceAct{
BaseOptions: subcmd.GBaseOptions,
}
cmd := &cobra.Command{
Use: "MssqlServiceCheck",
Short: "检查机器注册Sqlserver进程情况",
Example: fmt.Sprintf(`dbactuator check MssqlServiceCheck %s `, subcmd.CmdBaseExampleStr),
Run: func(cmd *cobra.Command, args []string) {
util.CheckErr(act.Validate())
if act.RollBack {
return
}
util.CheckErr(act.Init())
util.CheckErr(act.Run())
},
}
return cmd
}

// Init initializes the component parameters
func (u *MssqlServiceAct) Init() (err error) {
logger.Info("MssqlServiceAct Init")
if err = u.Deserialize(&u.BaseService.Params); err != nil {
logger.Error("DeserializeAndValidate failed, %v", err)
return err
}
u.BaseService.GeneralParam = subcmd.GeneralRuntimeParam

return nil
}

// Run executes the check steps
func (u *MssqlServiceAct) Run() (err error) {
steps := subcmd.Steps{
{
FunName: "检查机器注册Sqlserver进程情况",
Func: u.BaseService.CheckMssqlService,
},
}

if err := steps.Run(); err != nil {
return err
}
logger.Info("check-mssql-service successfully")
return nil
}
@@ -31,6 +31,7 @@ func CheckCommand() *cobra.Command {
Commands: []*cobra.Command{
CheckAbnormalDBCommand(),
CheckInstProcessCommand(),
MssqlServiceCommand(),
},
},
}
@@ -0,0 +1,53 @@
/*
* TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
* Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at https://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package check

import (
"fmt"

"dbm-services/common/go-pubpkg/logger"
"dbm-services/sqlserver/db-tools/dbactuator/pkg/components"
"dbm-services/sqlserver/db-tools/dbactuator/pkg/util/osutil"
"dbm-services/sqlserver/db-tools/dbactuator/pkg/util/sqlserver"
)

// MssqlServiceComp checks whether a SQL Server service is registered on the machine
type MssqlServiceComp struct {
GeneralParam *components.GeneralParam
Params *MssqlServiceParam
DB *sqlserver.DbWorker
}

// MssqlServiceParam parameters
type MssqlServiceParam struct {
Host string `json:"host" validate:"ip"` // local host IP
}

// CheckMssqlService checks whether a SQL Server service is registered on the machine
func (c *MssqlServiceComp) CheckMssqlService() error {
var checkresult string
ret, err := osutil.StandardPowerShellCommand(
"GET-SERVICE -NAME MSSQL* | WHERE-OBJECT {$_.NAME -NOTLIKE \"*#*\"}",
)
if err != nil {
return err
}
if ret != "" {
// a non-empty result means a SQL Server service is already registered on this host
logger.Info("an mssql service has already been registered [%s]", osutil.CleanExecOutput(ret))
checkresult = "1"
} else {
logger.Info("no mssql service registered")
checkresult = "0"
}
components.WrapperOutputString(fmt.Sprintf("{\"checkresult\": \"%s\"}", checkresult))
return nil
}
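For orientation only: a minimal Python sketch of how the wrapped {"checkresult": ...} output produced above might be consumed on the flow side to decide whether the host still needs a SQL Server installation. The helper name and its call site are assumptions for illustration and are not part of this commit; only the JSON key "checkresult" comes from the code above.

import json

def mssql_service_registered(raw_actuator_output: str) -> bool:
    """Return True when the actuator reports an already-registered SQL Server service.

    raw_actuator_output is assumed to be the JSON string written by WrapperOutputString,
    e.g. '{"checkresult": "1"}'.
    """
    payload = json.loads(raw_actuator_output)
    return payload.get("checkresult") == "1"

# usage sketch
assert mssql_service_registered('{"checkresult": "0"}') is False
assert mssql_service_registered('{"checkresult": "1"}') is True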
2 changes: 2 additions & 0 deletions dbm-ui/backend/env/__init__.py
@@ -120,6 +120,8 @@
SA_L5_AGENT_TEMPLATE_ID = get_type_env(key="SA_L5_AGENT_TEMPLATE_ID", _type=int)
# BK-SOPS project ID
BK_SOPS_PROJECT_ID = get_type_env(key="BK_SOPS_PROJECT_ID", _type=int, default=1)
# BK-SOPS template ID for updating GSE config on Windows hosts
UPDATE_WINDOW_GSE_CONFIG = get_type_env(key="UPDATE_WINDOW_GSE_CONFIG", _type=int, default=1)

# Bamboo
ENABLE_CLEAN_EXPIRED_BAMBOO_TASK = get_type_env(key="ENABLE_CLEAN_EXPIRED_BAMBOO_TASK", _type=bool, default=False)
1 change: 1 addition & 0 deletions dbm-ui/backend/flow/consts.py
@@ -657,6 +657,7 @@ class SqlserverActuatorActionEnum(str, StructuredEnum):
ClearConfig = EnumField("ClearConfig", _("清理实例周边配置。目前支持清理job、linkserver"))
RemoteDr = EnumField("RemoteDr", _("将一些dr移除可用组"))
Init = EnumField("init", _("部署后需要初始化实例的步骤"))
MssqlServiceCheck = EnumField("MssqlServiceCheck", _("检测进程是否注册"))


class DorisActuatorActionEnum(str, StructuredEnum):
55 changes: 53 additions & 2 deletions dbm-ui/backend/flow/engine/bamboo/scene/common/builder.py
@@ -10,14 +10,17 @@
"""
import copy
import logging
from typing import Any, Dict, Optional
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from bamboo_engine import api, builder
from bamboo_engine.builder import (
ConditionalParallelGateway,
ConvergeGateway,
Data,
EmptyEndEvent,
EmptyStartEvent,
NodeOutput,
ParallelGateway,
Params,
RewritableNodeOutput,
@@ -33,11 +36,18 @@
from backend.flow.models import FlowNode, FlowTree, StateType
from backend.flow.plugins.components.collections.common.create_random_job_user import AddTempUserForClusterComponent
from backend.flow.plugins.components.collections.common.drop_random_job_user import DropTempUserForClusterComponent
from backend.flow.plugins.components.collections.common.empty_node import EmptyNodeComponent
from backend.ticket.constants import TicketType

logger = logging.getLogger("json")


@dataclass
class Conditions:
act_object: Any  # the act or sub-pipeline to execute when the expression matches
express: str  # boolean expression evaluated against the gateway's context variable


class Builder(object):
"""
An abstract class for building bamboo pipelines; it lowers the learning curve for developers orchestrating flows and reduces code duplication
Expand Down Expand Up @@ -77,6 +87,9 @@ def __init__(self, root_id: str, data: Optional[Dict] = None, need_random_pass_c
# define the pipeline data-context parameter trans_data
self.rewritable_node_source_keys = []

# define the context parameters used by conditional gateways
self.node_output_list = []

# decide whether to add the temporary-account logic to the flow
if self.need_random_pass_cluster_ids:
self.create_random_pass_act()
@@ -203,6 +216,35 @@ def add_parallel_sub_pipeline(self, sub_flow_list: list):
cg = ConvergeGateway()
self.pipe = self.pipe.extend(pg).connect(*sub_flow_list).to(pg).converge(cg)

def add_conditional_subs(self, source_act, conditions: List[Conditions], name: str, conditions_param: str):
"""
add_conditional_subs: add a conditional-branch gateway to the pipeline to control which act or sub-pipeline runs
@param source_act: the source act the gateway is attached to; its output drives the branching
@param conditions: list of branch conditions (expression plus the act/sub-pipeline to run)
@param name: name of the conditional gateway
@param conditions_param: name of the context variable referenced in the condition expressions
"""
real_conditions = {}
connect_list = []
for index, info in enumerate(conditions):
real_conditions[index] = f"${{{conditions_param}}} {info.express}"
connect_list.append(info.act_object)

# append the default (fallback) node
connect_list.append(
self.add_act(act_name="default_node", act_component_code=EmptyNodeComponent.code, kwargs={}, extend=False)
)
real_conditions[len(connect_list) - 1] = "1==1"

cpg = ConditionalParallelGateway(
conditions=real_conditions, name=name, default_condition_outgoing=len(connect_list) - 1
)
cg = ConvergeGateway()
self.pipe = self.pipe.extend(source_act).extend(cpg).connect(*connect_list).to(cpg).converge(cg)

# record the context variable that the conditional gateway may need
self.node_output_list.append({"conditions_param": conditions_param, "source_act_id": source_act.id})

def run_pipeline(self, init_trans_data_class: Optional[Any] = None, is_drop_random_user: bool = True) -> bool:
"""
Start running the pipeline
@@ -218,6 +260,11 @@ def run_pipeline(self, init_trans_data_class: Optional[Any] = None, is_drop_rand
self.global_data.inputs["${trans_data}"] = RewritableNode(
source_act=self.rewritable_node_source_keys, type=Var.SPLICE, value=init_trans_data_class
)
# declare the NodeOutput variables needed by conditional gateways
for i in self.node_output_list:
self.global_data.inputs[f"${{{i['conditions_param']}}}"] = NodeOutput(
type=Var.SPLICE, source_act=i["source_act_id"], source_key=f"{i['conditions_param']}"
)
self.pipe.extend(self.end_act)
pipeline = builder.build_tree(self.start_act, id=self.root_id, data=self.global_data)
pipeline_copy = copy.deepcopy(pipeline)
@@ -272,7 +319,11 @@ def build_sub_process(self, sub_name) -> Optional[SubProcess]:
sub_data.inputs["${trans_data}"] = RewritableNode(
source_act=self.rewritable_node_source_keys, type=Var.SPLICE, value=None
)
# sub_data.inputs['${trans_data}'] = DataInput(type=Var.SPLICE, value='${trans_data}')
# declare the NodeOutput variables needed by conditional gateways
for i in self.node_output_list:
sub_data.inputs[f"${{{i['conditions_param']}}}"] = NodeOutput(
type=Var.SPLICE, source_act=i["source_act_id"], source_key=f"{i['conditions_param']}"
)
sub_params = Params({"${trans_data}": Var(type=Var.SPLICE, value="${trans_data}")})
self.pipe.extend(self.end_act)
return SubProcess(start=self.start_act, data=sub_data, params=sub_params, name=sub_name)
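A minimal, dependency-free Python sketch of the branching bookkeeping introduced in add_conditional_subs above, to make the wiring easier to read. The standalone function and the example variable name mssql_service_checkresult are assumptions for illustration; the real logic lives in Builder.add_conditional_subs and uses bamboo_engine gateways.

from dataclasses import dataclass
from typing import Any, List

@dataclass
class Conditions:
    act_object: Any  # act or sub-pipeline to run when the expression matches
    express: str     # expression evaluated against the gateway context variable

def build_condition_map(conditions: List[Conditions], conditions_param: str):
    """Mirror of the gateway bookkeeping: branch index -> condition expression."""
    real_conditions = {}
    connect_list = []
    for index, info in enumerate(conditions):
        # each explicit branch compares the context variable with its expression
        real_conditions[index] = f"${{{conditions_param}}} {info.express}"
        connect_list.append(info.act_object)
    # the last outgoing branch is an always-true default (empty) node
    connect_list.append("default_node")
    real_conditions[len(connect_list) - 1] = "1==1"
    return real_conditions, connect_list

# usage sketch: branch on a check result written by the source act
conds = [Conditions(act_object="install_sqlserver_act", express='== "0"')]
print(build_condition_map(conds, "mssql_service_checkresult"))
# -> ({0: '${mssql_service_checkresult} == "0"', 1: '1==1'}, ['install_sqlserver_act', 'default_node'])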
@@ -54,6 +54,9 @@
SqlserverDownloadBackupFileComponent,
)
from backend.flow.plugins.components.collections.sqlserver.trans_files import TransFileInWindowsComponent
from backend.flow.plugins.components.collections.sqlserver.update_window_gse_config import (
UpdateWindowGseConfigComponent,
)
from backend.flow.utils.common_act_dataclass import DownloadBackupClientKwargs, InstallNodemanPluginKwargs
from backend.flow.utils.mysql.mysql_act_dataclass import InitCheckKwargs, UpdateDnsRecordKwargs
from backend.flow.utils.sqlserver.sqlserver_act_dataclass import (
@@ -65,6 +68,7 @@
P2PFileForWindowKwargs,
RestoreForDoDrKwargs,
SqlserverBackupIDContext,
UpdateWindowGseConfigKwargs,
)
from backend.flow.utils.sqlserver.sqlserver_act_payload import SqlserverActPayload
from backend.flow.utils.sqlserver.sqlserver_db_function import get_backup_path
@@ -137,6 +141,19 @@ def install_sqlserver_sub_flow(
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# update the GSE configuration on the Windows hosts
if env.UPDATE_WINDOW_GSE_CONFIG:
acts_list = []
for host in target_hosts:
acts_list.append(
{
"act_name": _("更新gse配置信息[{}]".format(host.ip)),
"act_component_code": UpdateWindowGseConfigComponent.code,
"kwargs": asdict(UpdateWindowGseConfigKwargs(ips=[host.ip], bk_cloud_id=bk_cloud_id)),
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# install BlueKing plugins
acts_list = []
for plugin_name in DEPENDENCIES_PLUGINS:
@@ -435,6 +452,8 @@ def sync_dbs_for_cluster_sub_flow(
sync_dbs: list,
clean_dbs: list = None,
sub_flow_name: str = _("建立数据库同步子流程"),
is_recalc_sync_dbs: bool = False,
is_recalc_clean_dbs: bool = False,
):
"""
Sub-flow that establishes database synchronization
@@ -445,6 +464,8 @@
@param sync_dbs: list of DBs to synchronize
@param clean_dbs: list of DBs to clean this time; if empty, sync_dbs is used as the clean list
@param sub_flow_name: sub-flow name
@param is_recalc_sync_dbs: whether to re-fetch sync_dbs from the runtime (trans) context while the flow runs; used for the in-place slave rebuild scenario
@param is_recalc_clean_dbs: whether to re-fetch clean_dbs from the runtime (trans) context while the flow runs; used for the in-place slave rebuild scenario
"""
# get the cluster's current master instance
master_instance = cluster.storageinstance_set.get(instance_role=InstanceRole.BACKEND_MASTER)
@@ -463,7 +484,7 @@
SqlserverSyncMode.ALWAYS_ON: SqlserverActPayload.get_build_add_dbs_in_always_on.__name__,
}
# validate required parameters
if len(sync_slaves) == 0 or len(sync_dbs) == 0:
if len(sync_slaves) == 0 or (len(sync_dbs) == 0 and is_recalc_sync_dbs is False):
raise Exception("sync_slaves or sync_dbs is null, check")

# if cluster_sync_mode is mirror, a one-master-multiple-slave topology is not allowed in principle, so check whether more than one slave was passed in
@@ -483,6 +504,8 @@
"ignore_clean_tables": [],
"sync_mode": SqlserverSyncModeMaps[cluster_sync_mode],
"slaves": [],
"is_recalc_sync_dbs": is_recalc_sync_dbs,  # flag stored in the global context and checked when building the payload
"is_recalc_clean_dbs": is_recalc_clean_dbs,  # flag stored in the global context and checked when building the payload
}

# declare the sub-pipeline
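A hedged Python sketch of how the is_recalc_sync_dbs flag placed in the global context above could be consumed when the act payload is built: when the flag is set, the DB list is taken from the runtime trans context instead of the ticket data, which is what the in-place slave rebuild scenario needs. The function and key names here are assumptions; the real logic sits in SqlserverActPayload, which is not expanded in this excerpt.

def pick_sync_dbs(global_data: dict, trans_data: dict) -> list:
    """Choose the DB list to sync, preferring the runtime context when requested."""
    if global_data.get("is_recalc_sync_dbs"):
        # in-place slave rebuild: the real DB list is only known at runtime
        return trans_data.get("sync_dbs", [])
    return global_data.get("sync_dbs", [])

# usage sketch
print(pick_sync_dbs({"is_recalc_sync_dbs": True, "sync_dbs": []}, {"sync_dbs": ["db1", "db2"]}))
# -> ['db1', 'db2']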