Skip to content

Commit

Permalink
feat(dbm-services): 追加部署增加校验表的数据量 #6568
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq committed Sep 2, 2024
1 parent e3dfa20 commit 3d78238
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
* Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at https://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package spiderctlcmd

import (
"fmt"

"github.com/spf13/cobra"

"dbm-services/common/go-pubpkg/logger"
"dbm-services/mysql/db-tools/dbactuator/internal/subcmd"
"dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl"
"dbm-services/mysql/db-tools/dbactuator/pkg/util"
)

// CheckTdbctlWithSpiderSchemaAct check tdbctl with spider schema
type CheckTdbctlWithSpiderSchemaAct struct {
Service spiderctl.CheckTdbctlWithSpideSchemaComp
}

// NewChkTdbctlSpiderSchaCommand create new subcommand
func NewChkTdbctlSpiderSchaCommand() *cobra.Command {
act := CheckTdbctlWithSpiderSchemaAct{}
cmd := &cobra.Command{
Use: "check-tdbctl-with-spider-schema",
Short: "spider集群后端切换",
Example: fmt.Sprintf(`dbactuator spiderctl cluster-backend-switch %s %s`,
subcmd.CmdBaseExampleStr, subcmd.ToPrettyJson(act.Service.Example()),
),
Run: func(cmd *cobra.Command, args []string) {
util.CheckErr(act.Init())
util.CheckErr(act.Run())
},
}
return cmd
}

// Init prepare run env
func (c *CheckTdbctlWithSpiderSchemaAct) Init() (err error) {
if _, err = subcmd.Deserialize(&c.Service.Params); err != nil {
logger.Error("DeserializeAndValidate failed, %v", err)
return err
}
c.Service.GeneralParam = subcmd.GeneralRuntimeParam
return nil
}

