From 0d7a01170be1bd87a03cf7c6bf6d092ae1d8038a Mon Sep 17 00:00:00 2001 From: yksitu <1297650644@qq.com> Date: Wed, 6 Sep 2023 11:20:37 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0TBinlogdumper?= =?UTF-8?q?=E7=9A=84=E5=85=A8=E9=87=8F=E5=90=8C=E6=AD=A5=E7=9A=84=E5=9C=BA?= =?UTF-8?q?=20#939?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit style style fix: 修复问题 --- .../backup_demand_for_tbinlogdumper.go | 88 ++++++++ .../internal/subcmd/tbinlogdumpercmd/cmd.go | 2 + .../dump_schema_for_tbinlogdumper.go | 104 +++++++++ .../mysql/backupdemand/backup_demand.go | 25 ++- .../components/tbinlogdumper/dump_schema.go | 199 ++++++++++++++++++ .../tbinlogdumper/install_tbinlogdumper.go | 3 +- .../tbinlogdumper/unintall_tbinlogdumper.go | 2 +- .../pkg/util/mysqlutil/mysqlclient_exec.go | 2 +- dbm-ui/backend/flow/consts.py | 1 + .../bamboo/scene/tbinlogdumper/add_nodes.py | 35 ++- .../tbinlogdumper/common/common_sub_flow.py | 197 ++++++++++++++++- .../collections/mysql/trans_flies.py | 9 +- .../collections/tbinlogdumper/__init__.py | 9 + .../collections/tbinlogdumper/dumper_data.py | 66 ++++++ .../tbinlogdumper/trans_backup_file.py | 39 ++++ .../flow/utils/base/payload_handler.py | 126 +++++++++++ .../flow/utils/mysql/mysql_act_dataclass.py | 2 + .../flow/utils/mysql/mysql_act_playload.py | 196 +---------------- .../utils/tbinlogdumper/context_dataclass.py | 14 ++ .../tbinlogdumper_act_payload.py | 196 +++++++++++++++++ 20 files changed, 1108 insertions(+), 207 deletions(-) create mode 100644 dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/backup_demand_for_tbinlogdumper.go create mode 100644 dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/dump_schema_for_tbinlogdumper.go create mode 100644 dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/dump_schema.go create mode 100644 dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/__init__.py create mode 100644 dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/dumper_data.py create mode 100644 dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/trans_backup_file.py create mode 100644 dbm-ui/backend/flow/utils/base/payload_handler.py create mode 100644 dbm-ui/backend/flow/utils/tbinlogdumper/tbinlogdumper_act_payload.py diff --git a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/backup_demand_for_tbinlogdumper.go b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/backup_demand_for_tbinlogdumper.go new file mode 100644 index 0000000000..0c7ce5a343 --- /dev/null +++ b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/backup_demand_for_tbinlogdumper.go @@ -0,0 +1,88 @@ +// TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +// Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +// Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +// You may obtain a copy of the License at https://opensource.org/licenses/MIT +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +package tbinlogdumpercmd + +import ( + "fmt" + + "github.com/spf13/cobra" + + "dbm-services/bigdata/db-tools/dbactuator/pkg/util" + "dbm-services/common/go-pubpkg/logger" + "dbm-services/mysql/db-tools/dbactuator/internal/subcmd" + "dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/backupdemand" +) + +type BackupDemandAct struct { + *subcmd.BaseOptions + Payload backupdemand.Component +} + +func NewDumperBackupDemandCommand() *cobra.Command { + act := BackupDemandAct{BaseOptions: subcmd.GBaseOptions} + cmd := &cobra.Command{ + Use: "backup-demand", + Short: "备份请求", + Example: fmt.Sprintf( + `dbactuator tbinlogdumper backup-demand %s %s`, + subcmd.CmdBaseExampleStr, subcmd.ToPrettyJson(act.Payload.Example())), + Run: func(cmd *cobra.Command, args []string) { + util.CheckErr(act.Validate()) + util.CheckErr(act.Init()) + util.CheckErr(act.Run()) + }, + } + return cmd +} + +func (d *BackupDemandAct) Init() (err error) { + if err = d.BaseOptions.Validate(); err != nil { // @todo 应该在一开始就validate + return err + } + if err = d.Deserialize(&d.Payload.Params); err != nil { + logger.Error("DeserializeAndValidate err %s", err.Error()) + return err + } + logger.Warn("params %+v", d.Payload.Params) + + return +} + +func (d *BackupDemandAct) Validate() error { + return nil +} + +func (d *BackupDemandAct) Run() (err error) { + defer util.LoggerErrorStack(logger.Error, err) + steps := subcmd.Steps{ + { + FunName: "初始化", + Func: d.Payload.Init, + }, + { + FunName: "生成备份配置", + Func: d.Payload.GenerateBackupConfig, + }, + { + FunName: "执行备份", + Func: d.Payload.DoBackup, + }, + { + FunName: "返回报告", + Func: d.Payload.OutPutForTBinlogDumper, + }, + } + + if err = steps.Run(); err != nil { + return err + } + + logger.Info("backup demand success") + return nil +} diff --git a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/cmd.go b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/cmd.go index 03b437a0da..7e068afff0 100644 --- a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/cmd.go +++ b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/cmd.go @@ -24,6 +24,8 @@ func NewTbinlogDumperCommand() *cobra.Command { Commands: []*cobra.Command{ NewDeployTbinlogDumperCommand(), NewUnInstallTbinlogDumperCommand(), + NewDumperBackupDemandCommand(), + NewDumpSchemaCommand(), }, }, } diff --git a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/dump_schema_for_tbinlogdumper.go b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/dump_schema_for_tbinlogdumper.go new file mode 100644 index 0000000000..c740875e87 --- /dev/null +++ b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/tbinlogdumpercmd/dump_schema_for_tbinlogdumper.go @@ -0,0 +1,104 @@ +/* + * TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. + * Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at https://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tbinlogdumpercmd + +import ( + "fmt" + + "dbm-services/common/go-pubpkg/logger" + "dbm-services/mysql/db-tools/dbactuator/internal/subcmd" + "dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper" + "dbm-services/mysql/db-tools/dbactuator/pkg/util" + + "github.com/spf13/cobra" +) + +// DumpSchemaAct TODO +type DumpSchemaAct struct { + *subcmd.BaseOptions + Service tbinlogdumper.DumpSchemaComp +} + +// NewSenmanticDumpSchemaCommand godoc +// +// @Summary 备份表结构并导入 +// @Description 备份表结构并导入 +// @Tags tbinlogdumper +// @Accept json +// @Produce json +// @Param body body tbinlogdumper.DumpSchemaComp true "short description" +// @Router /tbinlogdumper/semantic-dumpschema [post] +func NewDumpSchemaCommand() *cobra.Command { + act := DumpSchemaAct{ + BaseOptions: subcmd.GBaseOptions, + } + cmd := &cobra.Command{ + Use: "dumpschema", + Short: "运行导出并导入表结构", + Example: fmt.Sprintf( + `dbactuator tbinlogdumper dumpschema %s %s`, + subcmd.CmdBaseExampleStr, subcmd.ToPrettyJson(act.Service.Example()), + ), + Run: func(cmd *cobra.Command, args []string) { + util.CheckErr(act.Validate()) + util.CheckErr(act.Init()) + util.CheckErr(act.Run()) + }, + } + return cmd +} + +// Validate TODO +func (d *DumpSchemaAct) Validate() (err error) { + return d.BaseOptions.Validate() +} + +// Init TODO +func (d *DumpSchemaAct) Init() (err error) { + if err = d.Deserialize(&d.Service.Params); err != nil { + logger.Error("DeserializeAndValidate failed, %v", err) + return err + } + d.Service.GeneralParam = subcmd.GeneralRuntimeParam + return nil +} + +// Run TODO +func (d *DumpSchemaAct) Run() (err error) { + steps := subcmd.Steps{ + { + FunName: "init", + Func: d.Service.Init, + }, + { + FunName: "precheck", + Func: d.Service.Precheck, + }, + { + FunName: "运行导出表结构", + Func: d.Service.DumpSchema, + }, + { + FunName: "修改表结构的存储引擎", + Func: d.Service.ModifyEngine, + }, + { + FunName: "导入表结构到TBinlogdumper", + Func: d.Service.LoadSchema, + }, + } + if err := steps.Run(); err != nil { + return err + } + + logger.Info("同步表结构到TBinlogdumper实例成功") + return nil +} diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/backupdemand/backup_demand.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/backupdemand/backup_demand.go index c2266c3e63..a61fe849ec 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/backupdemand/backup_demand.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/backupdemand/backup_demand.go @@ -56,8 +56,8 @@ type context struct { randString string resultReportPath string statusReportPath string - backupPort []int // 当在 spider master备份时, 会有 [25000, 26000] 两个端口 - + backupPort []int // 当在 spider master备份时, 会有 [25000, 26000] 两个端口 + backupDir string //只是兼容tbinlogdumper的备份日志输出,存储备份目录信息,没有任何处理逻辑 } type Report struct { @@ -152,6 +152,8 @@ func (c *Component) GenerateBackupConfig() error { logger.Error("mkdir %s failed: %s", backupConfig.Public.BackupDir, err.Error()) return err } + // 增加为tbinlogdumper做库表备份的日志输出,保存流程上下文 + c.backupDir = backupConfig.Public.BackupDir } backupConfigFile := ini.Empty() @@ -306,3 +308,22 @@ func (c *Component) Example() interface{} { }, } } + +// OutPutForTBinlogDumper 增加为tbinlogdumper做库表备份的日志输出,保存流程上下文 +func (c *Component) OutPutForTBinlogDumper() error { + ret := make(map[string]interface{}) + report, err := c.generateReport() + if err != nil { + return err + } + ret["report_result"] = report.Result + ret["report_status"] = report.Status + ret["backup_dir"] = c.backupDir + + err = components.PrintOutputCtx(ret) + if err != nil { + logger.Error("output backup report failed: %s", err.Error()) + return err + } + return nil +} diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/dump_schema.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/dump_schema.go new file mode 100644 index 0000000000..755c3b583a --- /dev/null +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/dump_schema.go @@ -0,0 +1,199 @@ +/* + * TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. + * Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at https://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tbinlogdumper + +import ( + "fmt" + "path" + "regexp" + + "dbm-services/common/go-pubpkg/logger" + "dbm-services/mysql/db-tools/dbactuator/internal/subcmd" + "dbm-services/mysql/db-tools/dbactuator/pkg/components" + "dbm-services/mysql/db-tools/dbactuator/pkg/components/computil" + "dbm-services/mysql/db-tools/dbactuator/pkg/core/cst" + "dbm-services/mysql/db-tools/dbactuator/pkg/native" + "dbm-services/mysql/db-tools/dbactuator/pkg/util" + "dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil" + "dbm-services/mysql/db-tools/dbactuator/pkg/util/osutil" +) + +// DumpSchemaComp TODO +type DumpSchemaComp struct { + GeneralParam *components.GeneralParam `json:"general"` + Params DumpSchemaParam `json:"extend"` + DumpSchemaRunTimeCtx `json:"-"` +} + +// DumpSchemaParam TODO +type DumpSchemaParam struct { + Host string `json:"host" validate:"required,ip"` // 当前实例的主机地址 + Port int `json:"port" validate:"required,lt=65536,gte=3306"` // 当前实例的端口 + TBinlogdumperPort int `json:"tbinlogdumper_port" validate:"required,lt=65536,gte=3306"` // 当前TBinlogdumperPort实例的端口 + CharSet string `json:"charset" validate:"required,checkCharset"` // 字符集参数 + +} + +// DumpSchemaRunTimeCtx TODO +type DumpSchemaRunTimeCtx struct { + dbs []string // 需要备份的表结构的数据库名称集合 + charset string // 当前实例的字符集 + dumpCmd string + BackupFileName string // 备份文件 + BackupDir string +} + +// Example godoc +func (c *DumpSchemaComp) Example() interface{} { + comp := DumpSchemaComp{ + Params: DumpSchemaParam{ + Host: "1.1.1.1", + Port: 3306, + TBinlogdumperPort: 27000, + CharSet: "default", + }, + } + return comp +} + +// Init init +// +// @receiver c +// @return err +func (c *DumpSchemaComp) Init() (err error) { + conn, err := native.InsObject{ + Host: c.Params.Host, + Port: c.Params.Port, + User: c.GeneralParam.RuntimeAccountParam.AdminUser, + Pwd: c.GeneralParam.RuntimeAccountParam.AdminPwd, + }.Conn() + if err != nil { + logger.Error("Connect %d failed:%s", c.Params.Port, err.Error()) + return err + } + alldbs, err := conn.ShowDatabases() + if err != nil { + logger.Error("show all databases failed:%s", err.Error()) + return err + } + + version, err := conn.SelectVersion() + if err != nil { + logger.Error("获取version failed %s", err.Error()) + return err + } + + finaldbs := []string{} + reg := regexp.MustCompile(`^bak_cbs`) + for _, db := range util.FilterOutStringSlice(alldbs, computil.GetGcsSystemDatabasesIgnoreTest(version)) { + if reg.MatchString(db) { + continue + } + finaldbs = append(finaldbs, db) + } + if len(finaldbs) == 0 { + return fmt.Errorf("变更实例排除系统库后,再也没有可以变更的库") + } + c.dbs = finaldbs + c.charset = c.Params.CharSet + if c.Params.CharSet == "default" { + if c.charset, err = conn.ShowServerCharset(); err != nil { + logger.Error("获取实例的字符集失败:%s", err.Error()) + return err + } + } + c.BackupDir = cst.DumperDefaultBakDir + c.BackupFileName = fmt.Sprintf("tbinlogdump_%d_schema_%s.sql", c.Params.TBinlogdumperPort, subcmd.GBaseOptions.NodeId) + return err +} + +// Precheck 预检查 +// +// @receiver c +// @return err +func (c *DumpSchemaComp) Precheck() (err error) { + c.dumpCmd = path.Join(cst.MysqldInstallPath, "bin", "mysqldump") + // to export the table structure from the central control + // you need to use the mysqldump that comes with the central control + + if !osutil.FileExist(c.dumpCmd) { + return fmt.Errorf("dumpCmd: %s文件不存在", c.dumpCmd) + } + if !osutil.FileExist(c.BackupDir) { + return fmt.Errorf("backupdir: %s不存在", c.BackupDir) + } + return +} + +// DumpSchema 运行备份表结构 +// +// @receiver c +// @return err +func (c *DumpSchemaComp) DumpSchema() (err error) { + dumper := &mysqlutil.MySQLDumperTogether{ + MySQLDumper: mysqlutil.MySQLDumper{ + DumpDir: c.BackupDir, + Ip: c.Params.Host, + Port: c.Params.Port, + DbBackupUser: c.GeneralParam.RuntimeAccountParam.AdminUser, + DbBackupPwd: c.GeneralParam.RuntimeAccountParam.AdminPwd, + DbNames: c.dbs, + DumpCmdFile: c.dumpCmd, + Charset: c.charset, + MySQLDumpOption: mysqlutil.MySQLDumpOption{ + NoData: true, + AddDropTable: true, + NeedUseDb: true, + DumpRoutine: true, + DumpTrigger: false, + }, + }, + OutputfileName: c.BackupFileName, + } + if err := dumper.Dump(); err != nil { + logger.Error("dump failed: ", err.Error()) + return err + } + return nil +} + +// ModifyEngine 修改备份文件中引擎 +func (c *DumpSchemaComp) ModifyEngine() (err error) { + EngineName := "REDIS" + ModifyCmd := fmt.Sprintf( + "sed -i 's/ENGINE=[^ ]*/ENGINE=%s/g' %s", EngineName, path.Join(c.BackupDir, c.BackupFileName), + ) + logger.Info("ModifyCmd cmd:%s", ModifyCmd) + output, err := osutil.ExecShellCommand(false, ModifyCmd) + if err != nil { + return fmt.Errorf("execte get an error:%s,%w", output, err) + } + return nil +} + +// LoadSchema 导入表结构 +func (c *DumpSchemaComp) LoadSchema() (err error) { + err = mysqlutil.ExecuteSqlAtLocal{ + IsForce: false, + Charset: c.charset, + NeedShowWarnings: false, + Host: c.Params.Host, + Port: c.Params.TBinlogdumperPort, + WorkDir: c.BackupDir, + User: c.GeneralParam.RuntimeAccountParam.AdminUser, + Password: c.GeneralParam.RuntimeAccountParam.AdminPwd, + }.ExcuteSqlByMySQLClientOne(c.BackupFileName, "test") + if err != nil { + logger.Error("执行%s文件失败", path.Join(c.BackupDir, c.BackupFileName)) + return err + } + return nil +} diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/install_tbinlogdumper.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/install_tbinlogdumper.go index ca55abb348..789a533ecc 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/install_tbinlogdumper.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/install_tbinlogdumper.go @@ -254,11 +254,12 @@ func (i *InstallTbinlogDumperComp) initInsReplaceConfigs() error { logger.Error("%s:%d generation serverId Failed %s", i.Params.Host, port, err.Error()) return err } + DumperServerId, _ := strconv.ParseUint(fmt.Sprintf("%d%d", serverId, port), 10, 64) i.RenderConfigs[port] = renderDumperConfigs{Mysqld{ Basedir: i.MysqlInstallDir, Datadir: insBaseDataDir, Logdir: insBaseLogDir, - ServerId: serverId, + ServerId: DumperServerId, Port: strconv.Itoa(port), CharacterSetServer: i.Params.CharSet, BindAddress: i.Params.Host, diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/unintall_tbinlogdumper.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/unintall_tbinlogdumper.go index 70d4920d60..700672624c 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/unintall_tbinlogdumper.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/tbinlogdumper/unintall_tbinlogdumper.go @@ -52,7 +52,7 @@ func (u *UnInstallTbinlogDumperComp) TbinlogDumperClearDir() error { ) ) if !cmutil.FileExists(dataBak) { - cmd := fmt.Sprintf("mkdir %s;", dataBak) + cmd := fmt.Sprintf("mkdir %s && chown -R mysql:mysql %s ;", dataBak, dataBak) output, err := osutil.ExecShellCommand(false, cmd) if err != nil { err = fmt.Errorf("execute [%s] get an error:%w,output:%s", cmd, err, output) diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysqlclient_exec.go b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysqlclient_exec.go index fd35105a2e..eae3936bd2 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysqlclient_exec.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil/mysqlclient_exec.go @@ -65,7 +65,7 @@ func (e ExecuteSqlAtLocal) CreateLoadSQLCommand() (command string) { if util.StrIsEmpty(e.Socket) { return fmt.Sprintf( `%s %s --safe_updates=0 -u %s %s -h%s -P %d %s -vvv `, - mysqlclient, forceStr, e.User, passwd, e.Host, e.Port, e.Charset, + mysqlclient, forceStr, e.User, passwd, e.Host, e.Port, connCharset, ) } return fmt.Sprintf( diff --git a/dbm-ui/backend/flow/consts.py b/dbm-ui/backend/flow/consts.py index 639f550487..c373056282 100644 --- a/dbm-ui/backend/flow/consts.py +++ b/dbm-ui/backend/flow/consts.py @@ -336,6 +336,7 @@ class DBActuatorActionEnum(str, StructuredEnum): MySQLBackupDemand = EnumField("backup-demand", _("mysql备份请求")) TenDBClusterBackendSwitch = EnumField("cluster-backend-switch", _("TenDBCluster集群做后端切换")) TenDBClusterMigrateCutOver = EnumField("cluster-backend-migrate-cutover", _("TenDBCluster集群做后端的成对迁移")) + DumpSchema = EnumField("dumpschema", _("为TBinlogDumper实例导出导入源表结构")) class RedisActuatorActionEnum(str, StructuredEnum): diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/tbinlogdumper/add_nodes.py b/dbm-ui/backend/flow/engine/bamboo/scene/tbinlogdumper/add_nodes.py index 728a6b025d..3a7e568924 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/tbinlogdumper/add_nodes.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/tbinlogdumper/add_nodes.py @@ -19,7 +19,11 @@ from backend.flow.consts import TBinlogDumperAddType from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import build_repl_by_manual_input_sub_flow -from backend.flow.engine.bamboo.scene.tbinlogdumper.common.common_sub_flow import add_tbinlogdumper_sub_flow +from backend.flow.engine.bamboo.scene.tbinlogdumper.common.common_sub_flow import ( + add_tbinlogdumper_sub_flow, + full_sync_sub_flow, + incr_sync_sub_flow, +) from backend.flow.engine.bamboo.scene.tbinlogdumper.common.exceptions import NormalTBinlogDumperFlowException from backend.flow.engine.bamboo.scene.tbinlogdumper.common.util import get_cluster, get_tbinlogdumper_install_port from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent @@ -53,7 +57,7 @@ def add_nodes(self): # 获取对应集群相关对象 cluster = get_cluster(cluster_id=int(info["cluster_id"]), bk_biz_id=int(self.data["bk_biz_id"])) - # 根据不同的添加类型,来确定TBinlogDumper数据同步的行为 + # 获取最新的master实例 master = cluster.storageinstance_set.get(instance_role=InstanceRole.BACKEND_MASTER) # 获取安装端口 @@ -81,20 +85,31 @@ def add_nodes(self): ) for add_conf in info["add_confs"]: + # 根据不同的添加类型,来确定TBinlogDumper数据同步的行为 if add_conf["add_type"] == TBinlogDumperAddType.INCR_SYNC.value: sub_pipeline.add_sub_pipeline( - build_repl_by_manual_input_sub_flow( - bk_cloud_id=cluster.bk_cloud_id, + sub_flow=incr_sync_sub_flow( + cluster=cluster, root_id=self.root_id, - parent_global_data=sub_flow_context, - master_ip=master.machine.ip, - slave_ip=master.machine.ip, - master_port=master.port, - slave_port=add_conf["port"], + uid=self.data["uid"], + add_tbinlogdumper_conf=add_conf, + created_by=self.data["created_by"], ) ) elif add_conf["add_type"] == TBinlogDumperAddType.FULL_SYNC.value: - pass # todo + sub_pipeline.add_sub_pipeline( + sub_flow=full_sync_sub_flow( + cluster=cluster, + root_id=self.root_id, + uid=self.data["uid"], + add_tbinlogdumper_conf=add_conf, + created_by=self.data["created_by"], + ) + ) + else: + raise NormalTBinlogDumperFlowException( + message=_("非法上架特性,请联系系统管理员:add_type:{}".format(add_conf["add_type"])) + ) # 写元数据 sub_pipeline.add_act( diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/tbinlogdumper/common/common_sub_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/tbinlogdumper/common/common_sub_flow.py index c5fe159271..ddabe39277 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/tbinlogdumper/common/common_sub_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/tbinlogdumper/common/common_sub_flow.py @@ -7,6 +7,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import uuid from dataclasses import asdict from django.utils.translation import ugettext as _ @@ -18,14 +19,18 @@ from backend.flow.consts import DBA_SYSTEM_USER from backend.flow.engine.bamboo.scene.common.builder import SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList +from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import build_repl_by_manual_input_sub_flow from backend.flow.engine.bamboo.scene.tbinlogdumper.common.exceptions import NormalTBinlogDumperFlowException from backend.flow.engine.bamboo.scene.tbinlogdumper.common.util import get_tbinlogdumper_charset from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent +from backend.flow.plugins.components.collections.tbinlogdumper.dumper_data import TBinlogDumperFullSyncDataComponent from backend.flow.plugins.components.collections.tbinlogdumper.stop_slave import TBinlogDumperStopSlaveComponent -from backend.flow.utils.mysql.mysql_act_dataclass import DownloadMediaKwargs, ExecActuatorKwargs +from backend.flow.plugins.components.collections.tbinlogdumper.trans_backup_file import TBinlogDumperTransFileComponent +from backend.flow.utils.mysql.mysql_act_dataclass import DownloadMediaKwargs, ExecActuatorKwargs, P2PFileKwargs from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload -from backend.flow.utils.tbinlogdumper.context_dataclass import StopSlaveKwargs +from backend.flow.utils.tbinlogdumper.context_dataclass import StopSlaveKwargs, TBinlogDumperFullSyncDataKwargs +from backend.flow.utils.tbinlogdumper.tbinlogdumper_act_payload import TBinlogDumperActPayload """ 定义一些TBinlogDumper流程上可能会用到的子流程,以便于减少代码的重复率 @@ -61,6 +66,7 @@ def add_tbinlogdumper_sub_flow( "created_by": created_by, "charset": charset, } + # 声明子流程 sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data) # 阶段1 并行分发安装文件 @@ -85,11 +91,11 @@ def add_tbinlogdumper_sub_flow( bk_cloud_id=cluster.bk_cloud_id, cluster_type=cluster.cluster_type, exec_ip=master.machine.ip, - get_mysql_payload_func=MysqlActPayload.install_tbinlogdumper_payload.__name__, + get_mysql_payload_func=TBinlogDumperActPayload.install_tbinlogdumper_payload.__name__, ) ), ) - + # 返回子流程 return sub_pipeline.build_sub_process(sub_name=_("安装TBinlogDumper实例flow")) @@ -146,7 +152,7 @@ def reduce_tbinlogdumper_sub_flow( ExecActuatorKwargs( bk_cloud_id=inst.bk_cloud_id, exec_ip=inst.ip, - get_mysql_payload_func=MysqlActPayload.uninstall_tbinlogdumper_payload.__name__, + get_mysql_payload_func=TBinlogDumperActPayload.uninstall_tbinlogdumper_payload.__name__, cluster={"listen_ports": [inst.listen_port]}, ) ), @@ -154,6 +160,7 @@ def reduce_tbinlogdumper_sub_flow( ) sub_pipeline.add_parallel_acts(acts_list=acts_list) + # 返回子流程 return sub_pipeline.build_sub_process(sub_name=_("集群[{}]卸载TBinlogDumper实例flow".format(cluster.name))) @@ -192,6 +199,7 @@ def switch_sub_flow( for inst in switch_instances: old_dumper = ExtraProcessInstance.objects.get(id=inst["reduce_id"]) + # 按照实例维度声明子流程 sub_sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data) # 旧实例断开同步 @@ -231,7 +239,7 @@ def switch_sub_flow( ExecActuatorKwargs( bk_cloud_id=cluster.bk_cloud_id, exec_ip=master.machine.ip, - get_mysql_payload_func=MysqlActPayload.tbinlogdumper_sync_data_payload.__name__, + get_mysql_payload_func=TBinlogDumperActPayload.tbinlogdumper_sync_data_payload.__name__, cluster={ "master_ip": master.machine.ip, "master_port": master.port, @@ -247,6 +255,183 @@ def switch_sub_flow( sub_sub_pipeline.build_sub_process(sub_name=_("切换到新实例[{}:{}]".format(master.machine.ip, inst["port"]))) ) + # 在将实例子流程聚合到上层 sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_sub_pipelines) return sub_pipeline.build_sub_process(sub_name=_("集群[{}]切换TBinlogDumper".format(cluster.name))) + + +def incr_sync_sub_flow( + cluster: Cluster, + root_id: str, + uid: str, + add_tbinlogdumper_conf: dict, + created_by: str = "", +): + """ + 定义TBinlogDumper增量同步的子流程 + @param cluster: 操作的云区域id + @param root_id: flow流程的root_id + @param uid: 单据uid + @param add_tbinlogdumper_conf: 待添加TBinlogdumper的实例配置 + @param created_by: 单据发起者 + """ + # 先获取集群的最新的master对象 + master = cluster.storageinstance_set.get(instance_role=InstanceRole.BACKEND_MASTER) + + # 拼接子流程的全局只读参数 + parent_global_data = { + "uid": uid, + "bk_biz_id": cluster.bk_biz_id, + "created_by": created_by, + "cluster_id": cluster.id, + "add_tbinlogdumper_conf": add_tbinlogdumper_conf, + } + + # 声明子流程 + sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data) + + # 阶段1 对新TBinlogDumper做全表结构导入 + sub_pipeline.add_act( + act_name=_("导入相关表结构"), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict( + ExecActuatorKwargs( + bk_cloud_id=cluster.bk_cloud_id, + exec_ip=master.machine.ip, + get_mysql_payload_func=TBinlogDumperActPayload.tbinlogdumper_load_schema_payload.__name__, + run_as_system_user=DBA_SYSTEM_USER, + ) + ), + ) + + # 阶段2 对新TBinlogDumper跟数据源做数据同步 + sub_pipeline.add_sub_pipeline( + build_repl_by_manual_input_sub_flow( + bk_cloud_id=cluster.bk_cloud_id, + root_id=root_id, + parent_global_data=parent_global_data, + master_ip=master.machine.ip, + slave_ip=master.machine.ip, + master_port=master.port, + slave_port=add_tbinlogdumper_conf["port"], + ) + ) + # 返回子流程 + return sub_pipeline.build_sub_process( + sub_name=_("实例TBinlogDumper[{}:{}]做增量同步".format(master.machine.ip, add_tbinlogdumper_conf["port"])) + ) + + +def full_sync_sub_flow( + cluster: Cluster, + root_id: str, + uid: str, + add_tbinlogdumper_conf: dict, + created_by: str = "", +): + """ + 定义TBinlogDumper全量同步的子流程 + @param cluster: 操作的云区域id + @param root_id: flow流程的root_id + @param uid: 单据uid + @param add_tbinlogdumper_conf: 待添加TBinlogdumper的实例配置 + @param created_by: 单据发起者 + """ + # 先获取集群的最新的master、backup对象 + master = cluster.storageinstance_set.get(instance_role=InstanceRole.BACKEND_MASTER) + backup = cluster.storageinstance_set.get(instance_role=InstanceRole.BACKEND_SLAVE, is_stand_by=True) + + # 拼接子流程的全局只读参数 + parent_global_data = { + "uid": uid, + "bk_biz_id": cluster.bk_biz_id, + "created_by": created_by, + "cluster_id": cluster.id, + "add_tbinlogdumper_conf": add_tbinlogdumper_conf, + "backup_id": uuid.uuid1(), + } + + # 声明子流程 + sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data) + + # 阶段1 对新TBinlogDumper做全表结构导入 + sub_pipeline.add_act( + act_name=_("导入相关表结构"), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict( + ExecActuatorKwargs( + bk_cloud_id=cluster.bk_cloud_id, + exec_ip=master.machine.ip, + get_mysql_payload_func=TBinlogDumperActPayload.tbinlogdumper_load_schema_payload.__name__, + run_as_system_user=DBA_SYSTEM_USER, + ) + ), + ) + + # 阶段2 添加同步账号 + sub_pipeline.add_act( + act_name=_("新增repl帐户"), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict( + ExecActuatorKwargs( + bk_cloud_id=cluster.bk_cloud_id, + exec_ip=master.machine.ip, + get_mysql_payload_func=MysqlActPayload.get_grant_mysql_repl_user_payload.__name__, + cluster={"new_slave_ip": master.machine.ip, "mysql_port": master.port}, + run_as_system_user=DBA_SYSTEM_USER, + ) + ), + write_payload_var="master_ip_sync_info", + ) + + # 阶段3 根据TBinlogDumper的同步数据配置,在从节点触发一次逻辑备份请求 + sub_pipeline.add_act( + act_name=_("在slave[{}:{}]备份数据".format(backup.machine.ip, backup.port)), + act_component_code=TBinlogDumperFullSyncDataComponent.code, + kwargs=asdict( + TBinlogDumperFullSyncDataKwargs( + bk_cloud_id=cluster.bk_cloud_id, + backup_ip=backup.machine.ip, + backup_port=backup.port, + backup_role=backup.instance_role, + module_id=add_tbinlogdumper_conf["module_id"], + ) + ), + write_payload_var="backup_info", + ) + + # 阶段4 备份文件传输到TBinlogDumper机器,做导入数据准备 + sub_pipeline.add_act( + act_name=_("传输备份文件到TBinlogDumper[{}]".format(master.machine.ip)), + act_component_code=TBinlogDumperTransFileComponent.code, + kwargs=asdict( + P2PFileKwargs( + bk_cloud_id=cluster.bk_cloud_id, + file_list=[], + file_target_path="", + source_ip_list=[backup.machine.ip], + exec_ip=master.machine.ip, + run_as_system_user=DBA_SYSTEM_USER, + ) + ), + ) + + # 阶段5 发起导入备份数据,同步与数据源建立同步 + sub_pipeline.add_act( + act_name=_("导入备份数据"), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict( + ExecActuatorKwargs( + bk_cloud_id=cluster.bk_cloud_id, + exec_ip=master.machine.ip, + get_mysql_payload_func=TBinlogDumperActPayload.tbinlogdumper_restore_payload.__name__, + run_as_system_user=DBA_SYSTEM_USER, + ) + ), + ) + + # 返回子流程 + return sub_pipeline.build_sub_process( + sub_name=_("实例TBinlogDumper[{}:{}]做全量同步".format(master.machine.ip, add_tbinlogdumper_conf["port"])) + ) diff --git a/dbm-ui/backend/flow/plugins/components/collections/mysql/trans_flies.py b/dbm-ui/backend/flow/plugins/components/collections/mysql/trans_flies.py index f28b051454..a846b35521 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/mysql/trans_flies.py +++ b/dbm-ui/backend/flow/plugins/components/collections/mysql/trans_flies.py @@ -17,7 +17,7 @@ from backend import env from backend.components import JobApi from backend.core import consts -from backend.flow.consts import MediumFileTypeEnum +from backend.flow.consts import DBA_ROOT_USER, MediumFileTypeEnum from backend.flow.models import FlowNode from backend.flow.plugins.components.collections.common.base_service import BkJobService @@ -91,6 +91,13 @@ def _execute(self, data, parent_data) -> bool: payload["file_source_list"].append(file_source) payload["target_server"]["ip_list"] = target_ip_info + # 选择什么用户来传输文件 + if kwargs.get("run_as_system_user"): + payload["account_alias"] = kwargs["run_as_system_user"] + else: + # 现在默认使用root账号来执行 + payload["account_alias"] = DBA_ROOT_USER + if kwargs.get("file_target_path"): kwargs["file_target_path"] = str(kwargs["file_target_path"]).strip() if kwargs["file_target_path"] is not None and kwargs["file_target_path"] != "": diff --git a/dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/__init__.py b/dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/__init__.py new file mode 100644 index 0000000000..28b2474aa1 --- /dev/null +++ b/dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/__init__.py @@ -0,0 +1,9 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/dumper_data.py b/dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/dumper_data.py new file mode 100644 index 0000000000..fd1eaabdc2 --- /dev/null +++ b/dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/dumper_data.py @@ -0,0 +1,66 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import uuid + +from pipeline.component_framework.component import Component + +from backend.flow.consts import DBA_SYSTEM_USER +from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptService +from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload +from backend.flow.utils.tbinlogdumper.tbinlogdumper_act_payload import TBinlogDumperActPayload + + +class TBinlogDumperFullSyncDataService(ExecuteDBActuatorScriptService): + """ + 针对TBinlogdumper的场景,针对库表做逻辑备份 + """ + + def _execute(self, data, parent_data): + + kwargs = data.get_one_of_inputs("kwargs") + global_data = data.get_one_of_inputs("global_data") + + backup_ip = kwargs["backup_ip"] + backup_port = kwargs["backup_port"] + backup_role = kwargs["backup_role"] + + # 获取需要备份的库表信息,根据TBinlogdumper设置同步信息 + dumper_conf = TBinlogDumperActPayload.get_tbinlogdumper_config( + module_id=kwargs["module_id"], bk_biz_id=global_data["bk_biz_id"] + )["mysqld"] + + # 拼装备份你库表正则表达式, 目前只考虑replicate_do_table 和 replicate_wild_do_table的 过滤同步场景 + backup_regex = "" + if dumper_conf.get("replicate_do_table"): + backup_regex = dumper_conf["replicate_do_table"].replace(",", "|") + if dumper_conf.get("replicate_wild_do_table"): + tmp = dumper_conf["replicate_wild_do_table"].replace("%", "*") + backup_regex += "|" + backup_regex += tmp.replace(",", "|") + + # 下发库表备份指令 + data.get_one_of_inputs("global_data")["port"] = backup_port + data.get_one_of_inputs("global_data")["role"] = backup_role + + data.get_one_of_inputs("global_data")["db_table_filter_regex"] = backup_regex + + data.get_one_of_inputs("kwargs")["bk_cloud_id"] = kwargs["bk_cloud_id"] + data.get_one_of_inputs("kwargs")[ + "get_mysql_payload_func" + ] = TBinlogDumperActPayload.tbinlogdumper_backup_demand_payload.__name__ + data.get_one_of_inputs("kwargs")["exec_ip"] = backup_ip + data.get_one_of_inputs("kwargs")["run_as_system_user"] = DBA_SYSTEM_USER + return super()._execute(data, parent_data) + + +class TBinlogDumperFullSyncDataComponent(Component): + name = __name__ + code = "tbinlogdumper_full_sync_data" + bound_service = TBinlogDumperFullSyncDataService diff --git a/dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/trans_backup_file.py b/dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/trans_backup_file.py new file mode 100644 index 0000000000..668f9392e8 --- /dev/null +++ b/dbm-ui/backend/flow/plugins/components/collections/tbinlogdumper/trans_backup_file.py @@ -0,0 +1,39 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import json +import logging + +from django.utils.translation import ugettext as _ +from pipeline.component_framework.component import Component + +from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileService +from backend.flow.utils.mysql.common.compare_time import compare_time +from backend.ticket.constants import TicketType + +logger = logging.getLogger("flow") + + +class TBinlogDumperTransFileService(TransFileService): + def _execute(self, data, parent_data) -> bool: + """ + 执行传输文件的原子任务, 针对tbinlogdumper的全量同步 + """ + trans_data = data.get_one_of_inputs("trans_data") + data.get_one_of_inputs("kwargs")["file_list"] = [f"{trans_data.backup_info['backup_dir']}/*"] + data.get_one_of_inputs("kwargs")["file_target_path"] = trans_data.backup_info["backup_dir"] + + return super()._execute(data, parent_data) + + +class TBinlogDumperTransFileComponent(Component): + name = __name__ + code = "tbinlogdumper_trans_file" + bound_service = TBinlogDumperTransFileService diff --git a/dbm-ui/backend/flow/utils/base/payload_handler.py b/dbm-ui/backend/flow/utils/base/payload_handler.py new file mode 100644 index 0000000000..9b38f4f271 --- /dev/null +++ b/dbm-ui/backend/flow/utils/base/payload_handler.py @@ -0,0 +1,126 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import logging +import re +from typing import Any + +from backend import env +from backend.components import DBConfigApi +from backend.components.dbconfig.constants import FormatType, LevelName +from backend.constants import IP_RE_PATTERN +from backend.core.encrypt.constants import RSAConfigType +from backend.core.encrypt.handlers import RSAHandler +from backend.db_proxy.constants import ExtensionType +from backend.db_proxy.models import DBExtension +from backend.flow.consts import ConfigTypeEnum, NameSpaceEnum +from backend.ticket.constants import TicketType + +apply_list = [TicketType.MYSQL_SINGLE_APPLY.value, TicketType.MYSQL_HA_APPLY.value] + +logger = logging.getLogger("flow") + + +class PayloadHandler(object): + def __init__(self, bk_cloud_id: int, ticket_data: dict, cluster: dict, cluster_type: str = None): + """ + @param bk_cloud_id 操作的云区域 + @param ticket_data 单据信息 + @param cluster 需要操作的集群信息 + @param cluster_type 表示操作的集群类型,会决定到db_config获取配置的空间 + """ + self.init_mysql_config = {} + self.bk_cloud_id = bk_cloud_id + self.ticket_data = ticket_data + self.cluster = cluster + self.cluster_type = cluster_type + self.mysql_pkg = None + self.proxy_pkg = None + self.checksum_pkg = None + self.mysql_crond_pkg = None + self.mysql_monitor_pkg = None + self.account = self.get_mysql_account() + + # todo 后面可能优化这个问题 + if self.ticket_data.get("module"): + self.db_module_id = self.ticket_data["module"] + elif self.cluster and self.cluster.get("db_module_id"): + self.db_module_id = self.cluster["db_module_id"] + else: + self.db_module_id = 0 + + @staticmethod + def get_mysql_account() -> Any: + """ + 获取mysql实例内置帐户密码 + """ + data = DBConfigApi.query_conf_item( + { + "bk_biz_id": "0", + "level_name": LevelName.PLAT, + "level_value": "0", + "conf_file": "mysql#user", + "conf_type": ConfigTypeEnum.InitUser, + "namespace": NameSpaceEnum.TenDB.value, + "format": FormatType.MAP, + } + ) + return data["content"] + + @staticmethod + def __get_super_account_bypass(): + """ + 旁路逻辑:获取环境变量中的access_hosts, 用户名和密码 + """ + access_hosts = env.TEST_ACCESS_HOSTS or re.compile(IP_RE_PATTERN).findall(env.DRS_APIGW_DOMAIN) + drs_account_data = { + "access_hosts": access_hosts, + "user": env.DRS_USERNAME, + "pwd": env.DRS_PASSWORD, + } + + access_hosts = env.TEST_ACCESS_HOSTS or re.compile(IP_RE_PATTERN).findall(env.DBHA_APIGW_DOMAIN_LIST) + dbha_account_data = { + "access_hosts": access_hosts, + "user": env.DBHA_USERNAME, + "pwd": env.DBHA_PASSWORD, + } + + return drs_account_data, dbha_account_data + + def get_super_account(self): + """ + 获取mysql机器系统管理账号信息 + """ + + if env.DRS_USERNAME and env.DBHA_USERNAME: + return self.__get_super_account_bypass() + + rsa = RSAHandler.get_or_generate_rsa_in_db(RSAConfigType.get_rsa_cloud_name(self.bk_cloud_id)) + + drs = DBExtension.get_latest_extension(bk_cloud_id=self.bk_cloud_id, extension_type=ExtensionType.DRS) + drs_account_data = { + "access_hosts": DBExtension.get_extension_access_hosts( + bk_cloud_id=self.bk_cloud_id, extension_type=ExtensionType.DRS + ), + "pwd": RSAHandler.decrypt_password(rsa.rsa_private_key.content, drs.details["pwd"]), + "user": RSAHandler.decrypt_password(rsa.rsa_private_key.content, drs.details["user"]), + } + + dbha = DBExtension.get_latest_extension(bk_cloud_id=self.bk_cloud_id, extension_type=ExtensionType.DBHA) + dbha_account_data = { + "access_hosts": DBExtension.get_extension_access_hosts( + bk_cloud_id=self.bk_cloud_id, extension_type=ExtensionType.DBHA + ), + "pwd": RSAHandler.decrypt_password(rsa.rsa_private_key.content, dbha.details["pwd"]), + "user": RSAHandler.decrypt_password(rsa.rsa_private_key.content, dbha.details["user"]), + } + + return drs_account_data, dbha_account_data diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py index 6a02a22f00..1c59e4a296 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_dataclass.py @@ -66,6 +66,7 @@ class P2PFileBaseKwargs: bk_cloud_id: int # 对应的云区域ID file_list: list # 需要传送的源文件列表 source_ip_list: list # 源文件的机器IP列表 + run_as_system_user: str = None # 表示执行job的api的操作用户, None 默认是用root用户 file_type: Optional[MediumFileTypeEnum] = MediumFileTypeEnum.Server.value file_target_path: str = None # 表示下载到目标机器的路径,如果传None,默认则传/data/install @@ -106,6 +107,7 @@ class DownloadMediaBaseKwargs: bk_cloud_id: int # 对应的云区域ID file_list: list # 需要传送的源文件列表 + run_as_system_user: str = None # 表示执行job的api的操作用户, None 默认是用root用户 file_type: Optional[MediumFileTypeEnum] = MediumFileTypeEnum.Repo.value file_target_path: str = None # 表示下载到目标机器的路径,如果传None,默认则传/data/install diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py index 6887fee24a..3140f55df1 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py @@ -11,7 +11,6 @@ import copy import logging import os -import re from typing import Any, List from django.conf import settings @@ -21,7 +20,6 @@ from backend.components import DBConfigApi from backend.components.dbconfig.constants import FormatType, LevelName, ReqType from backend.configuration.models import SystemSettings -from backend.constants import IP_RE_PATTERN from backend.core import consts from backend.core.consts import BK_PKG_INSTALL_PATH from backend.core.encrypt.constants import RSAConfigType @@ -30,8 +28,7 @@ from backend.db_meta.exceptions import DBMetaException from backend.db_meta.models import Cluster, Machine, ProxyInstance, StorageInstance, StorageInstanceTuple from backend.db_package.models import Package -from backend.db_proxy.constants import ExtensionType -from backend.db_proxy.models import DBCloudProxy, DBExtension +from backend.db_proxy.models import DBCloudProxy from backend.db_services.mysql.sql_import.constants import BKREPO_SQLFILE_PATH from backend.flow.consts import ( CHECKSUM_DB, @@ -48,6 +45,8 @@ RollbackType, ) from backend.flow.engine.bamboo.scene.common.get_real_version import get_mysql_real_version, get_spider_real_version +from backend.flow.utils.base.payload_handler import PayloadHandler +from backend.flow.utils.tbinlogdumper.tbinlogdumper_act_payload import TBinlogDumperActPayload from backend.ticket.constants import TicketType apply_list = [TicketType.MYSQL_SINGLE_APPLY.value, TicketType.MYSQL_HA_APPLY.value] @@ -55,42 +54,13 @@ logger = logging.getLogger("flow") -class MysqlActPayload(object): +class MysqlActPayload(PayloadHandler, TBinlogDumperActPayload): """ 定义mysql不同执行类型,拼接不同的payload参数,对应不同的dict结构体。 + todo 后续要优化这块代码,因为类太大,建议按照场景拆分,然后继承,例如TBinlogDumperActPayload继承TBinlogDumper相关的方法 + todo 比如spider场景拆出来、公共部分的拆出来等 """ - def __init__(self, bk_cloud_id: int, ticket_data: dict, cluster: dict, cluster_type: str = None): - """ - @param bk_cloud_id 操作的云区域 - @param ticket_data 单据信息 - @param cluster 需要操作的集群信息 - @param cluster_type 表示操作的集群类型,会决定到db_config获取配置的空间 - """ - self.init_mysql_config = {} - self.bk_cloud_id = bk_cloud_id - self.ticket_data = ticket_data - self.cluster = cluster - self.cluster_type = cluster_type - self.mysql_pkg = None - self.proxy_pkg = None - self.checksum_pkg = None - self.mysql_crond_pkg = None - self.mysql_monitor_pkg = None - self.account = self.__get_mysql_account() - - # self.db_module_id = ( - # self.ticket_data["module"] if self.ticket_data.get("module") else self.cluster.get("db_module_id") - # ) - # 尝试获取db_module_id , 有些单据不需要db_module_id,则最终给0 - # todo 后面可能优化这个问题 - if self.ticket_data.get("module"): - self.db_module_id = self.ticket_data["module"] - elif self.cluster and self.cluster.get("db_module_id"): - self.db_module_id = self.cluster["db_module_id"] - else: - self.db_module_id = 0 - @staticmethod def __get_mysql_account() -> Any: """ @@ -109,56 +79,6 @@ def __get_mysql_account() -> Any: ) return data["content"] - def __get_super_account_bypass(self): - """ - 旁路逻辑:获取环境变量中的access_hosts, 用户名和密码 - """ - access_hosts = env.TEST_ACCESS_HOSTS or re.compile(IP_RE_PATTERN).findall(env.DRS_APIGW_DOMAIN) - drs_account_data = { - "access_hosts": access_hosts, - "user": env.DRS_USERNAME, - "pwd": env.DRS_PASSWORD, - } - - access_hosts = env.TEST_ACCESS_HOSTS or re.compile(IP_RE_PATTERN).findall(env.DBHA_APIGW_DOMAIN_LIST) - dbha_account_data = { - "access_hosts": access_hosts, - "user": env.DBHA_USERNAME, - "pwd": env.DBHA_PASSWORD, - } - - return drs_account_data, dbha_account_data - - def __get_super_account(self): - """ - 获取mysql机器系统管理账号信息 - """ - - if env.DRS_USERNAME and env.DBHA_USERNAME: - return self.__get_super_account_bypass() - - rsa = RSAHandler.get_or_generate_rsa_in_db(RSAConfigType.get_rsa_cloud_name(self.bk_cloud_id)) - - drs = DBExtension.get_latest_extension(bk_cloud_id=self.bk_cloud_id, extension_type=ExtensionType.DRS) - drs_account_data = { - "access_hosts": DBExtension.get_extension_access_hosts( - bk_cloud_id=self.bk_cloud_id, extension_type=ExtensionType.DRS - ), - "pwd": RSAHandler.decrypt_password(rsa.rsa_private_key.content, drs.details["pwd"]), - "user": RSAHandler.decrypt_password(rsa.rsa_private_key.content, drs.details["user"]), - } - - dbha = DBExtension.get_latest_extension(bk_cloud_id=self.bk_cloud_id, extension_type=ExtensionType.DBHA) - dbha_account_data = { - "access_hosts": DBExtension.get_extension_access_hosts( - bk_cloud_id=self.bk_cloud_id, extension_type=ExtensionType.DBHA - ), - "pwd": RSAHandler.decrypt_password(rsa.rsa_private_key.content, dbha.details["pwd"]), - "user": RSAHandler.decrypt_password(rsa.rsa_private_key.content, dbha.details["user"]), - } - - return drs_account_data, dbha_account_data - @staticmethod def __get_proxy_account() -> Any: """ @@ -212,21 +132,6 @@ def __get_proxy_config(self) -> Any: ) return data["content"] - @staticmethod - def __get_tbinlogdumper_config(bk_biz_id: int, module_id: int): - data = DBConfigApi.query_conf_item( - { - "bk_biz_id": str(bk_biz_id), - "level_name": LevelName.MODULE, - "level_value": str(module_id), - "conf_file": "latest", - "conf_type": "tbinlogdumper", - "namespace": ClusterType.TenDBHA, - "format": FormatType.MAP_LEVEL, - } - ) - return data["content"] - def __get_version_and_charset(self, db_module_id) -> Any: """获取版本号和字符集信息""" data = DBConfigApi.query_conf_item( @@ -326,7 +231,7 @@ def get_install_mysql_payload(self, **kwargs) -> dict: for port in install_mysql_ports: mysql_config[port] = copy.deepcopy(self.init_mysql_config[port]) - drs_account, dbha_account = self.__get_super_account() + drs_account, dbha_account = self.get_super_account() return { "db_type": DBActuatorTypeEnum.MySQL.value, @@ -372,7 +277,7 @@ def get_install_spider_payload(self, **kwargs): ) spider_auto_incr_mode_map[port] = self.cluster["auto_incr_value"] - drs_account, dbha_account = self.__get_super_account() + drs_account, dbha_account = self.get_super_account() return { "db_type": DBActuatorTypeEnum.Spider.value, @@ -415,7 +320,7 @@ def get_install_spider_ctl_payload(self, **kwargs): ctl_pkg = Package.get_latest_package(version=MediumEnum.Latest, pkg_type=MediumEnum.tdbCtl) version_no = get_mysql_real_version(ctl_pkg.name) - drs_account, dbha_account = self.__get_super_account() + drs_account, dbha_account = self.get_super_account() return { "db_type": DBActuatorTypeEnum.SpiderCtl.value, "action": DBActuatorActionEnum.Deploy.value, @@ -1988,86 +1893,6 @@ def tendb_recover_binlog_payload(self, **kwargs): } return payload - def install_tbinlogdumper_payload(self, **kwargs): - """ - 安装tbinlogdumper实例,字符集是通过master实例获取 - """ - - pkg = Package.get_latest_package(version=MediumEnum.Latest, pkg_type=MediumEnum.TBinlogDumper) - version_no = get_mysql_real_version(pkg.name) - - # 计算这次安装tbinlogdumper实例端口,并且计算每个端口安装配置 - mycnf_configs = {} - dumper_configs = {} - - for conf in self.ticket_data["add_conf_list"]: - mycnf_configs[conf["port"]] = self.__get_tbinlogdumper_config( - bk_biz_id=self.ticket_data["bk_biz_id"], module_id=conf["module_id"] - ) - dumper_configs[conf["port"]] = {"dumper_id": conf["area_name"], "area_name": conf["area_name"]} - - drs_account, dbha_account = self.__get_super_account() - return { - "db_type": DBActuatorTypeEnum.TBinlogDumper.value, - "action": DBActuatorActionEnum.Deploy.value, - "payload": { - "general": {"runtime_account": self.account}, - "extend": { - "host": kwargs["ip"], - "pkg": pkg.name, - "pkg_md5": pkg.md5, - "mysql_version": version_no, - "charset": self.ticket_data["charset"], - "inst_mem": 0, - "ports": [conf["port"] for conf in self.ticket_data["add_conf_list"]], - "super_account": drs_account, - "dbha_account": dbha_account, - "mycnf_configs": mycnf_configs, - "dumper_configs": dumper_configs, - }, - }, - } - - def uninstall_tbinlogdumper_payload(self, **kwargs) -> dict: - """ - 卸载tbinlogdumper进程的payload参数 - """ - return { - "db_type": DBActuatorTypeEnum.TBinlogDumper.value, - "action": DBActuatorActionEnum.UnInstall.value, - "payload": { - "general": {"runtime_account": self.account}, - "extend": { - "host": kwargs["ip"], - "force": True, - "ports": self.cluster["listen_ports"], - }, - }, - } - - def tbinlogdumper_sync_data_payload(self, **kwargs): - """ - TBinlogDumper建立数据同步 - """ - return { - "db_type": DBActuatorTypeEnum.MySQL.value, - "action": DBActuatorActionEnum.ChangeMaster.value, - "payload": { - "general": {"runtime_account": self.account}, - "extend": { - "host": kwargs["ip"], - "port": self.cluster["listen_port"], - "master_host": self.cluster["master_ip"], - "master_port": self.cluster["master_port"], - "is_gtid": False, - "max_tolerate_delay": 0, - "force": False, - "bin_file": self.cluster["bin_file"], - "bin_position": self.cluster["bin_position"], - }, - }, - } - def get_install_tmp_db_backup_payload(self, **kwargs): """ 数据恢复时安装临时备份程序。大部分信息可忽略不计 @@ -2075,7 +1900,8 @@ def get_install_tmp_db_backup_payload(self, **kwargs): db_backup_pkg = Package.get_latest_package(version=MediumEnum.Latest, pkg_type=MediumEnum.DbBackup) cfg = self.__get_dbbackup_config() # cluster = Cluster.objects.get(id=self.cluster["cluster_id"]) - # ins_list = StorageInstance.objects.filter(machine__ip=kwargs["ip"], machine__bk_cloud_id=self.cluster["bk_cloud_id"]) + # ins_list = + # StorageInstance.objects.filter(machine__ip=kwargs["ip"], machine__bk_cloud_id=self.cluster["bk_cloud_id"]) return { "db_type": DBActuatorTypeEnum.MySQL.value, "action": DBActuatorActionEnum.DeployDbbackup.value, diff --git a/dbm-ui/backend/flow/utils/tbinlogdumper/context_dataclass.py b/dbm-ui/backend/flow/utils/tbinlogdumper/context_dataclass.py index 9f421ef96e..e83a73bbd5 100644 --- a/dbm-ui/backend/flow/utils/tbinlogdumper/context_dataclass.py +++ b/dbm-ui/backend/flow/utils/tbinlogdumper/context_dataclass.py @@ -17,6 +17,7 @@ class TBinlogDumperAddContext: """ master_ip_sync_info: dict = field(default_factory=dict) # 代表获取到master的主从复制位点信息 + backup_info: dict = field(default_factory=dict) # 代表做全量同步时备份信息 @staticmethod def get_sync_info_var_name() -> str: @@ -33,3 +34,16 @@ class StopSlaveKwargs: tbinlogdumper_ip: str tbinlogdumper_port: int is_safe: bool + + +@dataclass +class TBinlogDumperFullSyncDataKwargs: + """ + 定义为tbinlogdumper实例同步的私有变量 + """ + + bk_cloud_id: int + backup_ip: str + backup_port: int + backup_role: str + module_id: int diff --git a/dbm-ui/backend/flow/utils/tbinlogdumper/tbinlogdumper_act_payload.py b/dbm-ui/backend/flow/utils/tbinlogdumper/tbinlogdumper_act_payload.py new file mode 100644 index 0000000000..830991f2c0 --- /dev/null +++ b/dbm-ui/backend/flow/utils/tbinlogdumper/tbinlogdumper_act_payload.py @@ -0,0 +1,196 @@ +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging + +from backend.components import DBConfigApi +from backend.components.dbconfig.constants import FormatType, LevelName +from backend.db_meta.enums import ClusterType, InstanceRole +from backend.db_meta.models import Cluster +from backend.db_package.models import Package +from backend.flow.consts import DBActuatorActionEnum, DBActuatorTypeEnum, MediumEnum +from backend.flow.engine.bamboo.scene.common.get_real_version import get_mysql_real_version + +logger = logging.getLogger("flow") + + +class TBinlogDumperActPayload(object): + @staticmethod + def get_tbinlogdumper_config(bk_biz_id: int, module_id: int): + data = DBConfigApi.query_conf_item( + { + "bk_biz_id": str(bk_biz_id), + "level_name": LevelName.MODULE, + "level_value": str(module_id), + "conf_file": "latest", + "conf_type": "tbinlogdumper", + "namespace": ClusterType.TenDBHA, + "format": FormatType.MAP_LEVEL, + } + ) + return data["content"] + + def install_tbinlogdumper_payload(self, **kwargs): + """ + 安装tbinlogdumper实例,字符集是通过master实例获取 + """ + + pkg = Package.get_latest_package(version=MediumEnum.Latest, pkg_type=MediumEnum.TBinlogDumper) + version_no = get_mysql_real_version(pkg.name) + + # 计算这次安装tbinlogdumper实例端口,并且计算每个端口安装配置 + mycnf_configs = {} + dumper_configs = {} + + for conf in self.ticket_data["add_conf_list"]: + mycnf_configs[conf["port"]] = self.get_tbinlogdumper_config( + bk_biz_id=self.ticket_data["bk_biz_id"], module_id=conf["module_id"] + ) + dumper_configs[conf["port"]] = {"dumper_id": conf["area_name"], "area_name": conf["area_name"]} + + drs_account, dbha_account = self.get_super_account() + return { + "db_type": DBActuatorTypeEnum.TBinlogDumper.value, + "action": DBActuatorActionEnum.Deploy.value, + "payload": { + "general": {"runtime_account": self.account}, + "extend": { + "host": kwargs["ip"], + "pkg": pkg.name, + "pkg_md5": pkg.md5, + "mysql_version": version_no, + "charset": self.ticket_data["charset"], + "inst_mem": 0, + "ports": [conf["port"] for conf in self.ticket_data["add_conf_list"]], + "super_account": drs_account, + "dbha_account": dbha_account, + "mycnf_configs": mycnf_configs, + "dumper_configs": dumper_configs, + }, + }, + } + + def uninstall_tbinlogdumper_payload(self, **kwargs) -> dict: + """ + 卸载tbinlogdumper进程的payload参数 + """ + return { + "db_type": DBActuatorTypeEnum.TBinlogDumper.value, + "action": DBActuatorActionEnum.UnInstall.value, + "payload": { + "general": {"runtime_account": self.account}, + "extend": { + "host": kwargs["ip"], + "force": True, + "ports": self.cluster["listen_ports"], + }, + }, + } + + def tbinlogdumper_sync_data_payload(self, **kwargs): + """ + TBinlogDumper建立数据同步 + """ + return { + "db_type": DBActuatorTypeEnum.MySQL.value, + "action": DBActuatorActionEnum.ChangeMaster.value, + "payload": { + "general": {"runtime_account": self.account}, + "extend": { + "host": kwargs["ip"], + "port": self.cluster["listen_port"], + "master_host": self.cluster["master_ip"], + "master_port": self.cluster["master_port"], + "is_gtid": False, + "max_tolerate_delay": 0, + "force": False, + "bin_file": self.cluster["bin_file"], + "bin_position": self.cluster["bin_position"], + }, + }, + } + + def tbinlogdumper_backup_demand_payload(self, **kwargs): + return { + "db_type": DBActuatorTypeEnum.TBinlogDumper.value, + "action": DBActuatorActionEnum.MySQLBackupDemand.value, + "payload": { + "general": {"runtime_account": self.account}, + "extend": { + "host": kwargs["ip"], + "port": self.ticket_data["port"], + "role": self.ticket_data["role"], + "backup_type": "logical", + "backup_gsd": ["data"], + "regex": self.ticket_data["db_table_filter_regex"], + "backup_id": self.ticket_data["backup_id"].__str__(), + "bill_id": str(self.ticket_data["uid"]), + "custom_backup_dir": "tbinlogdumper", + }, + }, + } + + def tbinlogdumper_restore_payload(self, **kwargs): + """ + tbinlogdumper 恢复备份数据(不包括表结构) + """ + cluster = Cluster.objects.get(id=self.ticket_data["cluster_id"]) + master = cluster.storageinstance_set.get(instance_role=InstanceRole.BACKEND_MASTER) + index_file = "" + for result in kwargs["trans_data"]["backup_info"]["report_result"]: + if result["file_type"] == "index": + index_file = result["file_name"] + break + + return { + "db_type": DBActuatorTypeEnum.MySQL.value, + "action": DBActuatorActionEnum.RestoreSlave.value, + "payload": { + "general": {"runtime_account": self.account}, + "extend": { + "work_dir": kwargs["trans_data"]["backup_info"]["backup_dir"], + "backup_dir": kwargs["trans_data"]["backup_info"]["backup_dir"], + "backup_files": { + "index": [index_file], + }, + "tgt_instance": { + "host": kwargs["ip"], + "port": self.ticket_data["add_tbinlogdumper_conf"]["port"], + "user": self.account["admin_user"], + "pwd": self.account["admin_pwd"], + "socket": None, + "charset": "", + "options": "", + }, + "src_instance": {"host": master.machine.ip, "port": master.port}, + "change_master": True, + "work_id": "", + }, + }, + } + + def tbinlogdumper_load_schema_payload(self, **kwargs): + """ + TbinlogDumper导入表结构 + """ + cluster = Cluster.objects.get(id=self.ticket_data["cluster_id"]) + master = cluster.storageinstance_set.get(instance_role=InstanceRole.BACKEND_MASTER) + return { + "db_type": DBActuatorTypeEnum.TBinlogDumper.value, + "action": DBActuatorActionEnum.DumpSchema.value, + "payload": { + "general": {"runtime_account": self.account}, + "extend": { + "host": kwargs["ip"], + "port": master.port, + "tbinlogdumper_port": self.ticket_data["add_tbinlogdumper_conf"]["port"], + "charset": "default", + }, + }, + }