// Run Command Run
func (c *CheckTdbctlWithSpiderSchemaAct) Run() (err error) {
steps := subcmd.Steps{
{
FunName: "初始化",
Func: c.Service.Init,
},
{
FunName: "检查db表数量",
Func: c.Service.RunCheck,
},
}
if err = steps.Run(); err != nil {
return err
}
logger.Info("check tdbctl with spider schema successfully")
return
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func NewSpiderCtlCommand() *cobra.Command {
NewTruncateOnCtlCommand(),
NewCreateToDBViaCtlCommand(),
NewRenameDropFromViaCtlCommand(),
NewChkTdbctlSpiderSchaCommand(),
},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
package spiderctl

import (
"dbm-services/common/go-pubpkg/mysqlcomm"
"errors"
"fmt"

"dbm-services/common/go-pubpkg/logger"
"dbm-services/common/go-pubpkg/mysqlcomm"
"dbm-services/mysql/db-tools/dbactuator/pkg/components"
"dbm-services/mysql/db-tools/dbactuator/pkg/native"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"bufio"
"context"
"database/sql"
"dbm-services/common/go-pubpkg/mysqlcomm"
"fmt"
"os"
"path"
Expand All @@ -24,6 +23,7 @@ import (

"dbm-services/common/go-pubpkg/cmutil"
"dbm-services/common/go-pubpkg/logger"
"dbm-services/common/go-pubpkg/mysqlcomm"
"dbm-services/mysql/db-tools/dbactuator/pkg/components"
"dbm-services/mysql/db-tools/dbactuator/pkg/core/cst"
"dbm-services/mysql/db-tools/dbactuator/pkg/native"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
* Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at https://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package spiderctl

import (
"errors"
"fmt"
"os"
"path"
"path/filepath"
"regexp"

"dbm-services/common/go-pubpkg/logger"
"dbm-services/mysql/db-tools/dbactuator/pkg/components"
"dbm-services/mysql/db-tools/dbactuator/pkg/components/computil"
"dbm-services/mysql/db-tools/dbactuator/pkg/native"
"dbm-services/mysql/db-tools/dbactuator/pkg/util"
)

// CheckTdbctlWithSpideSchemaComp 检查spider和中控Schema是否一致
type CheckTdbctlWithSpideSchemaComp struct {
GeneralParam *components.GeneralParam `json:"general"`
Params CheckTdbctlWithSpideSchemParam `json:"extend"`
checkTdbctlWithSpideSchemaRt
}

// CheckTdbctlWithSpideSchemParam 检查参数
type CheckTdbctlWithSpideSchemParam struct {
Host string `json:"host" validate:"required,ip"` // 当前实例的主机地址
Port int `json:"port" validate:"required,lt=65536,gte=3306"` // 当前实例的端口
SpiderPort int `json:"spider_port" validate:"required,lt=65536,gte=3306"` // spider节点端口
}

// CheckTdbctlWithSpideSchemRt runtime
type checkTdbctlWithSpideSchemaRt struct {
checkDbs []string
spiderBaseDir string
tdbctlBaseDir string
}

// Example subcommand example input
func (c CheckTdbctlWithSpideSchemaComp) Example() interface{} {
return CheckTdbctlWithSpideSchemaComp{
Params: CheckTdbctlWithSpideSchemParam{
Host: "127.0.0.1",
Port: 3306,
SpiderPort: 3307,
},
}
}

// FrmReg table frm
var FrmReg *regexp.Regexp

// Init init runtime
func (c *CheckTdbctlWithSpideSchemaComp) Init() (err error) {
user := c.GeneralParam.RuntimeAccountParam.MonitorUser
pwd := c.GeneralParam.RuntimeAccountParam.MonitorPwd
FrmReg = regexp.MustCompile(`.*frm$`)
tdbctlConn, err := native.InsObject{
Host: c.Params.Host,
Port: c.Params.Port,
User: user,
Pwd: pwd,
}.Conn()
if err != nil {
logger.Error("connect to tdbctl failed, err: %s", err.Error())
return err
}
spiderConn, err := native.InsObject{
Host: c.Params.Host,
Port: c.Params.SpiderPort,
User: user,
Pwd: pwd,
}.Conn()
if err != nil {
logger.Error("connect to spider failed, err: %s", err.Error())
return err
}
defer tdbctlConn.Close()
defer spiderConn.Close()

c.tdbctlBaseDir, err = tdbctlConn.GetSingleGlobalVar("datadir")
if err != nil {
logger.Error("get tdbctl datadir failed, err: %s", err.Error())
return err
}
c.spiderBaseDir, err = spiderConn.GetSingleGlobalVar("datadir")
if err != nil {
logger.Error("get spider datadir failed, err: %s", err.Error())
return err
}
version, err := spiderConn.SelectVersion()
if err != nil {
logger.Error("获取version failed %s", err.Error())
return err
}
alldbs, err := spiderConn.ShowDatabases()
if err != nil {
logger.Error("show databases failed, err: %s", err.Error())
return err
}
c.checkDbs = util.FilterOutStringSlice(alldbs, computil.GetGcsSystemDatabases(version))
c.checkDbs = util.FilterOutStringSlice(c.checkDbs, []string{"db_infobase"})
return nil
}

// RunCheck 检查
func (c *CheckTdbctlWithSpideSchemaComp) RunCheck() (err error) {
spiderDbSchemaCountMap := make(map[string]int)
tdbctlDbSchemaCountMap := make(map[string]int)
var globalErrs []error
for _, db := range c.checkDbs {
dbdirSpider := path.Join(c.spiderBaseDir, db)
dbdirTdbctl := path.Join(c.tdbctlBaseDir, db)
logger.Info("scan path %s,%s", dbdirSpider, dbdirTdbctl)

map1, dbdirSpiderCount, err := countFilesInDir(dbdirSpider)
if err != nil {
logger.Error("count files in dir %s failed, err: %s", dbdirSpider, err.Error())
}
map2, dbdirTdbctlCount, err := countFilesInDir(dbdirTdbctl)
if err != nil {
logger.Error("count files in dir %s failed, err: %s", dbdirTdbctl, err.Error())
}
for tb := range map1 {
if _, exist := map2[tb]; !exist {
msg := fmt.Sprintf("%s frm文件在 tdbctl 中不存在,请确认\n", tb)
globalErrs = append(globalErrs, fmt.Errorf(msg))
logger.Error(msg)
}
}
if dbdirSpiderCount != dbdirTdbctlCount {
msg := fmt.Sprintf("【%s】库上的表数量不一致,【tdbctl】节点上表总数量:%d, 【spider】节点上表的总数量 count:%d\n", db, dbdirTdbctlCount,
dbdirSpiderCount)
globalErrs = append(globalErrs, fmt.Errorf(msg))
logger.Error(msg)
}
spiderDbSchemaCountMap[db] = dbdirSpiderCount
tdbctlDbSchemaCountMap[db] = dbdirTdbctlCount
}
// 输出结果
for _, db := range c.checkDbs {
logger.Info("db:【%s】, 【tdbctl】节点上表总数量:%d, 【spider】节点上表的总数量 count:%d", db, tdbctlDbSchemaCountMap[db],
spiderDbSchemaCountMap[db])
}
return errors.Join(globalErrs...)
}

// countFilesInDir 获取文件数量
func countFilesInDir(dirPath string) (map[string]struct{}, int, error) {
count := 0
fileMap := make(map[string]struct{})

err := filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

if !info.IsDir() {
if FrmReg.MatchString(path) && info.Size() > 0 {
fileMap[filepath.Base(path)] = struct{}{}
count++
}
}

return nil
})

if err != nil {
return nil, 0, err
}
return fileMap, count, nil
}
1 change: 1 addition & 0 deletions dbm-ui/backend/flow/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ class DBActuatorActionEnum(str, StructuredEnum):
Deploy = EnumField("deploy", _("deploy"))
AppendDeploy = EnumField("append-deploy", _("append-deploy"))
ImportSchemaToTdbctl = EnumField("import-schema-to-tdbctl", _("import-schema-to-tdbctl"))
CheckTdbctlWithSpiderSchema = EnumField("check-tdbctl-with-spider-schema", _("icheck-tdbctl-with-spider-schema"))
GetBackupFile = EnumField("find-local-backup", _("find-local-backup"))
RestoreSlave = EnumField("restore-dr", _("restore-dr"))
RecoverBinlog = EnumField("recover-binlog", _("recover-binlog"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,16 @@ def run(self):
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)
# 校验spider节点和tdbctldb表的数量
exec_act_kwargs.cluster = {"ctl_port": ctl_port, "spider_port": leader_spider.port}
exec_act_kwargs.exec_ip = primary_ctl_ip
exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_check_schema_payload.__name__
migrate_pipeline.add_act(
act_name=_("校验spider和tdbctl节点表的数量"),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)

# 最后刷新其他路由,这样做的目标的是安全起见
cluster_info["only_init_ctl"] = False
exec_act_kwargs.cluster = cluster_info
Expand Down
14 changes: 14 additions & 0 deletions dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,20 @@ def get_import_schema_to_tdbctl_payload(self, **kwargs):
},
}

def get_check_schema_payload(self, **kwargs):
return {
"db_type": DBActuatorTypeEnum.SpiderCtl.value,
"action": DBActuatorActionEnum.CheckTdbctlWithSpiderSchema.value,
"payload": {
"general": {"runtime_account": self.account},
"extend": {
"host": kwargs["ip"],
"port": self.cluster["ctl_port"],
"spider_port": self.cluster["spider_port"],
},
},
}

def get_install_spider_ctl_payload(self, **kwargs):
"""
拼接spider-ctl节点安装的payload, ctl是单机单实例, 所以代码兼容多实例传入
Expand Down

0 comments on commit 3d78238

Please sign in to comment.